storage_service.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. """Atlas persistence/read service via virtuoso-mcp (MCP transport)."""
from __future__ import annotations

import logging
import os
from typing import Any, Awaitable, Callable

from mcp import ClientSession
from mcp.client.sse import sse_client

from app.models import AtlasEntity
from app.triple_export import entity_to_turtle
  9. ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
  10. VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
  11. VIRTUOSO_MCP_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_TIMEOUT", "10"))
  12. VIRTUOSO_MCP_SSE_READ_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_SSE_READ_TIMEOUT", str(60 * 5)))
  13. CallToolFn = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]
  14. def _safe_fragment(value: str) -> str:
  15. value = (value or "").strip().lower()
  16. out = []
  17. for ch in value:
  18. if ch.isalnum() or ch in ["_", "-"]:
  19. out.append(ch)
  20. else:
  21. out.append("_")
  22. frag = "".join(out).strip("_")
  23. return frag or "entity"
  24. def entity_iri(entity_id: str) -> str:
  25. return f"http://world.eu.org/atlas_data#entity_{_safe_fragment(entity_id)}"
  26. class AtlasStorageService:
  27. def __init__(self, call_tool: CallToolFn | None = None):
  28. self._call_tool_override = call_tool
  29. async def _call_tool(self, tool_name: str, payload: dict[str, Any]) -> dict[str, Any]:
  30. if self._call_tool_override:
  31. return await self._call_tool_override(tool_name, payload)
  32. async with sse_client(
  33. VIRTUOSO_MCP_SSE_URL,
  34. timeout=VIRTUOSO_MCP_TIMEOUT,
  35. sse_read_timeout=VIRTUOSO_MCP_SSE_READ_TIMEOUT,
  36. ) as (read_stream, write_stream):
  37. async with ClientSession(read_stream, write_stream) as session:
  38. await session.initialize()
  39. result = await session.call_tool(tool_name, payload)
  40. if result.isError:
  41. raise RuntimeError(f"Tool {tool_name} failed: {result.content}")
  42. if isinstance(result.structuredContent, dict):
  43. return result.structuredContent
  44. return {"content": result.content}
  45. async def write_entity(self, entity: AtlasEntity) -> dict[str, Any]:
  46. ttl = entity_to_turtle(entity)
  47. try:
  48. result = await self._call_tool(
  49. "batch_insert",
  50. {
  51. "ttl": ttl,
  52. "graph": ATLAS_GRAPH_IRI,
  53. },
  54. )
  55. return {
  56. "status": "ok",
  57. "graph": ATLAS_GRAPH_IRI,
  58. "entity_id": entity.atlas_id,
  59. "result": result,
  60. }
  61. except Exception as exc:
  62. return {
  63. "status": "unfinished",
  64. "message": "Persistence path not fully available yet",
  65. "error": str(exc),
  66. "entity_id": entity.atlas_id,
  67. }
  68. async def read_entity_claims(self, entity_id: str, include_superseded: bool = False) -> dict[str, Any]:
  69. iri = entity_iri(entity_id)
  70. status_filter = "" if include_superseded else 'FILTER(?status = "active")'
  71. query = f"""
  72. PREFIX atlas: <http://world.eu.org/atlas_ontology#>
  73. SELECT ?entity ?label ?claim ?pred ?objIri ?objLit ?layer ?status ?prov ?src ?method ?conf ?ts
  74. WHERE {{
  75. VALUES ?entity {{ <{iri}> }}
  76. ?entity a atlas:Entity ;
  77. atlas:canonicalLabel ?label ;
  78. atlas:hasClaim ?claim .
  79. ?claim atlas:claimSubjectIri ?entity ;
  80. atlas:claimPredicate ?pred ;
  81. atlas:claimLayer ?layer ;
  82. atlas:claimStatus ?status .
  83. OPTIONAL {{ ?claim atlas:claimObjectIri ?objIri . }}
  84. OPTIONAL {{ ?claim atlas:claimObjectLiteral ?objLit . }}
  85. OPTIONAL {{
  86. ?claim atlas:hasProvenance ?prov .
  87. ?prov atlas:provenanceSource ?src .
  88. OPTIONAL {{ ?prov atlas:retrievalMethod ?method . }}
  89. OPTIONAL {{ ?prov atlas:confidence ?conf . }}
  90. OPTIONAL {{ ?prov atlas:retrievedAt ?ts . }}
  91. }}
  92. {status_filter}
  93. }}
  94. ORDER BY ?claim
  95. """
  96. try:
  97. result = await self._call_tool("sparql_query", {"query": query})
  98. return {
  99. "status": "ok",
  100. "entity_id": entity_id,
  101. "query": query,
  102. "result": result,
  103. }
  104. except Exception as exc:
  105. return {
  106. "status": "unfinished",
  107. "message": "Read path not fully available yet",
  108. "error": str(exc),
  109. "entity_id": entity_id,
  110. "query": query,
  111. }