| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- from __future__ import annotations
- import hashlib
- import os
- import logging
- from dataclasses import dataclass
- from typing import Any
- from .atlas_model import Entity, Identifier
- from .atlas_store import load_entity_by_subject, save_entity_minimal
- from .wikidata import WikidataSearch
# IRI prefix of the Atlas ontology.
ATLAS = "http://world.eu.org/atlas_ontology#"

# Endpoint handed to the entity loader; overridable through the environment.
DEFAULT_ENDPOINT = os.getenv(
    "ATLAS_VIRTUOSO_MCP_SSE_URL",
    "http://192.168.0.249:8501/mcp/sse",
)

# Endpoint handed to the entity writer.
# NOTE(review): this reads the same environment variable as DEFAULT_ENDPOINT,
# so both values are always equal -- confirm whether a dedicated update
# variable was intended.
DEFAULT_UPDATE_ENDPOINT = os.getenv(
    "ATLAS_VIRTUOSO_MCP_SSE_URL",
    DEFAULT_ENDPOINT,
)

# Verbose resolve tracing is on when ATLAS_DEBUG_LOGS is 1/true/yes/on
# (case-insensitive).
DEBUG_LOGS = os.getenv("ATLAS_DEBUG_LOGS", "false").lower() in {
    "1",
    "true",
    "yes",
    "on",
}

logger = logging.getLogger(__name__)
def _hash_id(subject: str) -> str:
    """Return a 16-hex-character identifier derived from *subject*.

    The subject is stripped and lowercased before hashing, so inputs that
    differ only in surrounding whitespace or letter case share an identifier.
    """
    normalized = subject.strip().lower()
    digest = hashlib.sha1(normalized.encode("utf-8")).hexdigest()
    return digest[:16]
def _entity_iri(atlas_id: str) -> str:
    """Return the prefixed ``atlas_data:entity_<id>`` name for *atlas_id*."""
    return "atlas_data:entity_{}".format(atlas_id)
async def _wikidata_lookup(subject: str) -> dict[str, Any] | None:
    """Run a one-result ``WikidataSearch`` for *subject*.

    Returns:
        The first entry of the response's ``"results"`` list, or ``None``
        when that list is missing or empty.
    """
    query = WikidataSearch({"search": subject, "limit": 1})
    response = await query.search()
    hits = response.get("results", [])
    if not hits:
        return None
    return hits[0]
def _infer_atlas_type(label: str | None, description: str | None) -> str:
    """Guess an Atlas type from keywords in the label and description.

    Matching is a case-insensitive substring test. Rules are tried in order
    (person, location, organization) and the first rule with any matching
    keyword wins; with no match the result is ``"atlas:Other"``.
    """
    haystack = "{} {}".format(label or "", description or "").lower()

    # Order matters: earlier rules take precedence over later ones.
    rules = (
        (
            "atlas:Person",
            ("president", "person", "singer", "composer", "human", "actor", "writer"),
        ),
        (
            "atlas:Location",
            ("city", "town", "village", "country", "state", "location", "place"),
        ),
        (
            "atlas:Organization",
            ("company", "organization", "organisation", "institution", "foundation", "band"),
        ),
    )
    for atlas_type, keywords in rules:
        for keyword in keywords:
            if keyword in haystack:
                return atlas_type
    return "atlas:Other"
def _entity_from_wikidata(subject: str, wd: dict[str, Any]) -> Entity:
    """Build an ``Entity`` flagged for curation from one Wikidata result.

    Args:
        subject: The text the caller asked to resolve; it seeds the entity id
            and serves as the label when *wd* has none.
        wd: Result dict; the keys ``"label"``, ``"description"`` and ``"id"``
            are read, all optional.

    Returns:
        A new ``Entity`` with ``needs_curation=True``.
    """
    atlas_id = _hash_id(subject)
    display_label = wd.get("label") or subject
    summary = wd.get("description")
    qid = wd.get("id")
    inferred_type = _infer_atlas_type(display_label, summary)

    # Keep the caller's wording as an alias only when it differs from the
    # label (ignoring case).
    if subject.lower() != display_label.lower():
        aliases = [subject]
    else:
        aliases = []

    # Record the Wikidata id only when the result carries one.
    identifiers = []
    if qid:
        identifiers.append(Identifier(scheme="wikidata-qid", value=qid))

    return Entity(
        id=atlas_id,
        label=display_label,
        description=summary,
        type=inferred_type,
        aliases=aliases,
        identifiers=identifiers,
        needs_curation=True,
    )
