storage_service.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. """Atlas persistence/read service via virtuoso-mcp (MCP transport)."""
  2. from __future__ import annotations
  3. import json
  4. import logging
  5. import os
  6. from typing import Any, Awaitable, Callable
  7. from urllib.request import Request, urlopen
  8. from app.models import AtlasEntity
  9. from app.triple_export import entity_to_turtle
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Graph IRI sent as the "graph" argument of the batch_insert tool.
# Overridable through the ATLAS_GRAPH_IRI environment variable.
ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")

# HTTP endpoint that tool invocations are POSTed to when no transport override
# is injected. Overridable through ATLAS_VIRTUOSO_RPC_URL.
VIRTUOSO_RPC_URL = os.getenv("ATLAS_VIRTUOSO_RPC_URL", "http://192.168.0.249:8501/rpc")

# Timeout handed to urlopen() for each RPC request. Read once at import time;
# float() raises ValueError here if the environment value is not numeric.
VIRTUOSO_RPC_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_RPC_TIMEOUT", "20"))

# Shape of an injectable tool transport: an async callable taking
# (tool_name, payload) and resolving to a dict.
CallToolFn = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]
  15. def _safe_fragment(value: str) -> str:
  16. value = (value or "").strip().lower()
  17. out = []
  18. for ch in value:
  19. if ch.isalnum() or ch in ["_", "-"]:
  20. out.append(ch)
  21. else:
  22. out.append("_")
  23. frag = "".join(out).strip("_")
  24. return frag or "entity"
  25. def entity_iri(entity_id: str) -> str:
  26. return f"http://world.eu.org/atlas_data#entity_{_safe_fragment(entity_id)}"
  27. class AtlasStorageService:
  28. def __init__(self, call_tool: CallToolFn | None = None):
  29. self._call_tool_override = call_tool
  30. async def _call_tool(self, tool_name: str, payload: dict[str, Any]) -> dict[str, Any]:
  31. if self._call_tool_override:
  32. return await self._call_tool_override(tool_name, payload)
  33. request = Request(
  34. VIRTUOSO_RPC_URL,
  35. data=json.dumps({"tool": tool_name, "input": payload}).encode("utf-8"),
  36. headers={"Content-Type": "application/json"},
  37. method="POST",
  38. )
  39. with urlopen(request, timeout=VIRTUOSO_RPC_TIMEOUT) as response:
  40. data = json.loads(response.read().decode("utf-8"))
  41. if isinstance(data, dict) and data.get("error"):
  42. raise RuntimeError(f"Tool {tool_name} failed: {data['error']}")
  43. if isinstance(data, dict) and "result" in data:
  44. return data["result"]
  45. return data
  46. async def write_entity(self, entity: AtlasEntity) -> dict[str, Any]:
  47. ttl = entity_to_turtle(entity)
  48. try:
  49. result = await self._call_tool(
  50. "batch_insert",
  51. {
  52. "ttl": ttl,
  53. "graph": ATLAS_GRAPH_IRI,
  54. },
  55. )
  56. return {
  57. "status": "ok",
  58. "graph": ATLAS_GRAPH_IRI,
  59. "entity_id": entity.atlas_id,
  60. "result": result,
  61. }
  62. except Exception as exc:
  63. logger.warning(
  64. "Atlas persistence failed for %s into %s: %s",
  65. entity.atlas_id,
  66. ATLAS_GRAPH_IRI,
  67. exc,
  68. )
  69. return {
  70. "status": "unfinished",
  71. "message": "Persistence path not fully available yet",
  72. "error": str(exc),
  73. "entity_id": entity.atlas_id,
  74. }
  75. async def read_entity_claims(self, entity_id: str, include_superseded: bool = False) -> dict[str, Any]:
  76. iri = entity_iri(entity_id)
  77. status_filter = "" if include_superseded else 'FILTER(?status = "active")'
  78. query = f"""
  79. PREFIX atlas: <http://world.eu.org/atlas_ontology#>
  80. SELECT ?entity ?label ?claim ?pred ?objIri ?objLit ?layer ?status ?prov ?src ?method ?conf ?ts
  81. WHERE {{
  82. VALUES ?entity {{ <{iri}> }}
  83. ?entity a atlas:Entity ;
  84. atlas:canonicalLabel ?label ;
  85. atlas:hasClaim ?claim .
  86. ?claim atlas:claimSubjectIri ?entity ;
  87. atlas:claimPredicate ?pred ;
  88. atlas:claimLayer ?layer ;
  89. atlas:claimStatus ?status .
  90. OPTIONAL {{ ?claim atlas:claimObjectIri ?objIri . }}
  91. OPTIONAL {{ ?claim atlas:claimObjectLiteral ?objLit . }}
  92. OPTIONAL {{
  93. ?claim atlas:hasProvenance ?prov .
  94. ?prov atlas:provenanceSource ?src .
  95. OPTIONAL {{ ?prov atlas:retrievalMethod ?method . }}
  96. OPTIONAL {{ ?prov atlas:confidence ?conf . }}
  97. OPTIONAL {{ ?prov atlas:retrievedAt ?ts . }}
  98. }}
  99. {status_filter}
  100. }}
  101. ORDER BY ?claim
  102. """
  103. try:
  104. result = await self._call_tool("sparql_query", {"query": query})
  105. return {
  106. "status": "ok",
  107. "entity_id": entity_id,
  108. "query": query,
  109. "result": result,
  110. }
  111. except Exception as exc:
  112. return {
  113. "status": "unfinished",
  114. "message": "Read path not fully available yet",
  115. "error": str(exc),
  116. "entity_id": entity_id,
  117. "query": query,
  118. }