| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- """Atlas semantic core for entity resolution and enrichment."""
- from __future__ import annotations
- from app.cache import EntityCache
- from app.entity_normalize import normalize_entity
- from app.models import (
- AtlasAlias,
- AtlasEntity,
- AtlasEnrichmentDataset,
- AtlasIdentifier,
- AtlasProvenance,
- )
- from app.trends_resolution import resolve_entity_via_trends
- from app.type_classifier import TypeClassification, classify_entity_type
- from app.storage_service import AtlasStorageService
- from app.virtuoso_store import VirtuosoEntityStore
- from app.wikidata_lookup import lookup_wikidata
- _entity_cache = EntityCache(max_entries=512)
- _virtuoso_store = VirtuosoEntityStore(max_cache_entries=256)
- _storage = AtlasStorageService()
- async def resolve_entity(subject: str, context: str | None = None) -> AtlasEntity:
- normalized = normalize_entity(subject)
- token = normalized.strip().lower()
- cached = _entity_cache.get(token)
- if cached is not None:
- try:
- await _storage.write_entity(cached)
- except Exception:
- pass
- return cached
- virt_hit = await _virtuoso_store.lookup(token)
- if virt_hit is not None:
- # Make the returned raw payload reflect the original caller input
- # (so tests and UI/debug output stay stable).
- if isinstance(virt_hit.raw_payload, dict):
- virt_hit.raw_payload.setdefault("source", "virtuoso")
- virt_hit.raw_payload["raw"] = subject
- virt_hit.raw_payload["normalized"] = normalized
- _entity_cache.store(virt_hit, extra_tokens=[subject, normalized])
- try:
- await _storage.write_entity(virt_hit)
- except Exception:
- pass
- return virt_hit
- resolution = resolve_entity_via_trends(subject)
- classification = await classify_entity_type(subject, resolution, context)
- wikidata = await lookup_wikidata(subject)
- entity = _entity_from_resolution(subject, resolution, classification, wikidata)
- _entity_cache.store(entity, extra_tokens=[subject, normalized])
- try:
- await _storage.write_entity(entity)
- except Exception:
- pass
- return entity
- def _entity_from_resolution(subject: str, resolution: dict, classification: TypeClassification, wikidata: dict | None = None) -> AtlasEntity:
- canonical_label = (
- resolution.get("canonical_label")
- or resolution.get("normalized")
- or subject.strip()
- )
- atlas_id = resolution.get("mid")
- if atlas_id:
- atlas_id = f"atlas:mid:{atlas_id.strip()}"
- else:
- slug = canonical_label.strip().lower().replace(" ", "-") or "entity"
- atlas_id = f"atlas:{slug}"
- identifiers = []
- mid = resolution.get("mid")
- if mid:
- identifiers.append(
- AtlasIdentifier(value=mid, source="google", identifier_type="mid")
- )
- if wikidata and wikidata.get("qid"):
- identifiers.append(
- AtlasIdentifier(value=wikidata["qid"], source="wikidata", identifier_type="qid")
- )
- provenance = [
- AtlasProvenance(
- source=resolution.get("source") or "resolver",
- retrieval_method="trends-resolution",
- confidence=0.9 if resolution.get("mid") else 0.3,
- retrieved_at=resolution.get("resolved_at"),
- )
- ]
- if classification.provenance:
- provenance.append(classification.provenance)
- if wikidata and wikidata.get("qid"):
- provenance.append(
- AtlasProvenance(
- source="wikidata",
- retrieval_method="wbsearchentities + entitydata",
- confidence=0.99,
- retrieved_at=wikidata.get("retrieved_at"),
- )
- )
- canonical_type = (
- classification.canonical_type
- or resolution.get("type")
- or "unknown"
- )
- payload = dict(resolution)
- if wikidata:
- payload["wikidata"] = {
- "status": "ok",
- "qid": wikidata.get("qid"),
- "label": wikidata.get("label"),
- "description": wikidata.get("description"),
- "retrieved_at": wikidata.get("retrieved_at"),
- }
- else:
- payload["wikidata"] = {"status": "missing"}
- return AtlasEntity(
- atlas_id=atlas_id,
- canonical_label=canonical_label,
- canonical_description=(wikidata or {}).get("description"),
- entity_type=canonical_type,
- aliases=[AtlasAlias(label=subject.strip() or canonical_label)],
- identifiers=identifiers,
- provenance=provenance,
- raw_payload=payload,
- needs_curation=classification.needs_curation,
- )
- def enrich_entity(entity: AtlasEntity, constraints=None, depth: int = 1) -> AtlasEnrichmentDataset:
- return AtlasEnrichmentDataset(
- seed_entity=entity,
- related_entities=[],
- query_context=constraints or {},
- depth=depth,
- )
|