def _flatten_exception_details(exc: BaseException) -> list[str]:
    """Return ``"TypeName: message"`` lines for *exc* and its sub-exceptions.

    Any exception exposing a non-empty ``exceptions`` attribute is expanded
    recursively, depth-first, with the parent listed before its children.
    """
    details = ["{}: {}".format(type(exc).__name__, exc)]
    for child in getattr(exc, "exceptions", None) or ():
        details += _flatten_exception_details(child)
    return details
async def _persist_entity(entity: Entity) -> None:
    """Save *entity* via ``save_entity_minimal`` at the default update endpoint."""
    endpoint = DEFAULT_UPDATE_ENDPOINT
    await save_entity_minimal(entity, endpoint)
async def _load_entity(subject: str) -> dict[str, Any] | None:
    """Return what ``load_entity_by_subject`` yields for *subject* at the default endpoint."""
    endpoint = DEFAULT_ENDPOINT
    record = await load_entity_by_subject(subject, endpoint)
    return record
@dataclass
class ResolveService:
    """Resolve a free-text subject to an Atlas entity.

    The store is consulted first; on a miss, Wikidata is queried and a hit is
    turned into a new entity, persisted, and returned. The three collaborators
    are plain callables so they can be replaced (for example in tests).
    """

    # Awaitable called with the subject; a truthy result is treated as a dict.
    load_entity_fn: Any = _load_entity
    # Awaitable called with the subject; a truthy result is treated as a dict.
    wikidata_lookup_fn: Any = _wikidata_lookup
    # Awaitable called with the newly built entity.
    persist_entity_fn: Any = _persist_entity

    async def resolve(
        self,
        *,
        subject: str,
        context: dict[str, Any] | None = None,
        constraints: dict[str, Any] | None = None,
        hints: dict[str, Any] | None = None,
        debug: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Resolve *subject* and report the outcome as a status dict.

        Args:
            subject: Text to resolve; surrounding whitespace is ignored.
            context: Accepted but not used by this implementation.
            constraints: Accepted but not used by this implementation.
            hints: Accepted but not used by this implementation.
            debug: Accepted but not used by this implementation.

        Returns:
            One of:

            * ``{"status": "resolved", "atlas_id", "label", "type",
              "wikidata_id", "alias"}`` on a store or Wikidata hit;
            * ``{"status": "not_found"}`` for an empty subject or when neither
              source has a match;
            * ``{"status": "error", "error": {"code": "RESOLVE_FAILED",
              "message": ...}}`` when any step raises ``Exception``.

        This method does not propagate ``Exception`` subclasses; they are
        logged and reported through the ``"error"`` status instead.
        """
        try:
            subject = (subject or "").strip()
            if not subject:
                return {"status": "not_found"}

            if DEBUG_LOGS:
                logger.info("resolve start subject=%s", subject)

            stored = await self.load_entity_fn(subject)
            if stored:
                if DEBUG_LOGS:
                    logger.info(
                        "store hit subject=%s atlas_id=%s",
                        subject,
                        stored.get("atlas_id"),
                    )
                return {
                    "status": "resolved",
                    "atlas_id": stored.get("atlas_id"),
                    "label": stored.get("label"),
                    "type": stored.get("type"),
                    "wikidata_id": stored.get("wikidata_id"),
                    # Fall back to the query text when no alias was stored.
                    "alias": stored.get("alias") or subject,
                }

            wd = await self.wikidata_lookup_fn(subject)
            if not wd:
                if DEBUG_LOGS:
                    logger.info("wikidata miss subject=%s", subject)
                return {"status": "not_found"}

            entity = _entity_from_wikidata(subject, wd)
            if DEBUG_LOGS:
                logger.info(
                    "wikidata hit subject=%s qid=%s atlas_id=%s type=%s",
                    subject,
                    wd.get("id"),
                    entity.id,
                    entity.type,
                )

            # Persist before answering: a failed save surfaces as an error
            # result rather than a "resolved" one.
            await self.persist_entity_fn(entity)
            return {
                "status": "resolved",
                "atlas_id": entity.id,
                "label": entity.label,
                "type": entity.type,
                "wikidata_id": wd.get("id"),
                "alias": subject,
            }
        except Exception as exc:
            # The caller only receives flattened message text, so record the
            # full traceback here before it is discarded.
            logger.exception("resolve failed subject=%s", subject)
            detail = " | ".join(_flatten_exception_details(exc))
            return {
                "status": "error",
                "error": {"code": "RESOLVE_FAILED", "message": detail},
            }
|