| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- from __future__ import annotations
- import hashlib
- import os
- from dataclasses import dataclass
- from typing import Any
- from .atlas_model import CurateFlag, Entity, Identifier
- from .atlas_store import _sparql_select, _sparql_update
- from .wikidata import WikidataSearch
# Namespace IRIs: ATLAS prefixes the predicates/classes used in queries,
# ATLAS_D prefixes generated entity/identifier IRIs and names the data graph.
ATLAS = "http://world.eu.org/atlas_ontology#"
ATLAS_D = "http://world.eu.org/atlas_data#"

# Endpoint used for SELECT queries; overridable through the environment.
DEFAULT_ENDPOINT = os.environ.get(
    "ATLAS_VIRTUOSO_MCP_SSE_URL",
    "http://192.168.0.249:8501/mcp/sse",
)

# Endpoint used for updates.
# NOTE(review): this reads the same environment variable as DEFAULT_ENDPOINT,
# so the two constants are always equal — confirm whether a separate
# variable was intended for the update endpoint.
DEFAULT_UPDATE_ENDPOINT = os.environ.get(
    "ATLAS_VIRTUOSO_MCP_SSE_URL",
    DEFAULT_ENDPOINT,
)
def _hash_id(subject: str) -> str:
    """Return a 16-character hexadecimal identifier derived from *subject*.

    The text is stripped and lower-cased before hashing, so inputs that differ
    only in surrounding whitespace or letter case map to the same identifier.
    The result is the first 16 hex digits of the SHA-1 digest of the UTF-8
    encoded, normalized text.
    """
    normalized = subject.strip().lower()
    digest = hashlib.sha1(normalized.encode("utf-8"))
    return digest.hexdigest()[:16]
def _entity_iri(atlas_id: str) -> str:
    """Return the angle-bracketed IRI ``<{ATLAS_D}entity_{atlas_id}>``."""
    return "<{}entity_{}>".format(ATLAS_D, atlas_id)
# Characters that must not appear raw inside a double-quoted SPARQL/Turtle
# short string literal, mapped to their escape sequences. Tab is legal raw but
# is escaped too so the generated query stays on predictable lines.
_LITERAL_ESCAPES = str.maketrans(
    {
        "\\": "\\\\",
        '"': '\\"',
        "\n": "\\n",
        "\r": "\\r",
        "\t": "\\t",
    }
)


def _escape_literal(value: str) -> str:
    """Escape *value* for embedding between double quotes in SPARQL/Turtle.

    Backslash, double quote, line feed, carriage return and tab are replaced
    by their backslash escape sequences. The replacement happens in a single
    pass, so the backslashes introduced by one escape are never re-escaped.

    Args:
        value: Raw text to embed in a ``"..."`` literal.

    Returns:
        The escaped text, without the surrounding quotes.
    """
    return value.translate(_LITERAL_ESCAPES)
def _label_lookup_query(subject: str) -> str:
    """Build the SPARQL SELECT that looks an entity up by label or alias.

    The query binds the escaped *subject* to ``?needle`` and unions two
    patterns: entities whose ``atlas:canonicalLabel`` equals the needle, and
    entities with an ``atlas:aliasLabel`` equal to it. Both comparisons are
    made on lower-cased values. Canonical type and the ``wikidata-qid``
    identifier are optional in both branches; the canonical label is optional
    in the alias branch. At most one row is requested.

    Args:
        subject: Text to search for; it is escaped before interpolation.

    Returns:
        The query text with leading and trailing whitespace stripped.
    """
    escaped = _escape_literal(subject)
    query = f"""
PREFIX atlas: <{ATLAS}>
SELECT ?atlasId ?label ?type ?qid ?alias WHERE {{
  VALUES ?needle {{ "{escaped}" }}
  {{
    ?entity a atlas:Entity ;
      atlas:atlasId ?atlasId ;
      atlas:canonicalLabel ?label .
    OPTIONAL {{ ?entity atlas:hasCanonicalType ?type . }}
    OPTIONAL {{ ?entity atlas:hasIdentifier ?ident . ?ident atlas:scheme "wikidata-qid" ; atlas:value ?qid . }}
    FILTER(LCASE(STR(?label)) = LCASE(?needle))
  }}
  UNION
  {{
    ?entity a atlas:Entity ;
      atlas:atlasId ?atlasId ;
      atlas:aliasLabel ?alias .
    OPTIONAL {{ ?entity atlas:canonicalLabel ?label . }}
    OPTIONAL {{ ?entity atlas:hasCanonicalType ?type . }}
    OPTIONAL {{ ?entity atlas:hasIdentifier ?ident . ?ident atlas:scheme "wikidata-qid" ; atlas:value ?qid . }}
    FILTER(LCASE(STR(?alias)) = LCASE(?needle))
  }}
}}
LIMIT 1
"""
    return query.strip()
