"""Atlas semantic core for entity resolution and enrichment."""

from __future__ import annotations

import logging
from datetime import datetime, timezone

from app.cache import EntityCache
from app.entity_normalize import normalize_entity
from app.ids import claim_hash, entity_hash
from app.models import (
    AtlasAlias,
    AtlasClaim,
    AtlasClaimObject,
    AtlasEntity,
    AtlasEnrichmentDataset,
    AtlasProvenance,
)
from app.trends_resolution import resolve_entity_via_trends
from app.type_classifier import TypeClassification, classify_entity_type
from app.storage_service import AtlasStorageService
from app.virtuoso_store import VirtuosoEntityStore
from app.wikidata_lookup import lookup_wikidata

_logger = logging.getLogger(__name__)

_entity_cache = EntityCache(max_entries=512)
_virtuoso_store = VirtuosoEntityStore(max_cache_entries=256)
_storage = AtlasStorageService()


def _now_date() -> str:
    """Return today's date in UTC as an ISO-8601 string (YYYY-MM-DD)."""
    return datetime.now(timezone.utc).date().isoformat()


async def _persist_entity(entity: AtlasEntity) -> None:
    """Write *entity* to storage on a best-effort basis.

    A storage failure must not fail resolution, so the exception is logged
    (with traceback) instead of being propagated or silently dropped.
    """
    try:
        await _storage.write_entity(entity)
    except Exception:
        # getattr keeps the logging call itself from raising on odd objects.
        _logger.warning(
            "Failed to persist entity %s",
            getattr(entity, "atlas_id", None),
            exc_info=True,
        )


async def resolve_entity(subject: str, context: str | None = None) -> AtlasEntity:
    """Resolve *subject* to an ``AtlasEntity``.

    Sources are tried in order:

    1. the in-process entity cache, keyed by the stripped, lower-cased
       normalized subject;
    2. the Virtuoso entity store, using the same token;
    3. a fresh resolution built from trends resolution, type classification
       and a Wikidata lookup.

    Whichever source produces the entity, it is written to storage on a
    best-effort basis before being returned; storage errors are logged and
    never raised to the caller.

    Args:
        subject: Raw entity text as supplied by the caller.
        context: Optional context forwarded to the type classifier.

    Returns:
        The resolved entity.
    """
    normalized = normalize_entity(subject)
    token = normalized.strip().lower()

    cached = _entity_cache.get(token)
    if cached is not None:
        await _persist_entity(cached)
        return cached

    virt_hit = await _virtuoso_store.lookup(token)
    if virt_hit is not None:
        # Make the returned raw payload reflect the original caller input
        # (so tests and UI/debug output stay stable).
        if isinstance(virt_hit.raw_payload, dict):
            virt_hit.raw_payload.setdefault("source", "virtuoso")
            virt_hit.raw_payload["raw"] = subject
            virt_hit.raw_payload["normalized"] = normalized
        _entity_cache.store(virt_hit, extra_tokens=[subject, normalized])
        await _persist_entity(virt_hit)
        return virt_hit

    resolution = resolve_entity_via_trends(subject)
    classification = await classify_entity_type(subject, resolution, context)
    wikidata = await lookup_wikidata(subject)
    entity = _entity_from_resolution(subject, resolution, classification, wikidata)
    _entity_cache.store(entity, extra_tokens=[subject, normalized])
    await _persist_entity(entity)
    return entity


