| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- """Atlas semantic core for entity resolution and enrichment."""
- from __future__ import annotations
- from app.cache import EntityCache
- from app.entity_normalize import normalize_entity
- from app.models import (
- AtlasAlias,
- AtlasClaim,
- AtlasClaimObject,
- AtlasEntity,
- AtlasEnrichmentDataset,
- AtlasProvenance,
- )
- from app.trends_resolution import resolve_entity_via_trends
- from app.type_classifier import TypeClassification, classify_entity_type
- from app.storage_service import AtlasStorageService
- from app.virtuoso_store import VirtuosoEntityStore
- from app.wikidata_lookup import lookup_wikidata
# Module-level singletons shared by every resolution call in this process.
_entity_cache = EntityCache(max_entries=512)  # bounded in-process cache of resolved entities
_virtuoso_store = VirtuosoEntityStore(max_cache_entries=256)  # Virtuoso lookup with its own bounded cache
_storage = AtlasStorageService()  # persistence sink; writes are best-effort (see resolve_entity)
async def resolve_entity(subject: str, context: str | None = None) -> AtlasEntity:
    """Resolve *subject* to an :class:`AtlasEntity`.

    Resolution order:
      1. in-process entity cache (keyed on the normalized, lowercased subject),
      2. Virtuoso store lookup,
      3. full path: trends resolution -> type classification -> Wikidata lookup.

    Whatever entity is returned is also written to storage on a best-effort
    basis: persistence failures are deliberately swallowed so that resolution
    never fails just because the storage backend is unavailable.

    Args:
        subject: Raw entity string as supplied by the caller.
        context: Optional free-text context forwarded to the type classifier.

    Returns:
        The resolved entity (possibly served from cache or Virtuoso).
    """

    async def _persist_best_effort(entity: AtlasEntity) -> None:
        # Storage is an auxiliary sink; never let its failures break resolution.
        # (Previously this try/except block was duplicated at each return site.)
        try:
            await _storage.write_entity(entity)
        except Exception:
            pass

    normalized = normalize_entity(subject)
    token = normalized.strip().lower()

    cached = _entity_cache.get(token)
    if cached is not None:
        await _persist_best_effort(cached)
        return cached

    virt_hit = await _virtuoso_store.lookup(token)
    if virt_hit is not None:
        # Make the returned raw payload reflect the original caller input
        # (so tests and UI/debug output stay stable).
        if isinstance(virt_hit.raw_payload, dict):
            virt_hit.raw_payload.setdefault("source", "virtuoso")
            virt_hit.raw_payload["raw"] = subject
            virt_hit.raw_payload["normalized"] = normalized
        _entity_cache.store(virt_hit, extra_tokens=[subject, normalized])
        await _persist_best_effort(virt_hit)
        return virt_hit

    # Full resolution path: trends -> type classification -> Wikidata.
    resolution = resolve_entity_via_trends(subject)
    classification = await classify_entity_type(subject, resolution, context)
    wikidata = await lookup_wikidata(subject)
    entity = _entity_from_resolution(subject, resolution, classification, wikidata)
    _entity_cache.store(entity, extra_tokens=[subject, normalized])
    await _persist_best_effort(entity)
    return entity
