storage_service.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. """Atlas persistence/read service via virtuoso-mcp (MCP transport).
  2. We intentionally use the MCP SSE transport ("/mcp/sse") to match the standard across
  3. our MCP servers and avoid legacy direct "/rpc" calls.
  4. """
  5. from __future__ import annotations
  6. import json
  7. import logging
  8. import os
  9. from typing import Any, Awaitable, Callable
  10. from mcp import ClientSession
  11. from mcp.client.sse import sse_client
  12. from app.models import AtlasEntity
  13. from app.triple_export import entity_to_turtle
  14. logger = logging.getLogger(__name__)
  15. ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
  16. VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
  17. VIRTUOSO_MCP_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_TIMEOUT", "10"))
  18. VIRTUOSO_MCP_SSE_READ_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_SSE_READ_TIMEOUT", str(60 * 5)))
  19. CallToolFn = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]
  20. def _safe_fragment(value: str) -> str:
  21. value = (value or "").strip().lower()
  22. out = []
  23. for ch in value:
  24. if ch.isalnum() or ch in ["_", "-"]:
  25. out.append(ch)
  26. else:
  27. out.append("_")
  28. frag = "".join(out).strip("_")
  29. return frag or "entity"
  30. def entity_iri(entity_id: str) -> str:
  31. return f"http://world.eu.org/atlas_data#entity_{_safe_fragment(entity_id)}"
  32. class AtlasStorageService:
  33. def __init__(self, call_tool: CallToolFn | None = None):
  34. self._call_tool_override = call_tool
  35. async def _call_tool(self, tool_name: str, payload: dict[str, Any]) -> dict[str, Any]:
  36. if self._call_tool_override:
  37. return await self._call_tool_override(tool_name, payload)
  38. try:
  39. async with sse_client(
  40. VIRTUOSO_MCP_SSE_URL,
  41. timeout=VIRTUOSO_MCP_TIMEOUT,
  42. sse_read_timeout=VIRTUOSO_MCP_SSE_READ_TIMEOUT,
  43. ) as (read_stream, write_stream):
  44. async with ClientSession(read_stream, write_stream) as session:
  45. await session.initialize()
  46. result = await session.call_tool(tool_name, {"input": payload})
  47. if result.isError:
  48. raise RuntimeError(f"Tool {tool_name} failed: {result.error}")
  49. return result.structuredContent if result.structuredContent is not None else result.content
  50. except Exception as exc:
  51. raise RuntimeError(f"Virtuoso MCP call failed for {tool_name}: {exc}")
  52. async def write_entity(self, entity: AtlasEntity) -> dict[str, Any]:
  53. ttl = entity_to_turtle(entity)
  54. try:
  55. result = await self._call_tool(
  56. "batch_insert",
  57. {
  58. "ttl": ttl,
  59. "graph": ATLAS_GRAPH_IRI,
  60. },
  61. )
  62. return {
  63. "status": "ok",
  64. "graph": ATLAS_GRAPH_IRI,
  65. "entity_id": entity.atlas_id,
  66. "result": result,
  67. }
  68. except Exception as exc:
  69. logger.warning(
  70. "Atlas persistence failed for %s into %s: %s",
  71. entity.atlas_id,
  72. ATLAS_GRAPH_IRI,
  73. exc,
  74. )
  75. return {
  76. "status": "unfinished",
  77. "message": "Persistence path not fully available yet",
  78. "error": str(exc),
  79. "entity_id": entity.atlas_id,
  80. }
  81. async def read_entity_claims(self, entity_id: str, include_superseded: bool = False) -> dict[str, Any]:
  82. iri = entity_iri(entity_id)
  83. status_filter = "" if include_superseded else 'FILTER(?status = "active")'
  84. query = f"""
  85. PREFIX atlas: <http://world.eu.org/atlas_ontology#>
  86. SELECT ?entity ?label ?claim ?pred ?objIri ?objLit ?layer ?status ?prov ?src ?method ?conf ?ts
  87. WHERE {{
  88. VALUES ?entity {{ <{iri}> }}
  89. ?entity a atlas:Entity ;
  90. atlas:canonicalLabel ?label ;
  91. atlas:hasClaim ?claim .
  92. ?claim atlas:claimSubjectIri ?entity ;
  93. atlas:claimPredicate ?pred ;
  94. atlas:claimLayer ?layer ;
  95. atlas:claimStatus ?status .
  96. OPTIONAL {{ ?claim atlas:claimObjectIri ?objIri . }}
  97. OPTIONAL {{ ?claim atlas:claimObjectLiteral ?objLit . }}
  98. OPTIONAL {{
  99. ?claim atlas:hasProvenance ?prov .
  100. ?prov atlas:provenanceSource ?src .
  101. OPTIONAL {{ ?prov atlas:retrievalMethod ?method . }}
  102. OPTIONAL {{ ?prov atlas:confidence ?conf . }}
  103. OPTIONAL {{ ?prov atlas:retrievedAt ?ts . }}
  104. }}
  105. {status_filter}
  106. }}
  107. ORDER BY ?claim
  108. """
  109. try:
  110. result = await self._call_tool("sparql_query", {"query": query})
  111. return {
  112. "status": "ok",
  113. "entity_id": entity_id,
  114. "query": query,
  115. "result": result,
  116. }
  117. except Exception as exc:
  118. return {
  119. "status": "unfinished",
  120. "message": "Read path not fully available yet",
  121. "error": str(exc),
  122. "entity_id": entity_id,
  123. "query": query,
  124. }