from __future__ import annotations

import hashlib
import os
from dataclasses import dataclass
from typing import Any

from .atlas_model import CurateFlag, Entity, Identifier
from .atlas_store import _sparql_select, _sparql_update
from .wikidata import WikidataSearch

# Namespace for Atlas ontology terms (classes and properties).
ATLAS = "http://world.eu.org/atlas_ontology#"
# Namespace for Atlas instance data; also used as the named graph for inserts.
ATLAS_D = "http://world.eu.org/atlas_data#"
# XML Schema datatypes namespace; required for the ``xsd:boolean`` typed literal.
XSD = "http://www.w3.org/2001/XMLSchema#"

DEFAULT_ENDPOINT = os.getenv(
    "ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse"
)
# NOTE(review): this reads the same environment variable as DEFAULT_ENDPOINT,
# so the update endpoint can never differ from the query endpoint. If a
# separate update endpoint was intended it needs its own variable — confirm.
DEFAULT_UPDATE_ENDPOINT = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", DEFAULT_ENDPOINT)


def _hash_id(subject: str) -> str:
    """Return the first 16 hex chars of SHA-1 over the stripped, lowercased *subject*."""
    return hashlib.sha1(subject.strip().lower().encode("utf-8")).hexdigest()[:16]


def _entity_iri(atlas_id: str) -> str:
    """Return the angle-bracketed data-namespace IRI for an Atlas entity id."""
    return f"<{ATLAS_D}entity_{atlas_id}>"


def _identifier_iri(ident: Identifier) -> str:
    """Return the angle-bracketed data-namespace IRI for an identifier node."""
    return f"<{ATLAS_D}ident_{ident.scheme}_{_hash_id(ident.value)}>"