def _entity_from_resolution(subject: str, resolution: dict, classification: TypeClassification, wikidata: dict | None = None) -> AtlasEntity:
    """Assemble an AtlasEntity from resolver output, classification and Wikidata data.

    Args:
        subject: Raw entity string from the caller; used for the alias and,
            stripped, as the label of last resort.
        resolution: Trends-resolution payload (may carry ``mid``, ``source``,
            ``canonical_label``, ``normalized``, ``type``, ``retrieved_at``).
        classification: Type-classifier result supplying the canonical type,
            its provenance, and the curation flag.
        wikidata: Optional Wikidata payload (``qid``, ``label``,
            ``description``, ``retrieved_at``); ``None`` when lookup failed.

    Returns:
        A fully populated AtlasEntity with identifier and type claims.
    """
    import hashlib

    wd = wikidata or {}
    label = resolution.get("canonical_label") or resolution.get("normalized") or subject.strip()
    entity_type = classification.canonical_type or resolution.get("type") or "unknown"

    # atlas_id is opaque identity: hash-part only, never semantic content.
    identity_key = "|".join(
        [
            (resolution.get("mid") or "").strip(),
            wd.get("qid") or "",
            label.strip().lower(),
        ]
    )
    atlas_id = f"atlas:{hashlib.sha1(identity_key.encode('utf-8')).hexdigest()[:16]}"

    mid = resolution.get("mid")
    qid = wd.get("qid")

    trends_prov = AtlasProvenance(
        source=resolution.get("source") or "resolver",
        retrieval_method="trends-resolution",
        confidence=0.9 if mid else 0.3,  # a resolved MID is a strong signal
        retrieved_at=resolution.get("retrieved_at"),
    )
    wikidata_prov = None
    if qid:
        wikidata_prov = AtlasProvenance(
            source="wikidata",
            retrieval_method="wbsearchentities + entitydata",
            confidence=0.99,
            retrieved_at=wd.get("retrieved_at"),
        )

    claims: list[AtlasClaim] = []
    if mid:
        # Raw-layer identifier claim from the trends resolver.
        claims.append(
            AtlasClaim(
                claim_id=f"clm_raw_ident_mid_{mid}",
                subject=atlas_id,
                predicate="atlas:hasIdentifier",
                object=AtlasClaimObject(kind="identifier", id_type="mid", value=mid),
                layer="raw",
                provenance=trends_prov,
            )
        )
    if qid:
        # Raw-layer identifier claim from Wikidata.
        claims.append(
            AtlasClaim(
                claim_id=f"clm_raw_ident_qid_{qid}",
                subject=atlas_id,
                predicate="atlas:hasIdentifier",
                object=AtlasClaimObject(kind="identifier", id_type="qid", value=qid),
                layer="raw",
                provenance=wikidata_prov,
            )
        )
    # Derived-layer claim carrying the classifier's canonical type.
    claims.append(
        AtlasClaim(
            claim_id="clm_drv_canonical_type",
            subject=atlas_id,
            predicate="atlas:hasCanonicalType",
            object=AtlasClaimObject(kind="type", value=f"atlas:{entity_type}"),
            layer="derived",
            provenance=classification.provenance,
        )
    )

    # The raw payload keeps the resolver output plus a Wikidata status stanza
    # (explicit "missing" entry when the lookup produced nothing).
    payload = dict(resolution)
    if wikidata:
        payload["wikidata"] = {
            "status": "ok",
            "source": "wikidata",
            "qid": wikidata.get("qid"),
            "label": wikidata.get("label"),
            "description": wikidata.get("description"),
            "retrieved_at": wikidata.get("retrieved_at"),
        }
    else:
        payload["wikidata"] = {"status": "missing", "source": "wikidata", "retrieved_at": None}

    return AtlasEntity(
        atlas_id=atlas_id,
        canonical_label=label,
        canonical_description=wd.get("description"),
        entity_type=entity_type,
        aliases=[AtlasAlias(label=subject.strip() or label)],
        claims=claims,
        raw_payload=payload,
        needs_curation=classification.needs_curation,
    )
def enrich_entity(entity: AtlasEntity, constraints=None, depth: int = 1) -> AtlasEnrichmentDataset:
    """Wrap *entity* in an enrichment dataset at the requested depth.

    Related entities are not populated here — the dataset starts empty.

    Args:
        entity: Seed entity for the enrichment.
        constraints: Optional query-context mapping; a falsy value is
            replaced by an empty dict.
        depth: Enrichment traversal depth recorded on the dataset.

    Returns:
        An AtlasEnrichmentDataset seeded with *entity*.
    """
    query_context = constraints or {}
    return AtlasEnrichmentDataset(
        seed_entity=entity,
        related_entities=[],
        query_context=query_context,
        depth=depth,
    )
|