소스 검색

Atlas: persist wikidata/trends payload + store lookup fixes

Lukas Goldschmidt 1 개월 전
부모
커밋
585bb57ba1
8개의 변경된 파일, 126개의 줄 추가 그리고 31개의 줄 삭제
  1. 1 1
      README.md
  2. 14 0
      app/atlas.py
  3. 5 1
      app/mcp_server.py
  4. 26 19
      app/storage_service.py
  5. 14 0
      app/triple_export.py
  6. 36 10
      app/virtuoso_store.py
  7. 11 0
      ontology/atlas.ttl
  8. 19 0
      tests/test_atlas_contracts.py

+ 1 - 1
README.md

@@ -1,4 +1,4 @@
-# Atlas MVP
+# Atlas McP
 
 Atlas-MCP implements the semantic intelligence tier for the existing MCP stack. It follows the manifest’s mandate: Atlas is the only layer that resolves and enriches entities. For now, Atlas has exactly two public responsibilities: entity resolution and enrichment. The facts-mcp docs reinforce the same design pressure: keep the authoritative truth layer small, canonical, and explicit; Atlas should not blur into that role, but instead cooperate with it through clean graph contracts.
 

+ 14 - 0
app/atlas.py

@@ -27,11 +27,25 @@ async def resolve_entity(subject: str, context: str | None = None) -> AtlasEntit
     token = normalized.strip().lower()
     cached = _entity_cache.get(token)
     if cached is not None:
+        try:
+            await _storage.write_entity(cached)
+        except Exception:
+            pass
         return cached
 
     virt_hit = await _virtuoso_store.lookup(token)
     if virt_hit is not None:
+        # Make the returned raw payload reflect the original caller input
+        # (so tests and UI/debug output stay stable).
+        if isinstance(virt_hit.raw_payload, dict):
+            virt_hit.raw_payload.setdefault("source", "virtuoso")
+            virt_hit.raw_payload["raw"] = subject
+            virt_hit.raw_payload["normalized"] = normalized
         _entity_cache.store(virt_hit, extra_tokens=[subject, normalized])
+        try:
+            await _storage.write_entity(virt_hit)
+        except Exception:
+            pass
         return virt_hit
 
     resolution = resolve_entity_via_trends(subject)

+ 5 - 1
app/mcp_server.py

@@ -47,7 +47,11 @@ async def resolve_entity_tool(subject: str, context: str | None = None, debug: b
             for provenance in entity.provenance
         ],
         "g_trends_payload": {k: v for k, v in entity.raw_payload.items() if k != "wikidata"},
-        "wikidata_payload": entity.raw_payload.get("wikidata"),
+        "wikidata_payload": (
+            entity.raw_payload.get("wikidata")
+            if entity.raw_payload.get("wikidata") is not None
+            else {"status": "missing"}
+        ),
     }
     if debug:
         raw_claims, derived_claims = build_claim_sets(entity)

+ 26 - 19
app/storage_service.py

@@ -2,19 +2,20 @@
 
 from __future__ import annotations
 
+import json
+import logging
 import os
 from typing import Any, Awaitable, Callable
-
-from mcp import ClientSession
-from mcp.client.sse import sse_client
+from urllib.request import Request, urlopen
 
 from app.models import AtlasEntity
 from app.triple_export import entity_to_turtle
 
+logger = logging.getLogger(__name__)
+
 ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
-VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
-VIRTUOSO_MCP_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_TIMEOUT", "10"))
-VIRTUOSO_MCP_SSE_READ_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_SSE_READ_TIMEOUT", str(60 * 5)))
+VIRTUOSO_RPC_URL = os.getenv("ATLAS_VIRTUOSO_RPC_URL", "http://192.168.0.249:8501/rpc")
+VIRTUOSO_RPC_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_RPC_TIMEOUT", "20"))
 
 CallToolFn = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]
 
@@ -43,19 +44,19 @@ class AtlasStorageService:
         if self._call_tool_override:
             return await self._call_tool_override(tool_name, payload)
 