async def _wikidata_lookup(subject: str) -> dict[str, Any] | None:
    """Search Wikidata for *subject* and return the first hit, if any.

    A ``WikidataSearch`` is created with a limit of one and awaited; the first
    element of the response's ``"results"`` entry is returned, or ``None``
    when that entry is missing or empty.
    """
    response = await WikidataSearch({"search": subject, "limit": 1}).search()
    hits = response.get("results", [])
    if not hits:
        return None
    return hits[0]
def _entity_from_wikidata(subject: str, wd: dict[str, Any]) -> Entity:
    """Create an ``Entity`` from a Wikidata search hit for *subject*.

    Args:
        subject: The text that was searched for; its hash becomes the entity
            id, and it is kept as an alias when it differs from the label
            (compared case-insensitively).
        wd: The hit mapping; the ``"label"``, ``"description"``, ``"id"`` and
            ``"type"`` keys are read.

    Returns:
        An entity that is not flagged for curation. The label falls back to
        *subject*, the type falls back to ``"Thing"`` and is given an
        ``atlas:`` prefix unless it already has one, and a ``wikidata-qid``
        identifier is attached when the hit has an ``"id"``.
    """
    atlas_id = _hash_id(subject)
    label = wd.get("label") or subject
    qid = wd.get("id")

    raw_type = wd.get("type") or "Thing"
    if raw_type.startswith("atlas:"):
        canonical_type = raw_type
    else:
        canonical_type = f"atlas:{raw_type}"

    # Only record the search text as an alias when it is not the label itself.
    aliases = []
    if subject.lower() != label.lower():
        aliases.append(subject)

    identifiers = []
    if qid:
        identifiers.append(Identifier(scheme="wikidata-qid", value=qid))

    return Entity(
        id=atlas_id,
        label=label,
        description=wd.get("description"),
        type=canonical_type,
        aliases=aliases,
        identifiers=identifiers,
        needs_curation=False,
    )
def _entity_to_turtle(entity: Entity) -> str:
    """Serialize *entity* and its identifiers as Turtle-style triples.

    The entity subject comes first with its id, English-tagged label, optional
    description, optional canonical type (written verbatim), aliases, links to
    its identifier nodes and the ``needsCuration`` flag. One node per
    identifier follows, carrying its scheme and value. The ``atlas:`` and
    ``xsd:`` prefixes are used but not declared here, so the caller has to
    supply them.

    Returns:
        The lines joined with newlines.
    """
    out = [
        f"{_entity_iri(entity.id)}",
        " a atlas:Entity ;",
        f' atlas:atlasId "{_escape_literal(entity.id)}" ;',
        f' atlas:canonicalLabel "{_escape_literal(entity.label)}"@en ;',
    ]
    if entity.description:
        out.append(f' atlas:canonicalDescription "{_escape_literal(entity.description)}"@en ;')
    if entity.type:
        out.append(f" atlas:hasCanonicalType {entity.type} ;")
    out.extend(
        f' atlas:aliasLabel "{_escape_literal(alias)}"@en ;'
        for alias in entity.aliases
    )

    # Identifier node IRIs are built once and reused for both the link from
    # the entity and the identifier node's own subject.
    ident_iris = [
        f"<{ATLAS_D}ident_{ident.scheme}_{_hash_id(ident.value)}>"
        for ident in entity.identifiers
    ]
    out.extend(f" atlas:hasIdentifier {iri} ;" for iri in ident_iris)
    out.append(f' atlas:needsCuration "{str(entity.needs_curation).lower()}"^^xsd:boolean .')
    out.append("")

    for ident, iri in zip(entity.identifiers, ident_iris):
        out.extend(
            [
                f"{iri}",
                " a atlas:Identifier ;",
                f' atlas:scheme "{_escape_literal(ident.scheme)}" ;',
                f' atlas:value "{_escape_literal(ident.value)}" .',
                "",
            ]
        )
    return "\n".join(out)
def _flatten_exception_details(exc: BaseException) -> list[str]:
    """Describe *exc* and every exception nested beneath it.

    Each exception is rendered as ``"<ClassName>: <message>"``. When an
    exception has a non-empty ``exceptions`` attribute, its members are
    described too, depth-first, directly after their parent.

    Returns:
        The descriptions, starting with *exc* itself.
    """
    details = []
    pending = [exc]
    while pending:
        current = pending.pop()
        details.append(f"{type(current).__name__}: {current}")
        children = getattr(current, "exceptions", None)
        if children:
            # Push in reverse so the first child is popped (and reported) first.
            pending.extend(list(children)[::-1])
    return details
