| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- """Virtuoso MCP bridge for cached entity lookups."""
- from __future__ import annotations
- import json
- import os
- from collections import OrderedDict
- from typing import Optional
- from mcp import ClientSession
- from mcp.client.sse import sse_client
- from app.models import AtlasAlias, AtlasEntity, AtlasIdentifier, AtlasProvenance
# Runtime configuration, read once at import time. Every value can be
# overridden through the environment variable named in the lookup.

# SSE endpoint of the Virtuoso MCP server; an empty value disables remote lookups.
# NOTE(review): the default points at a private LAN address - confirm it is
# intended for every deployment.
VIRTUOSO_MCP_SSE_URL = os.environ.get(
    "ATLAS_VIRTUOSO_MCP_SSE_URL",
    "http://192.168.0.249:8501/mcp/sse",
)

# Passed to ``sse_client`` as ``timeout`` (presumably seconds - confirm).
VIRTUOSO_MCP_TIMEOUT = float(os.environ.get("ATLAS_VIRTUOSO_MCP_TIMEOUT", "10"))

# Passed to ``sse_client`` as ``sse_read_timeout``; the default is 60 * 5
# (presumably seconds, i.e. five minutes - confirm).
VIRTUOSO_MCP_SSE_READ_TIMEOUT = float(
    os.environ.get("ATLAS_VIRTUOSO_MCP_SSE_READ_TIMEOUT", str(5 * 60))
)

# Named graph that the entity query is scoped to.
ATLAS_GRAPH_IRI = os.environ.get("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")

# Namespace bound to the ``atlas:`` prefix in generated SPARQL.
PREFIX_ATLAS = os.environ.get("ATLAS_PREFIX_IRI", "http://world.eu.org/atlas_ontology#")
class VirtuosoEntityStore:
    """Resolve tokens to Atlas entities via the Virtuoso MCP server, with an LRU cache.

    Entities found remotely are kept in an in-memory ``OrderedDict`` keyed by
    the normalized (stripped, lower-cased) token. Once more than
    ``max_cache_entries`` entries are stored, the least recently used ones are
    evicted. Misses and failed lookups are never cached.
    """

    def __init__(self, max_cache_entries: int = 256):
        """Create an empty store.

        Args:
            max_cache_entries: Maximum number of cached entities. Zero or a
                negative value means nothing is retained.
        """
        self.max_cache_entries = max_cache_entries
        self._cache: OrderedDict[str, AtlasEntity] = OrderedDict()

    def _cache_key(self, token: str) -> str:
        """Normalize ``token`` (``None`` allowed) to a stripped, lower-cased key."""
        return str(token or "").strip().lower()

    def _cache_get(self, token: str) -> Optional[AtlasEntity]:
        """Return the cached entity for ``token`` and mark it most recently used."""
        key = self._cache_key(token)
        if not key:
            return None
        hit = self._cache.get(key)
        if hit is not None:
            self._cache.move_to_end(key)
        return hit

    def _cache_set(self, token: str, entity: AtlasEntity) -> None:
        """Store ``entity`` under ``token`` and evict the oldest entries beyond the limit."""
        key = self._cache_key(token)
        if not key:
            return
        self._cache[key] = entity
        self._cache.move_to_end(key)
        # The emptiness guard keeps a negative limit from calling popitem()
        # on an empty dict, which would raise KeyError.
        while self._cache and len(self._cache) > self.max_cache_entries:
            self._cache.popitem(last=False)

    async def lookup(self, token: str) -> Optional[AtlasEntity]:
        """Return the entity for ``token``, consulting the cache before the server.

        Returns:
            The matching entity, or ``None`` when the token is empty, nothing
            matches, or the remote lookup fails.
        """
        cached = self._cache_get(token)
        if cached is not None:
            return cached
        entity = await self._lookup_remote(token)
        if entity is not None:
            self._cache_set(token, entity)
        return entity

    async def _lookup_remote(self, token: str) -> Optional[AtlasEntity]:
        """Query the MCP ``sparql_query`` tool for ``token``.

        A new SSE connection and client session are opened for every call.
        Any failure (connection, tool error, unexpected payload shape) is
        logged and reported as ``None`` so that callers can treat the remote
        store as best-effort.
        """
        # Function-scope import: keeps this class self-contained without
        # touching the module's import block.
        import logging

        # Reuse the cache normalization so None / non-string tokens are
        # handled the same way on both paths instead of raising here.
        literal = self._cache_key(token)
        if not literal or not VIRTUOSO_MCP_SSE_URL:
            return None
        query = _build_sparql_query(literal)
        try:
            async with sse_client(
                VIRTUOSO_MCP_SSE_URL,
                timeout=VIRTUOSO_MCP_TIMEOUT,
                sse_read_timeout=VIRTUOSO_MCP_SSE_READ_TIMEOUT,
            ) as (read_stream, write_stream):
                async with ClientSession(read_stream, write_stream) as session:
                    await session.initialize()
                    result = await session.call_tool("sparql_query", {"query": query})
                    if result.isError:
                        return None
                    # Prefer the structured result; otherwise parse the text content.
                    payload = result.structuredContent or _content_to_json(result.content)
                    binding = self._first_binding(payload)
                    if binding is None:
                        return None
                    return _entity_from_binding(binding)
        except Exception:
            # Deliberate best-effort boundary: never propagate, but leave a trace.
            logging.getLogger(__name__).warning(
                "Virtuoso MCP lookup failed for %r", literal, exc_info=True
            )
            return None

    @staticmethod
    def _first_binding(payload):
        """Return the first entry of ``payload["results"]["bindings"]``, or ``None``.

        ``None`` is returned when ``payload`` or its ``results`` member is not
        a dict, or when there are no bindings.
        """
        if not isinstance(payload, dict):
            return None
        results = payload.get("results")
        if not isinstance(results, dict):
            return None
        bindings = results.get("bindings", [])
        if not bindings:
            return None
        return bindings[0]