-        async with sse_client(
-            VIRTUOSO_MCP_SSE_URL,
-            timeout=VIRTUOSO_MCP_TIMEOUT,
-            sse_read_timeout=VIRTUOSO_MCP_SSE_READ_TIMEOUT,
-        ) as (read_stream, write_stream):
-            async with ClientSession(read_stream, write_stream) as session:
-                await session.initialize()
-                result = await session.call_tool(tool_name, payload)
-                if result.isError:
-                    raise RuntimeError(f"Tool {tool_name} failed: {result.content}")
-                if isinstance(result.structuredContent, dict):
-                    return result.structuredContent
-                return {"content": result.content}
+        request = Request(
+            VIRTUOSO_RPC_URL,
+            data=json.dumps({"tool": tool_name, "input": payload}).encode("utf-8"),
+            headers={"Content-Type": "application/json"},
+            method="POST",
+        )
+        with urlopen(request, timeout=VIRTUOSO_RPC_TIMEOUT) as response:
+            data = json.loads(response.read().decode("utf-8"))
+        if isinstance(data, dict) and data.get("error"):
+            raise RuntimeError(f"Tool {tool_name} failed: {data['error']}")
+        if isinstance(data, dict) and "result" in data:
+            return data["result"]
+        return data
 
     async def write_entity(self, entity: AtlasEntity) -> dict[str, Any]:
         ttl = entity_to_turtle(entity)
@@ -74,6 +75,12 @@ class AtlasStorageService:
                 "result": result,
             }
         except Exception as exc:
