"""Virtuoso MCP bridge for cached entity lookups."""

from __future__ import annotations

import json
import os
from collections import OrderedDict
from typing import Any, Optional

from mcp import ClientSession
from mcp.client.sse import sse_client

from app.models import AtlasAlias, AtlasEntity, AtlasIdentifier, AtlasProvenance

VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
VIRTUOSO_MCP_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_TIMEOUT", "10"))
VIRTUOSO_MCP_SSE_READ_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_SSE_READ_TIMEOUT", str(60 * 5)))
ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
PREFIX_ATLAS = os.getenv("ATLAS_PREFIX_IRI", "http://world.eu.org/atlas_ontology#")


class VirtuosoEntityStore:
    """Resolve entity labels against Virtuoso over MCP, with an in-process LRU cache."""

    def __init__(self, max_cache_entries: int = 256):
        self.max_cache_entries = max_cache_entries
        # Insertion order doubles as recency order: the least recently used entry is first.
        self._cache: OrderedDict[str, AtlasEntity] = OrderedDict()

    def _cache_key(self, token: str) -> str:
        # Normalize so "Paris", " paris " and "PARIS" share one cache slot.
        return str(token or "").strip().lower()

    def _cache_get(self, token: str) -> Optional[AtlasEntity]:
        key = self._cache_key(token)
        if not key:
            return None
        hit = self._cache.get(key)
        if hit is not None:
            # Mark the entry as most recently used.
            self._cache.move_to_end(key)
        return hit

    def _cache_set(self, token: str, entity: AtlasEntity) -> None:
        key = self._cache_key(token)
        if not key:
            return
        self._cache[key] = entity
        self._cache.move_to_end(key)
        # Evict least recently used entries until the cache is within its bound.
        while len(self._cache) > self.max_cache_entries:
            self._cache.popitem(last=False)

    async def lookup(self, token: str) -> Optional[AtlasEntity]:
        cached = self._cache_get(token)
        if cached is not None:
            return cached
        entity = await self._lookup_remote(token)
        # Only hits are cached; a miss is retried against Virtuoso on the next call.
        if entity is not None:
            self._cache_set(token, entity)
        return entity

    async def _lookup_remote(self, token: str) -> Optional[AtlasEntity]:
        # Reuse the cache normalization; unlike token.strip(), this also tolerates None.
        literal = self._cache_key(token)
        if not literal or not VIRTUOSO_MCP_SSE_URL:
            return None
        query = _build_sparql_query(literal)
        try:
            # Every remote lookup opens a fresh SSE connection and MCP session.
            async with sse_client(
                VIRTUOSO_MCP_SSE_URL,
                timeout=VIRTUOSO_MCP_TIMEOUT,
                sse_read_timeout=VIRTUOSO_MCP_SSE_READ_TIMEOUT,
            ) as (read_stream, write_stream):
                async with ClientSession(read_stream, write_stream) as session:
                    await session.initialize()
                    result = await session.call_tool("sparql_query", {"query": query})
                    if result.isError:
                        return None
                    payload = result.structuredContent or _content_to_json(result.content)
                    if not isinstance(payload, dict):
                        return None
                    results = payload.get("results")
                    bindings = results.get("bindings") if isinstance(results, dict) else None
                    if not isinstance(bindings, list) or not bindings:
                        return None
                    return _entity_from_binding(bindings[0])
        except Exception:
            # Transport, protocol and parsing failures are all treated as a miss.
            return None


def _content_to_json(content: list | None) -> Any:
    """Parse the first text block of an MCP tool result as JSON, or return None."""
    if not content:
        return None
    text = getattr(content[0], "text", None)
    if not text:
        return None
    try:
        return json.loads(text)
    except (TypeError, ValueError):
        return None


def _build_sparql_query(literal: str) -> str:
    # Escape the characters that may not appear raw inside a double-quoted SPARQL
    # string. Backslashes go first so the escapes added afterwards are not doubled.
    esc = (
        literal.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    # STR() drops any language tag, so a label such as "Paris"@en still matches.
    return f"""
PREFIX atlas: <{PREFIX_ATLAS}>
SELECT ?entity ?label ?type ?mid
WHERE {{
  GRAPH <{ATLAS_GRAPH_IRI}> {{
    ?entity atlas:canonicalLabel ?label .
    OPTIONAL {{ ?entity atlas:entityType ?type . }}
    OPTIONAL {{
      ?entity atlas:hasExternalIdentifier ?identifier .
      ?identifier atlas:identifierType "mid" .
      ?identifier atlas:identifierValue ?mid .
    }}
  }}
  FILTER(LCASE(STR(?label)) = "{esc}")
}}
LIMIT 1
"""


def _entity_from_binding(binding: dict) -> AtlasEntity:
    # Unbound OPTIONAL variables are omitted from a SPARQL JSON binding, hence .get(..., {}).
    label = binding.get("label", {}).get("value", "")
    entity_uri = binding.get("entity", {}).get("value", "")
    entity_type = binding.get("type", {}).get("value", "unknown")
    mid = binding.get("mid", {}).get("value")

    identifiers = []
    if mid:
        identifiers.append(AtlasIdentifier(value=mid, source="virtuoso", identifier_type="mid"))

    provenance = [
        AtlasProvenance(
            source="virtuoso-cache",
            retrieval_method="sparql",
            confidence=0.95,
        )
    ]
    return AtlasEntity(
        # Fall back to a slug of the label when the entity IRI is missing.
        atlas_id=entity_uri or f"atlas:{label.strip().lower().replace(' ', '-')}",
        canonical_label=label or entity_uri,
        entity_type=entity_type or "unknown",
        aliases=[AtlasAlias(label=label or entity_uri)],
        identifiers=identifiers,
        provenance=provenance,
        raw_payload={"source": "virtuoso", "binding": binding},
    )
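

# Sketch (assumption): the `sparql_query` tool is assumed to return the W3C
# SPARQL 1.1 Query Results JSON format, which is what the parsing in
# `_lookup_remote` expects. This module does not confirm that the Virtuoso MCP
# server returns exactly this shape. For the query built above, a response
# would look roughly like this (placeholder values):
#
#   {
#     "head": {"vars": ["entity", "label", "type", "mid"]},
#     "results": {"bindings": [
#       {"entity": {"type": "uri", "value": "<entity IRI>"},
#        "label":  {"type": "literal", "value": "<canonical label>"}}
#     ]}
#   }
#
# `_first_binding` is a hypothetical helper, not part of the MCP SDK. It shows
# the defensive "first solution or None" extraction in isolation and could
# stand in for the inline checks in `_lookup_remote`.


def _first_binding(payload: object) -> dict | None:
    """Return the first solution of a SPARQL JSON result, or None if there is none."""
    if not isinstance(payload, dict):
        return None
    results = payload.get("results")
    if not isinstance(results, dict):
        return None
    bindings = results.get("bindings")
    if not isinstance(bindings, list) or not bindings:
        return None
    first = bindings[0]
    # Each solution maps variable names to {"type": ..., "value": ...} objects.
    return first if isinstance(first, dict) else None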