- def _content_to_json(content):
- if not content:
- return None
- first = content[0]
- text = getattr(first, "text", None)
- if not text:
- return None
- try:
- return json.loads(text)
- except Exception:
- return None
- def _build_sparql_query(literal: str) -> str:
- esc = literal.replace("\\", "\\\\").replace("\"", "\\\"")
- return f"""
- PREFIX atlas: <{PREFIX_ATLAS}>
- SELECT ?entity ?label ?type ?mid WHERE {{
- GRAPH <{ATLAS_GRAPH_IRI}> {{
- ?entity atlas:canonicalLabel ?label .
- OPTIONAL {{ ?entity atlas:entityType ?type. }}
- OPTIONAL {{
- ?entity atlas:hasExternalIdentifier ?identifier .
- ?identifier atlas:identifierType "mid" .
- ?identifier atlas:identifierValue ?mid .
- }}
- }}
- FILTER(LCASE(?label) = \"{esc}\")
- }}
- LIMIT 1
- """
def _entity_from_binding(binding: dict) -> AtlasEntity:
    """Convert one SPARQL result row into an ``AtlasEntity``.

    Reads the ``value`` member of the ``label``, ``entity``, ``type`` and
    ``mid`` variables of ``binding``. A missing entity URI is replaced by an
    ``atlas:<slug>`` id derived from the label, a missing type by
    ``"unknown"``, and a ``mid`` identifier is attached only when present.
    The original row is preserved under ``raw_payload``.
    """

    def _value(variable, fallback=None):
        # Each bound variable is a mapping whose "value" member holds the term.
        return binding.get(variable, {}).get("value", fallback)

    name = _value("label", "")
    uri = _value("entity", "")
    kind = _value("type", "unknown")
    freebase_mid = _value("mid")

    known_ids = (
        [AtlasIdentifier(value=freebase_mid, source="virtuoso", identifier_type="mid")]
        if freebase_mid
        else []
    )
    origin = [
        AtlasProvenance(
            source="virtuoso-cache",
            retrieval_method="sparql",
            confidence=0.95,
        )
    ]

    display_name = name or uri
    if uri:
        atlas_id = uri
    else:
        atlas_id = f"atlas:{name.strip().lower().replace(' ', '-')}"

    return AtlasEntity(
        atlas_id=atlas_id,
        canonical_label=display_name,
        entity_type=kind or "unknown",
        aliases=[AtlasAlias(label=display_name)],
        identifiers=known_ids,
        provenance=origin,
        raw_payload={"source": "virtuoso", "binding": binding},
    )
|