"""Atlas semantic core for entity resolution and enrichment.""" from __future__ import annotations from app.cache import EntityCache from app.entity_normalize import normalize_entity from app.models import ( AtlasAlias, AtlasEntity, AtlasEnrichmentDataset, AtlasIdentifier, AtlasProvenance, ) from app.trends_resolution import resolve_entity_via_trends from app.type_classifier import TypeClassification, classify_entity_type from app.storage_service import AtlasStorageService from app.virtuoso_store import VirtuosoEntityStore from app.wikidata_lookup import lookup_wikidata _entity_cache = EntityCache(max_entries=512) _virtuoso_store = VirtuosoEntityStore(max_cache_entries=256) _storage = AtlasStorageService() async def resolve_entity(subject: str, context: str | None = None) -> AtlasEntity: normalized = normalize_entity(subject) token = normalized.strip().lower() cached = _entity_cache.get(token) if cached is not None: try: await _storage.write_entity(cached) except Exception: pass return cached virt_hit = await _virtuoso_store.lookup(token) if virt_hit is not None: # Make the returned raw payload reflect the original caller input # (so tests and UI/debug output stay stable). if isinstance(virt_hit.raw_payload, dict): virt_hit.raw_payload.setdefault("source", "virtuoso") virt_hit.raw_payload["raw"] = subject virt_hit.raw_payload["normalized"] = normalized _entity_cache.store(virt_hit, extra_tokens=[subject, normalized]) try: await _storage.write_entity(virt_hit) except Exception: pass return virt_hit resolution = resolve_entity_via_trends(subject) classification = await classify_entity_type(subject, resolution, context) wikidata = await lookup_wikidata(subject) entity = _entity_from_resolution(subject, resolution, classification, wikidata) _entity_cache.store(entity, extra_tokens=[subject, normalized]) try: await _storage.write_entity(entity) except Exception: pass return entity def _entity_from_resolution(subject: str, resolution: dict, classification: TypeClassification, wikidata: dict | None = None) -> AtlasEntity: canonical_label = ( resolution.get("canonical_label") or resolution.get("normalized") or subject.strip() ) atlas_id = resolution.get("mid") if atlas_id: atlas_id = f"atlas:mid:{atlas_id.strip()}" else: slug = canonical_label.strip().lower().replace(" ", "-") or "entity" atlas_id = f"atlas:{slug}" identifiers = [] mid = resolution.get("mid") if mid: identifiers.append( AtlasIdentifier(value=mid, source="google", identifier_type="mid") ) if wikidata and wikidata.get("qid"): identifiers.append( AtlasIdentifier(value=wikidata["qid"], source="wikidata", identifier_type="qid") ) provenance = [ AtlasProvenance( source=resolution.get("source") or "resolver", retrieval_method="trends-resolution", confidence=0.9 if resolution.get("mid") else 0.3, retrieved_at=resolution.get("resolved_at"), ) ] if classification.provenance: provenance.append(classification.provenance) if wikidata and wikidata.get("qid"): provenance.append( AtlasProvenance( source="wikidata", retrieval_method="wbsearchentities + entitydata", confidence=0.99, retrieved_at=wikidata.get("retrieved_at"), ) ) canonical_type = ( classification.canonical_type or resolution.get("type") or "unknown" ) payload = dict(resolution) if wikidata: payload["wikidata"] = { "status": "ok", "qid": wikidata.get("qid"), "label": wikidata.get("label"), "description": wikidata.get("description"), "retrieved_at": wikidata.get("retrieved_at"), } else: payload["wikidata"] = {"status": "missing"} return AtlasEntity( atlas_id=atlas_id, canonical_label=canonical_label, canonical_description=(wikidata or {}).get("description"), entity_type=canonical_type, aliases=[AtlasAlias(label=subject.strip() or canonical_label)], identifiers=identifiers, provenance=provenance, raw_payload=payload, needs_curation=classification.needs_curation, ) def enrich_entity(entity: AtlasEntity, constraints=None, depth: int = 1) -> AtlasEnrichmentDataset: return AtlasEnrichmentDataset( seed_entity=entity, related_entities=[], query_context=constraints or {}, depth=depth, )