|
@@ -46,11 +46,13 @@ def entity_iri(entity_id: str) -> str:
|
|
|
|
|
|
|
|
class AtlasStorageService:
|
|
class AtlasStorageService:
|
|
|
def __init__(self, call_tool: CallToolFn | None = None):
|
|
def __init__(self, call_tool: CallToolFn | None = None):
|
|
|
|
|
+ # Tests can inject a fake transport; production uses the MCP session client.
|
|
|
self._call_tool_override = call_tool
|
|
self._call_tool_override = call_tool
|
|
|
self._tool_cache: dict[str, tuple[float, dict[str, Any]]] = {}
|
|
self._tool_cache: dict[str, tuple[float, dict[str, Any]]] = {}
|
|
|
self._tool_cache_ttl_seconds = float(os.getenv("ATLAS_VIRTUOSO_CALL_CACHE_TTL", "30"))
|
|
self._tool_cache_ttl_seconds = float(os.getenv("ATLAS_VIRTUOSO_CALL_CACHE_TTL", "30"))
|
|
|
|
|
|
|
|
def _cache_key(self, tool_name: str, payload: dict[str, Any]) -> str:
|
|
def _cache_key(self, tool_name: str, payload: dict[str, Any]) -> str:
|
|
|
|
|
+ # Stable keying keeps equivalent tool calls from duplicating work.
|
|
|
return f"{tool_name}:{json.dumps(payload, sort_keys=True, separators=(',', ':'))}"
|
|
return f"{tool_name}:{json.dumps(payload, sort_keys=True, separators=(',', ':'))}"
|
|
|
|
|
|
|
|
def _cache_get(self, key: str) -> dict[str, Any] | None:
|
|
def _cache_get(self, key: str) -> dict[str, Any] | None:
|
|
@@ -67,6 +69,7 @@ class AtlasStorageService:
|
|
|
self._tool_cache[key] = (time.time() + self._tool_cache_ttl_seconds, value)
|
|
self._tool_cache[key] = (time.time() + self._tool_cache_ttl_seconds, value)
|
|
|
|
|
|
|
|
async def _call_tool(self, tool_name: str, payload: dict[str, Any], *, cache_result: bool = True) -> dict[str, Any]:
|
|
async def _call_tool(self, tool_name: str, payload: dict[str, Any], *, cache_result: bool = True) -> dict[str, Any]:
|
|
|
|
|
+ # Cache read-heavy calls, but let write paths pass through untouched.
|
|
|
cache_key = self._cache_key(tool_name, payload)
|
|
cache_key = self._cache_key(tool_name, payload)
|
|
|
if cache_result:
|
|
if cache_result:
|
|
|
cached = self._cache_get(cache_key)
|
|
cached = self._cache_get(cache_key)
|
|
@@ -98,6 +101,7 @@ class AtlasStorageService:
|
|
|
raise RuntimeError(f"Virtuoso MCP call failed for {tool_name}: {exc}")
|
|
raise RuntimeError(f"Virtuoso MCP call failed for {tool_name}: {exc}")
|
|
|
|
|
|
|
|
async def write_entity(self, entity: AtlasEntity) -> dict[str, Any]:
|
|
async def write_entity(self, entity: AtlasEntity) -> dict[str, Any]:
|
|
|
|
|
+ # Turn an Atlas entity into Turtle, then hand it to Virtuoso in one insert.
|
|
|
ttl = entity_to_turtle(entity)
|
|
ttl = entity_to_turtle(entity)
|
|
|
try:
|
|
try:
|
|
|
result = await self._call_tool(
|
|
result = await self._call_tool(
|
|
@@ -129,6 +133,7 @@ class AtlasStorageService:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
async def read_entity_claims(self, entity_id: str, include_superseded: bool = False) -> dict[str, Any]:
|
|
async def read_entity_claims(self, entity_id: str, include_superseded: bool = False) -> dict[str, Any]:
|
|
|
|
|
+ # Pull the entity's claim graph, with active claims by default.
|
|
|
iri = entity_iri(entity_id)
|
|
iri = entity_iri(entity_id)
|
|
|
status_filter = "" if include_superseded else 'FILTER(?status = "active")'
|
|
status_filter = "" if include_superseded else 'FILTER(?status = "active")'
|
|
|
query = f"""
|
|
query = f"""
|
|
@@ -182,6 +187,7 @@ ORDER BY ?claim
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
async def sparql_update(self, query: str) -> dict[str, Any]:
|
|
async def sparql_update(self, query: str) -> dict[str, Any]:
|
|
|
|
|
+ # Write raw SPARQL when a higher-level helper would just get in the way.
|
|
|
return await self._call_tool("sparql_update", {"query": query}, cache_result=False)
|
|
return await self._call_tool("sparql_update", {"query": query}, cache_result=False)
|
|
|
|
|
|
|
|
async def supersede_claims(self, claim_iris: list[str]) -> None:
|
|
async def supersede_claims(self, claim_iris: list[str]) -> None:
|
|
@@ -201,6 +207,7 @@ WHERE {{
|
|
|
await self.sparql_update(query)
|
|
await self.sparql_update(query)
|
|
|
|
|
|
|
|
async def replace_entity_core(self, entity_id: str, *, canonical_label: str, canonical_description: str | None, canonical_type: str | None) -> None:
|
|
async def replace_entity_core(self, entity_id: str, *, canonical_label: str, canonical_description: str | None, canonical_type: str | None) -> None:
|
|
|
|
|
+ # Replace the entity's canonical fields without disturbing its claims.
|
|
|
iri = entity_iri(entity_id)
|
|
iri = entity_iri(entity_id)
|
|
|
desc_insert = f' <{iri}> atlas:canonicalDescription "{canonical_description.replace("\\", "\\\\").replace("\"", "\\\"")}" .\n' if canonical_description else ""
|
|
desc_insert = f' <{iri}> atlas:canonicalDescription "{canonical_description.replace("\\", "\\\\").replace("\"", "\\\"")}" .\n' if canonical_description else ""
|
|
|
type_insert = f" <{iri}> atlas:hasCanonicalType atlas:{canonical_type} .\n" if canonical_type else ""
|
|
type_insert = f" <{iri}> atlas:hasCanonicalType atlas:{canonical_type} .\n" if canonical_type else ""
|