  1. """Atlas semantic core for entity resolution and enrichment."""
  2. from __future__ import annotations
  3. from app.cache import EntityCache
  4. from app.entity_normalize import normalize_entity
  5. from app.models import (
  6. AtlasAlias,
  7. AtlasClaim,
  8. AtlasClaimObject,
  9. AtlasEntity,
  10. AtlasEnrichmentDataset,
  11. AtlasProvenance,
  12. )
  13. from app.trends_resolution import resolve_entity_via_trends
  14. from app.type_classifier import TypeClassification, classify_entity_type
  15. from app.storage_service import AtlasStorageService
  16. from app.virtuoso_store import VirtuosoEntityStore
  17. from app.wikidata_lookup import lookup_wikidata
# Module-level singletons shared by every resolution call in this process.
_entity_cache = EntityCache(max_entries=512)  # in-process cache of resolved entities
_virtuoso_store = VirtuosoEntityStore(max_cache_entries=256)  # secondary lookup store
_storage = AtlasStorageService()  # durable storage; writes are best-effort (see resolve_entity)
  21. async def resolve_entity(subject: str, context: str | None = None) -> AtlasEntity:
  22. normalized = normalize_entity(subject)
  23. token = normalized.strip().lower()
  24. cached = _entity_cache.get(token)
  25. if cached is not None:
  26. try:
  27. await _storage.write_entity(cached)
  28. except Exception:
  29. pass
  30. return cached
  31. virt_hit = await _virtuoso_store.lookup(token)
  32. if virt_hit is not None:
  33. # Make the returned raw payload reflect the original caller input
  34. # (so tests and UI/debug output stay stable).
  35. if isinstance(virt_hit.raw_payload, dict):
  36. virt_hit.raw_payload.setdefault("source", "virtuoso")
  37. virt_hit.raw_payload["raw"] = subject
  38. virt_hit.raw_payload["normalized"] = normalized
  39. _entity_cache.store(virt_hit, extra_tokens=[subject, normalized])
  40. try:
  41. await _storage.write_entity(virt_hit)
  42. except Exception:
  43. pass
  44. return virt_hit
  45. resolution = resolve_entity_via_trends(subject)
  46. classification = await classify_entity_type(subject, resolution, context)
  47. wikidata = await lookup_wikidata(subject)
  48. entity = _entity_from_resolution(subject, resolution, classification, wikidata)
  49. _entity_cache.store(entity, extra_tokens=[subject, normalized])
  50. try:
  51. await _storage.write_entity(entity)
  52. except Exception:
  53. pass
  54. return entity
  55. def _entity_from_resolution(subject: str, resolution: dict, classification: TypeClassification, wikidata: dict | None = None) -> AtlasEntity:
  56. import hashlib
  57. canonical_label = (
  58. resolution.get("canonical_label")
  59. or resolution.get("normalized")
  60. or subject.strip()
  61. )
  62. canonical_type = (
  63. classification.canonical_type
  64. or resolution.get("type")
  65. or "unknown"
  66. )
  67. # atlas_id is opaque identity: hash-part only, never semantic content.
  68. stable_key = "|".join(
  69. [
  70. (resolution.get("mid") or "").strip(),
  71. (wikidata or {}).get("qid") or "",
  72. canonical_label.strip().lower(),
  73. ]
  74. )
  75. digest = hashlib.sha1(stable_key.encode("utf-8")).hexdigest()[:16]
  76. atlas_id = f"atlas:{digest}"
  77. trends_prov = AtlasProvenance(
  78. source=resolution.get("source") or "resolver",
  79. retrieval_method="trends-resolution",
  80. confidence=0.9 if resolution.get("mid") else 0.3,
  81. retrieved_at=resolution.get("retrieved_at"),
  82. )
  83. wikidata_prov = (
  84. AtlasProvenance(
  85. source="wikidata",
  86. retrieval_method="wbsearchentities + entitydata",
  87. confidence=0.99,
  88. retrieved_at=wikidata.get("retrieved_at"),
  89. )
  90. if wikidata and wikidata.get("qid")
  91. else None
  92. )
  93. claims: list[AtlasClaim] = []
  94. mid = resolution.get("mid")
  95. if mid:
  96. claims.append(
  97. AtlasClaim(
  98. claim_id=f"clm_raw_ident_mid_{mid}",
  99. subject=atlas_id,
  100. predicate="atlas:hasIdentifier",
  101. object=AtlasClaimObject(kind="identifier", id_type="mid", value=mid),
  102. layer="raw",
  103. provenance=trends_prov,
  104. )
  105. )
  106. if wikidata and wikidata.get("qid"):
  107. claims.append(
  108. AtlasClaim(
  109. claim_id=f"clm_raw_ident_qid_{wikidata['qid']}",
  110. subject=atlas_id,
  111. predicate="atlas:hasIdentifier",
  112. object=AtlasClaimObject(kind="identifier", id_type="qid", value=wikidata["qid"]),
  113. layer="raw",
  114. provenance=wikidata_prov,
  115. )
  116. )
  117. claims.append(
  118. AtlasClaim(
  119. claim_id="clm_drv_canonical_type",
  120. subject=atlas_id,
  121. predicate="atlas:hasCanonicalType",
  122. object=AtlasClaimObject(kind="type", value=f"atlas:{canonical_type}"),
  123. layer="derived",
  124. provenance=classification.provenance,
  125. )
  126. )
  127. payload = dict(resolution)
  128. if wikidata:
  129. payload["wikidata"] = {
  130. "status": "ok",
  131. "source": "wikidata",
  132. "qid": wikidata.get("qid"),
  133. "label": wikidata.get("label"),
  134. "description": wikidata.get("description"),
  135. "retrieved_at": wikidata.get("retrieved_at"),
  136. }
  137. else:
  138. payload["wikidata"] = {"status": "missing", "source": "wikidata", "retrieved_at": None}
  139. return AtlasEntity(
  140. atlas_id=atlas_id,
  141. canonical_label=canonical_label,
  142. canonical_description=(wikidata or {}).get("description"),
  143. entity_type=canonical_type,
  144. aliases=[AtlasAlias(label=subject.strip() or canonical_label)],
  145. claims=claims,
  146. raw_payload=payload,
  147. needs_curation=classification.needs_curation,
  148. )
  149. def enrich_entity(entity: AtlasEntity, constraints=None, depth: int = 1) -> AtlasEnrichmentDataset:
  150. return AtlasEnrichmentDataset(
  151. seed_entity=entity,
  152. related_entities=[],
  153. query_context=constraints or {},
  154. depth=depth,
  155. )