async def _persist_entity(entity: Entity) -> None:
    """Insert *entity* into the ``ATLAS_D`` graph through the update endpoint.

    The entity is serialized with ``_entity_to_turtle`` and wrapped in a
    SPARQL ``INSERT DATA`` request that declares the ``atlas``, ``atlas_data``
    and ``xsd`` prefixes; the request is sent to ``DEFAULT_UPDATE_ENDPOINT``.
    """
    triples = _entity_to_turtle(entity)
    update = "\n".join(
        (
            f"PREFIX atlas: <{ATLAS}>",
            f"PREFIX atlas_data: <{ATLAS_D}>",
            "PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>",
            "INSERT DATA {",
            f"GRAPH <{ATLAS_D}> {{",
            triples,
            "}",
            "}",
        )
    )
    await _sparql_update(DEFAULT_UPDATE_ENDPOINT, update)
async def _load_entity(subject: str) -> dict[str, Any] | None:
    """Look *subject* up in the store and return the first match as a dict.

    Runs the label/alias lookup query against ``DEFAULT_ENDPOINT``. Each row
    is read as a mapping from variable name to a mapping holding a ``"value"``
    entry; variables absent from the row yield ``None``.

    Returns:
        ``None`` when no rows come back, otherwise a dict with the keys
        ``atlas_id``, ``label``, ``type``, ``wikidata_id`` and ``alias``.
    """
    rows = await _sparql_select(DEFAULT_ENDPOINT, _label_lookup_query(subject))
    if not rows:
        return None

    first = rows[0]
    # Output key -> SPARQL variable it is read from.
    variable_for = {
        "atlas_id": "atlasId",
        "label": "label",
        "type": "type",
        "wikidata_id": "qid",
        "alias": "alias",
    }
    return {
        key: first.get(variable, {}).get("value")
        for key, variable in variable_for.items()
    }
@dataclass
class ResolveService:
    """Resolve free-text subjects to entities, creating them on demand.

    The three collaborators are injectable callables and default to the
    module-level implementations.
    """

    # Async callable: subject -> stored entity dict, or a falsy value.
    load_entity_fn: Any = _load_entity
    # Async callable: subject -> Wikidata hit dict, or a falsy value.
    wikidata_lookup_fn: Any = _wikidata_lookup
    # Async callable that stores a newly built entity.
    persist_entity_fn: Any = _persist_entity

    async def resolve(
        self,
        *,
        subject: str,
        context: dict[str, Any] | None = None,
        constraints: dict[str, Any] | None = None,
        hints: dict[str, Any] | None = None,
        debug: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Resolve *subject* and report the outcome as a status dict.

        The store is consulted first; on a miss Wikidata is searched, and a
        hit is turned into a new entity that is persisted before returning.
        ``context``, ``constraints``, ``hints`` and ``debug`` are accepted but
        not read by this method.

        Returns:
            ``{"status": "resolved", ...}`` with ``atlas_id``, ``label``,
            ``type``, ``wikidata_id`` and ``alias``;
            ``{"status": "not_found"}`` for a blank subject or when neither
            source has a match; or ``{"status": "error", "error": {...}}``
            with code ``RESOLVE_FAILED`` when any step raises ``Exception``.
        """
        try:
            needle = (subject or "").strip()
            if not needle:
                return {"status": "not_found"}

            known = await self.load_entity_fn(needle)
            if known:
                copied = {
                    key: known.get(key)
                    for key in ("atlas_id", "label", "type", "wikidata_id")
                }
                return {
                    "status": "resolved",
                    **copied,
                    # Fall back to the search text when no alias matched.
                    "alias": known.get("alias") or needle,
                }

            hit = await self.wikidata_lookup_fn(needle)
            if not hit:
                return {"status": "not_found"}

            created = _entity_from_wikidata(needle, hit)
            await self.persist_entity_fn(created)
            return {
                "status": "resolved",
                "atlas_id": created.id,
                "label": created.label,
                "type": created.type,
                "wikidata_id": hit.get("id"),
                "alias": needle,
            }
        except Exception as exc:
            # Boundary handler: failures are reported in the payload rather
            # than raised, including every nested exception's description.
            message = " | ".join(_flatten_exception_details(exc))
            return {
                "status": "error",
                "error": {"code": "RESOLVE_FAILED", "message": message},
            }
|