atlas.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. """Atlas semantic core for entity resolution and enrichment."""
  2. from __future__ import annotations
  3. from datetime import datetime, timezone
  4. from app.cache import EntityCache
  5. from app.entity_normalize import normalize_entity
  6. from app.ids import claim_hash, entity_hash
  7. from app.models import (
  8. AtlasAlias,
  9. AtlasClaim,
  10. AtlasClaimObject,
  11. AtlasEntity,
  12. AtlasEnrichmentDataset,
  13. AtlasProvenance,
  14. )
  15. from app.trends_resolution import resolve_entity_via_trends
  16. from app.type_classifier import TypeClassification, classify_entity_type
  17. from app.storage_service import AtlasStorageService
  18. from app.virtuoso_store import VirtuosoEntityStore
  19. from app.wikidata_lookup import lookup_wikidata
# Module-level singletons: created once at import time and shared by every
# resolve_entity call in this process.
# NOTE(review): key normalization / eviction behavior lives inside EntityCache
# and VirtuosoEntityStore and is not visible here — confirm there.
_entity_cache = EntityCache(max_entries=512)  # in-process resolved-entity cache (max_entries=512)
_virtuoso_store = VirtuosoEntityStore(max_cache_entries=256)  # local graph store, consulted before live resolution
_storage = AtlasStorageService()  # persistence target; resolve_entity writes every entity it returns here
  23. def _now_date() -> str:
  24. return datetime.now(timezone.utc).date().isoformat()
  25. async def resolve_entity(subject: str, context: str | None = None) -> AtlasEntity:
  26. # Normalize once so cache lookups and downstream resolvers speak the same name.
  27. normalized = normalize_entity(subject)
  28. token = normalized.strip().lower()
  29. # Fast path: reuse the last resolved entity if we already have it.
  30. cached = _entity_cache.get(token)
  31. if cached is not None:
  32. try:
  33. await _storage.write_entity(cached)
  34. except Exception:
  35. pass
  36. return cached
  37. # Prefer the local graph when it already knows this entity.
  38. virt_hit = await _virtuoso_store.lookup(token)
  39. if virt_hit is not None:
  40. # Keep debug output anchored to the caller's wording.
  41. if isinstance(virt_hit.raw_payload, dict):
  42. virt_hit.raw_payload.setdefault("source", "virtuoso")
  43. virt_hit.raw_payload["raw"] = subject
  44. virt_hit.raw_payload["normalized"] = normalized
  45. _entity_cache.store(virt_hit, extra_tokens=[subject, normalized])
  46. try:
  47. await _storage.write_entity(virt_hit)
  48. except Exception:
  49. pass
  50. return virt_hit
  51. # Fall back to live resolution, then shape the result into Atlas form.
  52. resolution = resolve_entity_via_trends(subject)
  53. classification = await classify_entity_type(subject, resolution, context)
  54. wikidata = await lookup_wikidata(subject)
  55. entity = _entity_from_resolution(subject, resolution, classification, wikidata)
  56. _entity_cache.store(entity, extra_tokens=[subject, normalized])
  57. try:
  58. await _storage.write_entity(entity)
  59. except Exception:
  60. pass
  61. return entity
  62. def _entity_from_resolution(subject: str, resolution: dict, classification: TypeClassification, wikidata: dict | None = None) -> AtlasEntity:
  63. # Pick the cleanest label we have; fall back to the caller's wording.
  64. canonical_label = (
  65. resolution.get("canonical_label")
  66. or resolution.get("normalized")
  67. or subject.strip()
  68. )
  69. canonical_type = (
  70. classification.canonical_type
  71. or resolution.get("type")
  72. or "unknown"
  73. )
  74. # atlas_id is opaque identity: hash-part only, never semantic content.
  75. atlas_id = f"atlas:{entity_hash((resolution.get('mid') or '').strip(), (wikidata or {}).get('qid') or '', canonical_label.strip().lower())}"
  76. trends_prov = AtlasProvenance(
  77. source=resolution.get("source") or "resolver",
  78. retrieval_method="trends-resolution",
  79. confidence=0.9 if resolution.get("mid") else 0.3,
  80. retrieved_at=resolution.get("retrieved_at"),
  81. )
  82. wikidata_prov = (
  83. AtlasProvenance(
  84. source="wikidata",
  85. retrieval_method="wbsearchentities + entitydata",
  86. confidence=0.99,
  87. retrieved_at=wikidata.get("retrieved_at"),
  88. )
  89. if wikidata and wikidata.get("qid")
  90. else None
  91. )
  92. claims: list[AtlasClaim] = []
  93. mid = resolution.get("mid")
  94. if mid:
  95. claims.append(
  96. AtlasClaim(
  97. claim_id=f"clm_raw_ident_mid_{claim_hash(atlas_id, 'atlas:hasIdentifier', mid, 'raw')}",
  98. subject=atlas_id,
  99. predicate="atlas:hasIdentifier",
  100. object=AtlasClaimObject(kind="identifier", id_type="mid", value=mid),
  101. layer="raw",
  102. provenance=trends_prov,
  103. created_at=_now_date(),
  104. )
  105. )
  106. if wikidata and wikidata.get("qid"):
  107. claims.append(
  108. AtlasClaim(
  109. claim_id=f"clm_raw_ident_qid_{claim_hash(atlas_id, 'atlas:hasIdentifier', wikidata['qid'], 'raw')}",
  110. subject=atlas_id,
  111. predicate="atlas:hasIdentifier",
  112. object=AtlasClaimObject(kind="identifier", id_type="qid", value=wikidata["qid"]),
  113. layer="raw",
  114. provenance=wikidata_prov,
  115. created_at=_now_date(),
  116. )
  117. )
  118. # The derived type is the one we expect other parts of Atlas to trust.
  119. claims.append(
  120. AtlasClaim(
  121. claim_id=f"clm_drv_canonical_type_{claim_hash(atlas_id, 'atlas:hasCanonicalType', canonical_type, 'derived')}",
  122. subject=atlas_id,
  123. predicate="atlas:hasCanonicalType",
  124. object=AtlasClaimObject(kind="type", value=f"atlas:{canonical_type}"),
  125. layer="derived",
  126. provenance=classification.provenance,
  127. created_at=_now_date(),
  128. )
  129. )
  130. payload = dict(resolution)
  131. if wikidata:
  132. payload["wikidata"] = {
  133. "wikidata_status": "hit",
  134. "source": "wikidata",
  135. "qid": wikidata.get("qid"),
  136. "label": wikidata.get("label"),
  137. "description": wikidata.get("description"),
  138. "retrieved_at": wikidata.get("retrieved_at"),
  139. }
  140. else:
  141. payload["wikidata"] = {"wikidata_status": "missing", "source": "wikidata", "retrieved_at": None}
  142. return AtlasEntity(
  143. atlas_id=atlas_id,
  144. canonical_label=canonical_label,
  145. canonical_description=(wikidata or {}).get("description"),
  146. entity_type=canonical_type,
  147. aliases=[AtlasAlias(label=subject.strip() or canonical_label)],
  148. claims=claims,
  149. raw_payload=payload,
  150. needs_curation=classification.needs_curation,
  151. )
  152. def enrich_entity(entity: AtlasEntity, constraints=None, depth: int = 1) -> AtlasEnrichmentDataset:
  153. return AtlasEnrichmentDataset(
  154. seed_entity=entity,
  155. related_entities=[],
  156. query_context=constraints or {},
  157. depth=depth,
  158. )