def _escape_literal(value: str) -> str:
    """Escape *value* for use inside a double-quoted SPARQL/Turtle string literal.

    Backslashes are escaped first so the escapes added for the other characters
    are not doubled. Raw LF and CR are not allowed inside a short ``"..."``
    literal, so both are converted to escape sequences.
    """
    return (
        value.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def _label_lookup_query(subject: str) -> str:
    """Build a SELECT that finds an entity whose canonical label or alias
    equals *subject*, case-insensitively.

    The needle literal is inlined into each FILTER rather than bound with an
    outer VALUES clause: FILTERs inside nested UNION groups are evaluated
    before the outer join, so an outer binding would not be visible to them.
    """
    safe = _escape_literal(subject)
    return f"""
PREFIX atlas: <{ATLAS}>
SELECT ?atlasId ?label ?type ?qid ?alias WHERE {{
  {{
    ?entity a atlas:Entity ;
            atlas:atlasId ?atlasId ;
            atlas:canonicalLabel ?label .
    OPTIONAL {{ ?entity atlas:hasCanonicalType ?type . }}
    OPTIONAL {{
      ?entity atlas:hasIdentifier ?ident .
      ?ident atlas:scheme "wikidata-qid" ;
             atlas:value ?qid .
    }}
    FILTER(LCASE(STR(?label)) = LCASE("{safe}"))
  }}
  UNION
  {{
    ?entity a atlas:Entity ;
            atlas:atlasId ?atlasId ;
            atlas:aliasLabel ?alias .
    OPTIONAL {{ ?entity atlas:canonicalLabel ?label . }}
    OPTIONAL {{ ?entity atlas:hasCanonicalType ?type . }}
    OPTIONAL {{
      ?entity atlas:hasIdentifier ?ident .
      ?ident atlas:scheme "wikidata-qid" ;
             atlas:value ?qid .
    }}
    FILTER(LCASE(STR(?alias)) = LCASE("{safe}"))
  }}
}}
LIMIT 1
""".strip()


async def _wikidata_lookup(subject: str) -> dict[str, Any] | None:
    """Return the first Wikidata search result for *subject*, or None if there is none."""
    search = WikidataSearch({"search": subject, "limit": 1})
    result = await search.search()
    items = result.get("results", [])
    return items[0] if items else None


def _entity_from_wikidata(subject: str, wd: dict[str, Any]) -> Entity:
    """Build a new Atlas ``Entity`` from a Wikidata search hit.

    The Atlas id is derived from *subject* (not from the QID). *subject* is
    kept as an alias when it differs case-insensitively from the Wikidata label.
    """
    atlas_id = _hash_id(subject)
    label = wd.get("label") or subject
    description = wd.get("description")
    qid = wd.get("id")
    entity_type = wd.get("type") or "Thing"
    # Prefix with ``atlas:`` unless the value already carries it.
    if not entity_type.startswith("atlas:"):
        entity_type = f"atlas:{entity_type}"
    return Entity(
        id=atlas_id,
        label=label,
        description=description,
        type=entity_type,
        aliases=[subject] if subject.lower() != label.lower() else [],
        identifiers=[Identifier(scheme="wikidata-qid", value=qid)] if qid else [],
        needs_curation=False,
    )


def _entity_to_turtle(entity: Entity) -> str:
    """Serialize *entity* and its identifier nodes as Turtle triples.

    The output uses the ``atlas:`` and ``xsd:`` prefixes without declaring
    them; the enclosing document must declare both (see ``_persist_entity``).
    """
    lines = [
        _entity_iri(entity.id),
        "    a atlas:Entity ;",
        f'    atlas:atlasId "{_escape_literal(entity.id)}" ;',
        f'    atlas:canonicalLabel "{_escape_literal(entity.label)}"@en ;',
    ]
    if entity.description:
        lines.append(
            f'    atlas:canonicalDescription "{_escape_literal(entity.description)}"@en ;'
        )
    if entity.type:
        # NOTE(review): emitted verbatim as a prefixed name (not escaped); a type
        # containing whitespace or punctuation would make the update invalid.
        lines.append(f"    atlas:hasCanonicalType {entity.type} ;")
    for alias in entity.aliases:
        lines.append(f'    atlas:aliasLabel "{_escape_literal(alias)}"@en ;')
    for ident in entity.identifiers:
        lines.append(f"    atlas:hasIdentifier {_identifier_iri(ident)} ;")
    lines.append(
        f'    atlas:needsCuration "{str(entity.needs_curation).lower()}"^^xsd:boolean .'
    )
    lines.append("")
    # One standalone node per identifier, referenced above via hasIdentifier.
    for ident in entity.identifiers:
        lines.extend(
            [
                _identifier_iri(ident),
                "    a atlas:Identifier ;",
                f'    atlas:scheme "{_escape_literal(ident.scheme)}" ;',
                f'    atlas:value "{_escape_literal(ident.value)}" .',
                "",
            ]
        )
    return "\n".join(lines)


def _flatten_exception_details(exc: BaseException) -> list[str]:
    """Return ``"Type: message"`` for *exc* followed, recursively, by the same
    for each exception in its ``exceptions`` attribute (e.g. an ExceptionGroup).
    """
    parts = [f"{type(exc).__name__}: {exc}"]
    nested = getattr(exc, "exceptions", None)
    if nested:
        for sub in nested:
            parts.extend(_flatten_exception_details(sub))
    return parts


async def _persist_entity(entity: Entity) -> None:
    """Insert *entity* into the ``ATLAS_D`` named graph via SPARQL INSERT DATA."""
    ttl = _entity_to_turtle(entity)
    query = f"""
PREFIX atlas: <{ATLAS}>
PREFIX atlas_data: <{ATLAS_D}>
PREFIX xsd: <{XSD}>
INSERT DATA {{
  GRAPH <{ATLAS_D}> {{
{ttl}
  }}
}}
""".strip()
    await _sparql_update(DEFAULT_UPDATE_ENDPOINT, query)


async def _load_entity(subject: str) -> dict[str, Any] | None:
    """Look up a stored entity by label or alias.

    Returns None when nothing matches; otherwise a dict with keys
    ``atlas_id``, ``label``, ``type``, ``wikidata_id`` and ``alias`` (each
    None when the corresponding variable is unbound in the result row).
    """
    rows = await _sparql_select(DEFAULT_ENDPOINT, _label_lookup_query(subject))
    if not rows:
        return None
    row = rows[0]
    return {
        "atlas_id": row.get("atlasId", {}).get("value"),
        "label": row.get("label", {}).get("value"),
        "type": row.get("type", {}).get("value"),
        "wikidata_id": row.get("qid", {}).get("value"),
        "alias": row.get("alias", {}).get("value"),
    }


@dataclass
class ResolveService:
    """Resolve a free-text subject to an Atlas entity.

    Looks the subject up in the store first; on a miss, searches Wikidata,
    persists a new entity built from the first hit, and returns it. The three
    collaborators are injectable (for example, for tests).
    """

    # async (subject: str) -> dict | None — stored-entity lookup.
    load_entity_fn: Any = _load_entity
    # async (subject: str) -> dict | None — first Wikidata search hit.
    wikidata_lookup_fn: Any = _wikidata_lookup
    # async (entity: Entity) -> None — persists a newly created entity.
    persist_entity_fn: Any = _persist_entity

    async def resolve(
        self,
        *,
        subject: str,
        context: dict[str, Any] | None = None,
        constraints: dict[str, Any] | None = None,
        hints: dict[str, Any] | None = None,
        debug: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Resolve *subject*; never raises.

        ``context``, ``constraints``, ``hints`` and ``debug`` are accepted but
        not used by this implementation.

        Returns a dict whose ``status`` is ``"resolved"`` (with ``atlas_id``,
        ``label``, ``type``, ``wikidata_id``, ``alias``), ``"not_found"`` for a
        blank subject or no Wikidata hit, or ``"error"`` with an
        ``error`` payload describing any exception raised along the way.
        """
        try:
            subject = (subject or "").strip()
            if not subject:
                return {"status": "not_found"}

            stored = await self.load_entity_fn(subject)
            if stored:
                return {
                    "status": "resolved",
                    "atlas_id": stored.get("atlas_id"),
                    "label": stored.get("label"),
                    "type": stored.get("type"),
                    "wikidata_id": stored.get("wikidata_id"),
                    "alias": stored.get("alias") or subject,
                }

            wd = await self.wikidata_lookup_fn(subject)
            if not wd:
                return {"status": "not_found"}

            entity = _entity_from_wikidata(subject, wd)
            await self.persist_entity_fn(entity)
            return {
                "status": "resolved",
                "atlas_id": entity.id,
                "label": entity.label,
                "type": entity.type,
                "wikidata_id": wd.get("id"),
                "alias": subject,
            }
        except Exception as exc:
            # Service boundary: report failures (including nested exception
            # groups) to the caller as data instead of propagating them.
            detail = " | ".join(_flatten_exception_details(exc))
            return {
                "status": "error",
                "error": {"code": "RESOLVE_FAILED", "message": detail},
            }