def _entity_from_resolution(
    subject: str,
    resolution: dict,
    classification: TypeClassification,
    wikidata: dict | None = None,
) -> AtlasEntity:
    """Assemble an ``AtlasEntity`` from resolver, classifier and Wikidata output.

    Args:
        subject: Raw entity text; used as the fallback label and as the alias.
        resolution: Trends-resolution payload. The keys read here are
            ``canonical_label``, ``normalized``, ``type``, ``mid``, ``source``
            and ``retrieved_at``; the whole dict is copied into the entity's
            raw payload.
        classification: Supplies ``canonical_type``, ``provenance`` and
            ``needs_curation``.
        wikidata: Optional Wikidata lookup result; ``qid``, ``label``,
            ``description`` and ``retrieved_at`` are read when present.

    Returns:
        An entity carrying identifier claims (one per available ``mid`` /
        ``qid``) plus one derived canonical-type claim.
    """
    canonical_label = (
        resolution.get("canonical_label")
        or resolution.get("normalized")
        or subject.strip()
    )
    canonical_type = (
        classification.canonical_type or resolution.get("type") or "unknown"
    )
    mid = resolution.get("mid")
    qid = wikidata.get("qid") if wikidata else None

    # atlas_id is opaque identity: hash-part only, never semantic content.
    identity_hash = entity_hash(
        (mid or "").strip(),
        qid or "",
        canonical_label.strip().lower(),
    )
    atlas_id = f"atlas:{identity_hash}"

    # Stamp every claim of this entity with one date so that a build which
    # straddles UTC midnight cannot yield mixed creation dates.
    created_at = _now_date()

    claims: list[AtlasClaim] = []

    if mid:
        trends_prov = AtlasProvenance(
            source=resolution.get("source") or "resolver",
            retrieval_method="trends-resolution",
            confidence=0.9,
            retrieved_at=resolution.get("retrieved_at"),
        )
        mid_hash = claim_hash(atlas_id, "atlas:hasIdentifier", mid, "raw")
        claims.append(
            AtlasClaim(
                claim_id=f"clm_raw_ident_mid_{mid_hash}",
                subject=atlas_id,
                predicate="atlas:hasIdentifier",
                object=AtlasClaimObject(kind="identifier", id_type="mid", value=mid),
                layer="raw",
                provenance=trends_prov,
                created_at=created_at,
            )
        )

    if qid:
        wikidata_prov = AtlasProvenance(
            source="wikidata",
            retrieval_method="wbsearchentities + entitydata",
            confidence=0.99,
            retrieved_at=wikidata.get("retrieved_at"),
        )
        qid_hash = claim_hash(atlas_id, "atlas:hasIdentifier", qid, "raw")
        claims.append(
            AtlasClaim(
                claim_id=f"clm_raw_ident_qid_{qid_hash}",
                subject=atlas_id,
                predicate="atlas:hasIdentifier",
                object=AtlasClaimObject(kind="identifier", id_type="qid", value=qid),
                layer="raw",
                provenance=wikidata_prov,
                created_at=created_at,
            )
        )

    type_hash = claim_hash(
        atlas_id, "atlas:hasCanonicalType", canonical_type, "derived"
    )
    claims.append(
        AtlasClaim(
            claim_id=f"clm_drv_canonical_type_{type_hash}",
            subject=atlas_id,
            predicate="atlas:hasCanonicalType",
            object=AtlasClaimObject(kind="type", value=f"atlas:{canonical_type}"),
            layer="derived",
            provenance=classification.provenance,
            created_at=created_at,
        )
    )

    # Shallow copy so the caller's resolution dict is not modified.
    payload = dict(resolution)
    if wikidata:
        payload["wikidata"] = {
            "wikidata_status": "hit",
            "source": "wikidata",
            "qid": wikidata.get("qid"),
            "label": wikidata.get("label"),
            "description": wikidata.get("description"),
            "retrieved_at": wikidata.get("retrieved_at"),
        }
    else:
        payload["wikidata"] = {
            "wikidata_status": "missing",
            "source": "wikidata",
            "retrieved_at": None,
        }

    return AtlasEntity(
        atlas_id=atlas_id,
        canonical_label=canonical_label,
        canonical_description=(wikidata or {}).get("description"),
        entity_type=canonical_type,
        aliases=[AtlasAlias(label=subject.strip() or canonical_label)],
        claims=claims,
        raw_payload=payload,
        needs_curation=classification.needs_curation,
    )


def enrich_entity(entity: AtlasEntity, constraints=None, depth: int = 1) -> AtlasEnrichmentDataset:
    """Wrap *entity* in an enrichment dataset with no related entities.

    Args:
        entity: Seed entity for the dataset.
        constraints: Optional query context; any falsy value is replaced by
            an empty dict.
        depth: Stored on the dataset as given.

    Returns:
        A dataset whose ``related_entities`` list is empty.
    """
    return AtlasEnrichmentDataset(
        seed_entity=entity,
        related_entities=[],
        query_context=constraints or {},
        depth=depth,
    )