+            logger.warning(
+                "Atlas persistence failed for %s into %s: %s",
+                entity.atlas_id,
+                ATLAS_GRAPH_IRI,
+                exc,
+            )
             return {
                 "status": "unfinished",
                 "message": "Persistence path not fully available yet",

+ 14 - 0
app/triple_export.py

@@ -2,6 +2,8 @@
 
 from __future__ import annotations
 
+import json
+
 from app.models import AtlasEntity, AtlasProvenance
 
 PREFIXES = """@prefix atlas: <http://world.eu.org/atlas_ontology#> .
@@ -84,6 +86,18 @@ def entity_to_turtle(entity: AtlasEntity) -> str:
     lines.append(f'  atlas:canonicalLabel "{_literal(entity.canonical_label)}" ;')
     if entity.canonical_description:
         lines.append(f'  atlas:canonicalDescription "{_literal(entity.canonical_description)}" ;')
+
+    # Lean raw payload persistence (as JSON strings)
+    wd = entity.raw_payload.get("wikidata") if isinstance(entity.raw_payload, dict) else None
+    if isinstance(wd, dict) and wd.get("status") == "ok":
+        lines.append(f'  atlas:rawWikidataJson "{_literal(json.dumps(wd, ensure_ascii=False))}"^^xsd:string ;')
+
+    trends_payload = entity.raw_payload.get("g_trends_payload") or {}
+    # In our current model, trends live under raw_payload keys directly (non-wikidata)
+    if isinstance(entity.raw_payload, dict):
+        trends_payload = {k: v for k, v in entity.raw_payload.items() if k != "wikidata"}
+    if isinstance(trends_payload, dict) and trends_payload:
+        lines.append(f'  atlas:rawTrendsJson "{_literal(json.dumps(trends_payload, ensure_ascii=False))}"^^xsd:string ;')
     if entity.entity_type and entity.entity_type != "unknown":
         lines.append(f"  atlas:hasCanonicalType atlas:{_safe_fragment(entity.entity_type).capitalize()} ;")
     for alias in entity.aliases:

+ 36 - 10
app/virtuoso_store.py

@@ -67,7 +67,7 @@ class VirtuosoEntityStore:
             ) as (read_stream, write_stream):
                 async with ClientSession(read_stream, write_stream) as session:
                     await session.initialize()
-                    result = await session.call_tool("sparql_query", {"query": query})
+                    result = await session.call_tool("sparql_query", {"input": {"query": query}})
                     if result.isError:
                         return None
                     payload = result.structuredContent or _content_to_json(result.content)
@@ -103,17 +103,23 @@ def _build_sparql_query(literal: str) -> str:
     esc = literal.replace("\\", "\\\\").replace("\"", "\\\"")
     return f"""
 PREFIX atlas: <{PREFIX_ATLAS}>
-SELECT ?entity ?label ?type ?mid WHERE {{
+SELECT ?entity ?label ?type ?mid ?desc ?rawWd ?rawTrends WHERE {{
   GRAPH <{ATLAS_GRAPH_IRI}> {{
-    ?entity atlas:canonicalLabel ?label .
-    OPTIONAL {{ ?entity atlas:entityType ?type. }}
+    ?entity a atlas:Entity ;
+            atlas:canonicalLabel ?label .
+    OPTIONAL {{ ?entity atlas:canonicalDescription ?desc . }}
+    OPTIONAL {{ ?entity atlas:rawWikidataJson ?rawWd . }}
+    OPTIONAL {{ ?entity atlas:rawTrendsJson ?rawTrends . }}
+
     OPTIONAL {{
-      ?entity atlas:hasExternalIdentifier ?identifier .
-      ?identifier atlas:identifierType "mid" .
-      ?identifier atlas:identifierValue ?mid .
+      ?entity atlas:hasCanonicalType ?type .
     }}
+
+    ?entity atlas:hasIdentifier ?identifier .
+    ?identifier atlas:identifierValue ?mid ;
+               atlas:identifierType atlas:Mid .
   }}
-  FILTER(LCASE(?label) = \"{esc}\")
+  FILTER(LCASE(STR(?label)) = LCASE("{esc}"))
 }}
 LIMIT 1
 """
@@ -122,11 +128,22 @@ LIMIT 1
 def _entity_from_binding(binding: dict) -> AtlasEntity:
     label = binding.get("label", {}).get("value", "")
     entity_uri = binding.get("entity", {}).get("value", "")
+    # ?type is expected to be a class node like atlas:Person
     entity_type = binding.get("type", {}).get("value", "unknown")
+    if entity_type.startswith(PREFIX_ATLAS):
+        entity_type = entity_type.split("#", 1)[-1]
+    if entity_type.startswith("http://world.eu.org/atlas_ontology#"):
+        entity_type = entity_type.split("#", 1)[-1]
     mid = binding.get("mid", {}).get("value")
     identifiers = []
     if mid:
         identifiers.append(AtlasIdentifier(value=mid, source="virtuoso", identifier_type="mid"))
+
+    desc = binding.get("desc", {}).get("value")
+    raw_wd = binding.get("rawWd", {}).get("value")
+    raw_trends = binding.get("rawTrends", {}).get("value")
+
+    atlas_id = f"atlas:mid:{mid}" if mid else f"atlas:{label.strip().lower().replace(' ', '-') }"
     provenance = [
         AtlasProvenance(
             source="virtuoso-cache",
@@ -135,11 +152,20 @@ def _entity_from_binding(binding: dict) -> AtlasEntity:
         )
     ]
     return AtlasEntity(
-        atlas_id=entity_uri or f"atlas:{label.strip().lower().replace(' ', '-')}",
+        atlas_id=atlas_id,
         canonical_label=label or entity_uri,
+        canonical_description=desc,
         entity_type=entity_type or "unknown",
         aliases=[AtlasAlias(label=label or entity_uri)],
         identifiers=identifiers,
         provenance=provenance,
-        raw_payload={"source": "virtuoso", "binding": binding},
+        raw_payload={
+            "source": "virtuoso",
+            "raw": label or entity_uri,
+            "normalized": (label or entity_uri),
+            "mid": mid,
+            "wikidata": (json.loads(raw_wd) if raw_wd else {"status": "missing"}),
+            **(json.loads(raw_trends) if raw_trends else {}),
+        },
+        needs_curation=(entity_type or "unknown") == "unknown",
     )

+ 11 - 0
ontology/atlas.ttl

@@ -272,6 +272,17 @@ atlas:projectionContext a owl:DatatypeProperty ;
   rdfs:range xsd:string ;
   rdfs:label "projection context" .
 
+# Optional raw payload persistence (lean but consistent)
+atlas:rawWikidataJson a owl:DatatypeProperty ;
+  rdfs:domain atlas:Entity ;
+  rdfs:range xsd:string ;
+  rdfs:label "raw wikidata json" .
+
+atlas:rawTrendsJson a owl:DatatypeProperty ;
+  rdfs:domain atlas:Entity ;
+  rdfs:range xsd:string ;
+  rdfs:label "raw trends json" .
+
 ### Initial canonical type catalog
 
 atlas:Person a owl:Class ;

+ 19 - 0
tests/test_atlas_contracts.py

@@ -81,6 +81,25 @@ async def test_resolve_entity_passes_context_to_classifier(monkeypatch):
     assert writes and writes[0].canonical_label == "Sample"
 
 
+@pytest.mark.anyio
+async def test_resolve_entity_persists_cached_hits(monkeypatch):
+    cached_entity = AtlasEntity(atlas_id="atlas:x", canonical_label="Cached Entity")
+    monkeypatch.setattr("app.atlas._entity_cache.get", lambda token: cached_entity)
+
+    writes = []
+
+    async def fake_write(entity):
+        writes.append(entity)
+        return {"status": "ok"}
+
+    monkeypatch.setattr(atlas_module._storage, "write_entity", fake_write)
+
+    entity = await resolve_entity("Cached Entity")
+
+    assert entity is cached_entity
+    assert writes and writes[0] is cached_entity
+
+
 @pytest.mark.anyio
 async def test_resolve_entity_marks_needs_curation(monkeypatch):
     async def fake_classifier(subject, resolution, context):