storage_service.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. """Atlas persistence/read service via virtuoso-mcp (MCP transport).
  2. We intentionally use the MCP SSE transport ("/mcp/sse") to match the standard across
  3. our MCP servers and avoid legacy direct "/rpc" calls.
  4. """
  5. from __future__ import annotations
  6. import json
  7. import logging
  8. import os
  9. import time
  10. from typing import Any, Awaitable, Callable
  11. from mcp import ClientSession
  12. from mcp.client.sse import sse_client
  13. from app.models import AtlasEntity
  14. from app.triple_export import entity_to_turtle
  15. logger = logging.getLogger(__name__)
  16. ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
  17. VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
  18. VIRTUOSO_MCP_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_TIMEOUT", "10"))
  19. VIRTUOSO_MCP_SSE_READ_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_SSE_READ_TIMEOUT", str(60 * 5)))
  20. CallToolFn = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]
  21. def _safe_fragment(value: str) -> str:
  22. value = (value or "").strip().lower()
  23. out = []
  24. for ch in value:
  25. if ch.isalnum() or ch in ["_", "-"]:
  26. out.append(ch)
  27. else:
  28. out.append("_")
  29. frag = "".join(out).strip("_")
  30. return frag or "entity"
  31. def entity_iri(entity_id: str) -> str:
  32. return f"http://world.eu.org/atlas_data#entity_{_safe_fragment(entity_id)}"
  33. class AtlasStorageService:
  34. def __init__(self, call_tool: CallToolFn | None = None):
  35. self._call_tool_override = call_tool
  36. self._tool_cache: dict[str, tuple[float, dict[str, Any]]] = {}
  37. self._tool_cache_ttl_seconds = float(os.getenv("ATLAS_VIRTUOSO_CALL_CACHE_TTL", "30"))
  38. def _cache_key(self, tool_name: str, payload: dict[str, Any]) -> str:
  39. return f"{tool_name}:{json.dumps(payload, sort_keys=True, separators=(',', ':'))}"
  40. def _cache_get(self, key: str) -> dict[str, Any] | None:
  41. item = self._tool_cache.get(key)
  42. if not item:
  43. return None
  44. expires_at, value = item
  45. if expires_at < time.time():
  46. self._tool_cache.pop(key, None)
  47. return None
  48. return value
  49. def _cache_set(self, key: str, value: dict[str, Any]) -> None:
  50. self._tool_cache[key] = (time.time() + self._tool_cache_ttl_seconds, value)
  51. async def _call_tool(self, tool_name: str, payload: dict[str, Any]) -> dict[str, Any]:
  52. cache_key = self._cache_key(tool_name, payload)
  53. cached = self._cache_get(cache_key)
  54. if cached is not None:
  55. return cached
  56. if self._call_tool_override:
  57. result = await self._call_tool_override(tool_name, payload)
  58. self._cache_set(cache_key, result)
  59. return result
  60. try:
  61. async with sse_client(
  62. VIRTUOSO_MCP_SSE_URL,
  63. timeout=VIRTUOSO_MCP_TIMEOUT,
  64. sse_read_timeout=VIRTUOSO_MCP_SSE_READ_TIMEOUT,
  65. ) as (read_stream, write_stream):
  66. async with ClientSession(read_stream, write_stream) as session:
  67. await session.initialize()
  68. result = await session.call_tool(tool_name, {"input": payload})
  69. if result.isError:
  70. raise RuntimeError(f"Tool {tool_name} failed: {result.error}")
  71. data = result.structuredContent if result.structuredContent is not None else result.content
  72. if isinstance(data, dict):
  73. self._cache_set(cache_key, data)
  74. return data
  75. except Exception as exc:
  76. raise RuntimeError(f"Virtuoso MCP call failed for {tool_name}: {exc}")
  77. async def write_entity(self, entity: AtlasEntity) -> dict[str, Any]:
  78. ttl = entity_to_turtle(entity)
  79. try:
  80. result = await self._call_tool(
  81. "batch_insert",
  82. {
  83. "ttl": ttl,
  84. "graph": ATLAS_GRAPH_IRI,
  85. },
  86. )
  87. return {
  88. "status": "ok",
  89. "graph": ATLAS_GRAPH_IRI,
  90. "entity_id": entity.atlas_id,
  91. "result": result,
  92. }
  93. except Exception as exc:
  94. logger.warning(
  95. "Atlas persistence failed for %s into %s: %s",
  96. entity.atlas_id,
  97. ATLAS_GRAPH_IRI,
  98. exc,
  99. )
  100. return {
  101. "status": "unfinished",
  102. "message": "Persistence path not fully available yet",
  103. "error": str(exc),
  104. "entity_id": entity.atlas_id,
  105. }
  106. async def read_entity_claims(self, entity_id: str, include_superseded: bool = False) -> dict[str, Any]:
  107. iri = entity_iri(entity_id)
  108. status_filter = "" if include_superseded else 'FILTER(?status = "active")'
  109. query = f"""
  110. PREFIX atlas: <http://world.eu.org/atlas_ontology#>
  111. SELECT ?entity ?label ?claim ?pred ?objIri ?objLit ?layer ?status ?prov ?src ?method ?conf ?ts
  112. WHERE {{
  113. VALUES ?entity {{ <{iri}> }}
  114. ?entity a atlas:Entity ;
  115. atlas:canonicalLabel ?label ;
  116. atlas:hasClaim ?claim .
  117. ?claim atlas:claimSubjectIri ?entity ;
  118. atlas:claimPredicate ?pred ;
  119. atlas:claimLayer ?layer ;
  120. atlas:claimStatus ?status .
  121. OPTIONAL {{ ?claim atlas:claimObjectIri ?objIri . }}
  122. OPTIONAL {{ ?claim atlas:claimObjectLiteral ?objLit . }}
  123. OPTIONAL {{
  124. ?claim atlas:hasProvenance ?prov .
  125. ?prov atlas:provenanceSource ?src .
  126. OPTIONAL {{ ?prov atlas:retrievalMethod ?method . }}
  127. OPTIONAL {{ ?prov atlas:confidence ?conf . }}
  128. OPTIONAL {{ ?prov atlas:retrievedAt ?ts . }}
  129. }}
  130. {status_filter}
  131. }}
  132. ORDER BY ?claim
  133. """
  134. try:
  135. result = await self._call_tool("sparql_query", {"query": query})
  136. return {
  137. "status": "ok",
  138. "entity_id": entity_id,
  139. "query": query,
  140. "result": result,
  141. }
  142. except Exception as exc:
  143. return {
  144. "status": "unfinished",
  145. "message": "Read path not fully available yet",
  146. "error": str(exc),
  147. "entity_id": entity_id,
  148. "query": query,
  149. }