storage_service.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. """Atlas persistence/read service via virtuoso-mcp (MCP transport).
  2. We intentionally use the MCP SSE transport ("/mcp/sse") to match the standard across
  3. our MCP servers and avoid legacy direct "/rpc" calls.
  4. """
  5. from __future__ import annotations
  6. import json
  7. import logging
  8. import os
  9. import time
  10. from typing import Any, Awaitable, Callable
  11. from mcp import ClientSession
  12. from mcp.client.sse import sse_client
  13. from app.models import AtlasEntity
  14. from app.triple_export import entity_to_turtle
  15. logger = logging.getLogger(__name__)
  16. ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
  17. VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
  18. VIRTUOSO_MCP_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_TIMEOUT", "10"))
  19. VIRTUOSO_MCP_SSE_READ_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_SSE_READ_TIMEOUT", str(60 * 5)))
  20. CallToolFn = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]
  21. def _safe_fragment(value: str) -> str:
  22. value = (value or "").strip().lower()
  23. out = []
  24. for ch in value:
  25. if ch.isalnum() or ch in ["_", "-"]:
  26. out.append(ch)
  27. else:
  28. out.append("_")
  29. frag = "".join(out).strip("_")
  30. return frag or "entity"
  31. def entity_iri(entity_id: str) -> str:
  32. return f"http://world.eu.org/atlas_data#entity_{_safe_fragment(entity_id)}"
  33. class AtlasStorageService:
  34. def __init__(self, call_tool: CallToolFn | None = None):
  35. self._call_tool_override = call_tool
  36. self._tool_cache: dict[str, tuple[float, dict[str, Any]]] = {}
  37. self._tool_cache_ttl_seconds = float(os.getenv("ATLAS_VIRTUOSO_CALL_CACHE_TTL", "30"))
  38. def _cache_key(self, tool_name: str, payload: dict[str, Any]) -> str:
  39. return f"{tool_name}:{json.dumps(payload, sort_keys=True, separators=(',', ':'))}"
  40. def _cache_get(self, key: str) -> dict[str, Any] | None:
  41. item = self._tool_cache.get(key)
  42. if not item:
  43. return None
  44. expires_at, value = item
  45. if expires_at < time.time():
  46. self._tool_cache.pop(key, None)
  47. return None
  48. return value
  49. def _cache_set(self, key: str, value: dict[str, Any]) -> None:
  50. self._tool_cache[key] = (time.time() + self._tool_cache_ttl_seconds, value)
  51. async def _call_tool(self, tool_name: str, payload: dict[str, Any], *, cache_result: bool = True) -> dict[str, Any]:
  52. cache_key = self._cache_key(tool_name, payload)
  53. if cache_result:
  54. cached = self._cache_get(cache_key)
  55. if cached is not None:
  56. return cached
  57. if self._call_tool_override:
  58. result = await self._call_tool_override(tool_name, payload)
  59. if cache_result:
  60. self._cache_set(cache_key, result)
  61. return result
  62. try:
  63. async with sse_client(
  64. VIRTUOSO_MCP_SSE_URL,
  65. timeout=VIRTUOSO_MCP_TIMEOUT,
  66. sse_read_timeout=VIRTUOSO_MCP_SSE_READ_TIMEOUT,
  67. ) as (read_stream, write_stream):
  68. async with ClientSession(read_stream, write_stream) as session:
  69. await session.initialize()
  70. result = await session.call_tool(tool_name, {"input": payload})
  71. if result.isError:
  72. raise RuntimeError(f"Tool {tool_name} failed: {result.error}")
  73. data = result.structuredContent if result.structuredContent is not None else result.content
  74. if cache_result and isinstance(data, dict):
  75. self._cache_set(cache_key, data)
  76. return data
  77. except Exception as exc:
  78. raise RuntimeError(f"Virtuoso MCP call failed for {tool_name}: {exc}")
  79. async def write_entity(self, entity: AtlasEntity) -> dict[str, Any]:
  80. ttl = entity_to_turtle(entity)
  81. try:
  82. result = await self._call_tool(
  83. "batch_insert",
  84. {
  85. "ttl": ttl,
  86. "graph": ATLAS_GRAPH_IRI,
  87. },
  88. cache_result=False,
  89. )
  90. return {
  91. "status": "ok",
  92. "graph": ATLAS_GRAPH_IRI,
  93. "entity_id": entity.atlas_id,
  94. "result": result,
  95. }
  96. except Exception as exc:
  97. logger.warning(
  98. "Atlas persistence failed for %s into %s: %s",
  99. entity.atlas_id,
  100. ATLAS_GRAPH_IRI,
  101. exc,
  102. )
  103. return {
  104. "status": "unfinished",
  105. "message": "Persistence path not fully available yet",
  106. "error": str(exc),
  107. "entity_id": entity.atlas_id,
  108. }
  109. async def read_entity_claims(self, entity_id: str, include_superseded: bool = False) -> dict[str, Any]:
  110. iri = entity_iri(entity_id)
  111. status_filter = "" if include_superseded else 'FILTER(?status = "active")'
  112. query = f"""
  113. PREFIX atlas: <http://world.eu.org/atlas_ontology#>
  114. SELECT ?entity ?label ?canonType ?claim ?pred ?objIri ?objLit ?idVal ?idType ?layer ?status ?prov ?src ?method ?conf ?ts
  115. WHERE {{
  116. VALUES ?entity {{ <{iri}> }}
  117. ?entity a atlas:Entity ;
  118. atlas:canonicalLabel ?label ;
  119. atlas:hasCanonicalType ?canonType ;
  120. atlas:hasClaim ?claim .
  121. ?claim atlas:claimSubjectIri ?entity ;
  122. atlas:claimPredicate ?pred ;
  123. atlas:claimLayer ?layer ;
  124. atlas:claimStatus ?status .
  125. OPTIONAL {{ ?claim atlas:claimObjectIri ?objIri . }}
  126. OPTIONAL {{ ?claim atlas:claimObjectLiteral ?objLit . }}
  127. OPTIONAL {{ ?objIri atlas:identifierValue ?idVal . }}
  128. OPTIONAL {{ ?objIri atlas:identifierType ?idType . }}
  129. OPTIONAL {{
  130. ?claim atlas:hasProvenance ?prov .
  131. ?prov atlas:provenanceSource ?src .
  132. OPTIONAL {{ ?prov atlas:retrievalMethod ?method . }}
  133. OPTIONAL {{ ?prov atlas:confidence ?conf . }}
  134. OPTIONAL {{ ?prov atlas:retrievedAt ?ts . }}
  135. }}
  136. {status_filter}
  137. }}
  138. ORDER BY ?claim
  139. """
  140. try:
  141. result = await self._call_tool("sparql_query", {"query": query})
  142. return {
  143. "status": "ok",
  144. "entity_id": entity_id,
  145. "query": query,
  146. "result": result,
  147. }
  148. except Exception as exc:
  149. return {
  150. "status": "unfinished",
  151. "message": "Read path not fully available yet",
  152. "error": str(exc),
  153. "entity_id": entity_id,
  154. "query": query,
  155. }
  156. async def sparql_update(self, query: str) -> dict[str, Any]:
  157. return await self._call_tool("sparql_update", {"query": query}, cache_result=False)
  158. async def supersede_claims(self, claim_iris: list[str]) -> None:
  159. if not claim_iris:
  160. return
  161. values = " ".join(f"<{uri}>" for uri in claim_iris)
  162. query = f"""
  163. PREFIX atlas: <http://world.eu.org/atlas_ontology#>
  164. WITH <{ATLAS_GRAPH_IRI}>
  165. DELETE {{ ?claim atlas:claimStatus ?old . }}
  166. INSERT {{ ?claim atlas:claimStatus "superseded" }}
  167. WHERE {{
  168. VALUES ?claim {{ {values} }}
  169. OPTIONAL {{ ?claim atlas:claimStatus ?old . }}
  170. }}
  171. """
  172. await self.sparql_update(query)
  173. async def replace_entity_core(self, entity_id: str, *, canonical_label: str, canonical_description: str | None, canonical_type: str | None) -> None:
  174. iri = entity_iri(entity_id)
  175. desc_insert = f' <{iri}> atlas:canonicalDescription "{canonical_description.replace("\\", "\\\\").replace("\"", "\\\"")}" .\n' if canonical_description else ""
  176. type_insert = f" <{iri}> atlas:hasCanonicalType atlas:{canonical_type} .\n" if canonical_type else ""
  177. query = f"""
  178. PREFIX atlas: <http://world.eu.org/atlas_ontology#>
  179. WITH <{ATLAS_GRAPH_IRI}>
  180. DELETE {{
  181. <{iri}> atlas:canonicalLabel ?oldLabel .
  182. <{iri}> atlas:canonicalDescription ?oldDesc .
  183. <{iri}> atlas:hasCanonicalType ?oldType .
  184. }}
  185. INSERT {{
  186. <{iri}> atlas:canonicalLabel "{canonical_label.replace("\\", "\\\\").replace("\"", "\\\"")}" .
  187. {desc_insert}{type_insert}}}
  188. WHERE {{
  189. OPTIONAL {{ <{iri}> atlas:canonicalLabel ?oldLabel . }}
  190. OPTIONAL {{ <{iri}> atlas:canonicalDescription ?oldDesc . }}
  191. OPTIONAL {{ <{iri}> atlas:hasCanonicalType ?oldType . }}
  192. }}
  193. """
  194. await self.sparql_update(query)