maintain_entities.py

#!/usr/bin/env python3
"""Atlas maintenance script.

Goals:
- automatically revisit stored entities
- enrich identifier coverage when Wikidata is present
- keep the claim supersession model authoritative

Operational rules:
- no manual subject list is required for normal runs
- --dry-run shows what would change without writing anything
"""

from __future__ import annotations

import argparse
import asyncio
import json
import sys
from dataclasses import asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

# Make the application package importable when the script is run directly.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

import app.atlas as atlas_module
from app.atlas import resolve_entity
from app.ids import claim_hash
from app.models import AtlasClaim, AtlasClaimObject, AtlasEntity, AtlasProvenance
from app.storage_service import AtlasStorageService
from app.wikidata_lookup import lookup_wikidata

# High-confidence identifier properties we can mine from the full Wikidata entity.
# The goal is to enrich the entity with public identifiers and to reconcile the
# Google MID whenever Wikidata already exposes the same identity through another id.
#
# Note: provenance.retrieval_method describes the evidence/property source
# (for example "MusicBrainz artist ID"), not the name of this script.
WIKIDATA_IDENTIFIER_PROPERTIES: dict[str, tuple[str, str]] = {
    "P2671": ("mid", "Google Knowledge Graph ID"),
    "P434": ("musicbrainz-artist-id", "MusicBrainz artist ID"),
    "P435": ("musicbrainz-work-id", "MusicBrainz work ID"),
    "P436": ("musicbrainz-release-group-id", "MusicBrainz release group ID"),
  40. "P439": ("musicbrainz-release-id", "MusicBrainz release ID"),
  41. "P444": ("musicbrainz-recording-id", "MusicBrainz recording ID"),
  42. "P345": ("imdb-id", "IMDb ID"),
  43. "P214": ("viaf-id", "VIAF ID"),
  44. "P213": ("isni", "ISNI"),
  45. "P227": ("gnd-id", "GND ID"),
  46. }
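
# For the external-id properties above, the Wikidata entity JSON carries the
# identifier as a plain string (shape per the standard wbgetentities output;
# the value here is invented):
#   entity["claims"]["P434"][0]["mainsnak"]["datavalue"]["value"]
#   -> "56d0f8ab-..."  # a MusicBrainz artist ID
# This is exactly the path the mining loop in maintain_subject() walks.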

# Entity-type-specific Wikidata fields worth capturing early. These are the
# fields that help the most with disambiguation and downstream consolidation.
WIKIDATA_TYPE_FIELD_PLAN: dict[str, dict[str, tuple[str, str]]] = {
    "Person": {
        "P569": ("birth-date", "date of birth"),
        "P19": ("birth-place", "place of birth"),
        "P27": ("citizenship", "country of citizenship"),
    },
    "Organization": {
        "P571": ("inception", "inception"),
        "P159": ("headquarters", "headquarters location"),
        "P452": ("industry", "industry"),
    },
    "Location": {
        "P571": ("inception", "inception"),
        "P17": ("country", "country"),
        "P131": ("located-in", "located in the administrative territorial entity"),
    },
}
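
# Unlike the identifier properties, these fields usually arrive as typed
# objects rather than strings (shapes follow the standard Wikidata JSON;
# the values are invented):
#   P569 datavalue.value -> {"time": "+1926-05-26T00:00:00Z", "precision": 11, ...}
#   P19  datavalue.value -> {"entity-type": "item", "id": "Q62", ...}
# maintain_subject() flattens such dicts to one string before planning a claim.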

def _planned_claim_id(subject: str, predicate: str, value: str, layer: str = "raw") -> str:
    created_at = datetime.now(timezone.utc).date().isoformat()
    return f"clm_{layer}_{claim_hash(subject, predicate, value, layer, created_at=created_at)}"
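
# Illustrative output (the digest comes from claim_hash; it is shown here only
# as a placeholder):
#   _planned_claim_id("ent_123", "atlas:hasIdentifier", "Q42")
#   -> "clm_raw_<digest>"
# created_at is passed to claim_hash, so (assuming it is folded into the digest)
# re-runs on different days yield distinct planned claim ids.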

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Atlas maintenance / claim adjudication helper")
    parser.add_argument("--dry-run", action="store_true", help="Show planned claim updates without writing")
    parser.add_argument("--page-size", type=int, default=50, help="How many entities to scan per page")
    parser.add_argument("--start-after", default="", help="Resume scanning after this canonical label")
    parser.add_argument("--checkpoint-file", default=".atlas-maintenance.checkpoint", help="File storing the last processed label")
    parser.add_argument("--reset-checkpoint", action="store_true", help="Ignore any saved checkpoint and start from the beginning")
    parser.add_argument("--clear-checkpoint", action="store_true", help="Delete the checkpoint file and exit")
    return parser

async def _sparql_bindings(query: str) -> list[dict[str, Any]]:
    svc = AtlasStorageService()
    result = await svc._call_tool("sparql_query", {"query": query})
    # The tool call may return a list of content items whose first element
    # carries the JSON response in its .text attribute; unwrap that first.
    if isinstance(result, list) and result:
        first = result[0]
        text = getattr(first, "text", None)
        result = json.loads(text) if text else {}
    return result.get("results", {}).get("bindings", []) if isinstance(result, dict) else []
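
# The unwrapped payload follows the SPARQL 1.1 JSON results format, roughly:
#   {"head": {"vars": ["label"]},
#    "results": {"bindings": [{"label": {"type": "literal", "value": "..."}}]}}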

async def discover_subjects(page_size: int, start_after: str = "") -> list[str]:
    """Ask Virtuoso for known Atlas entities and return their labels.

    This keeps the maintenance job automatic: we operate on the stored graph,
    not on a hand-entered subject list.
    """
    if start_after:
        # Escape backslashes and quotes before embedding the label in the query.
        escaped = start_after.replace("\\", "\\\\").replace('"', '\\"')
        filter_clause = f'FILTER(STR(?label) > "{escaped}")'
    else:
        filter_clause = ""
    query = """
    PREFIX atlas: <http://world.eu.org/atlas_ontology#>
    SELECT DISTINCT ?label WHERE {{
        GRAPH <http://world.eu.org/atlas_data#> {{
            ?entity a atlas:Entity ;
                atlas:canonicalLabel ?label .
            {filter_clause}
        }}
    }}
    ORDER BY ?label
    LIMIT {page_size}
    """.format(filter_clause=filter_clause, page_size=page_size)
    bindings = await _sparql_bindings(query)
    return [b.get("label", {}).get("value", "") for b in bindings if b.get("label", {}).get("value")]
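
# Example rendered clause for start_after = 'Miles "Prince" Davis' (an invented
# label):
#   FILTER(STR(?label) > "Miles \"Prince\" Davis")
# Combined with ORDER BY ?label and LIMIT, this is keyset pagination: each page
# resumes strictly after the last label the previous run processed.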

async def maintain_subject(subject: str, dry_run: bool) -> dict[str, Any]:
    # We resolve first so the maintenance run always starts from the current
    # canonical entity shape, then we layer on any new evidence.
    if dry_run:
        # Temporarily stub out the storage write so resolve_entity cannot
        # persist anything during a dry run; always restore the original.
        original_write = atlas_module._storage.write_entity

        async def _noop_write(entity):
            return {"status": "dry-run", "entity_id": entity.atlas_id}

        atlas_module._storage.write_entity = _noop_write
        try:
            entity = await resolve_entity(subject)
        finally:
            atlas_module._storage.write_entity = original_write
    else:
        entity = await resolve_entity(subject)
    report: dict[str, Any] = {
        "subject": subject,
        "atlas_id": entity.atlas_id,
        "planned": [],
        "written": False,
        "wikidata_status": "missing",
        "planned_identifier_claims": 0,
        "planned_identifier_types": [],
        "planned_type_field_claims": 0,
    }
    wikidata = entity.raw_payload.get("wikidata") if isinstance(entity.raw_payload, dict) else None
    if isinstance(wikidata, dict) and wikidata.get("wikidata_status") == "hit" and wikidata.get("qid"):
        report["wikidata_status"] = "hit"
        # If Wikidata already knows the entity, fetch the full object and mine
        # any additional identifiers we can safely attach as claims.
        full = await lookup_wikidata(subject)
        if full and isinstance(full.get("entity"), dict):
            report["wikidata_status"] = "enriched"
            entity_block = full["entity"]
            claims = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}
            # The QID is always a known cross-reference and acts as a stable anchor.
            qid = full.get("qid")
            existing_qid = entity.active_identifier("qid")
            if qid and qid != existing_qid:
                claim = AtlasClaim(
                    claim_id=_planned_claim_id(entity.atlas_id, "atlas:hasIdentifier", qid),
                    subject=entity.atlas_id,
                    predicate="atlas:hasIdentifier",
                    object=AtlasClaimObject(kind="identifier", id_type="qid", value=qid),
                    layer="raw",
                    provenance=AtlasProvenance(
                        source="wikidata",
                        retrieval_method="atlas-maintenance-wikidata-enrichment",
                        confidence=0.99,
                        retrieved_at=full.get("retrieved_at"),
                        evidence_property="qid",
                    ),
                )
                report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
                report["planned_identifier_claims"] += 1
                report["planned_identifier_types"].append("qid")
            for wikidata_property, (identifier_type, _label) in WIKIDATA_IDENTIFIER_PROPERTIES.items():
                property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
                for claim_node in property_claims:
                    mainsnak = claim_node.get("mainsnak", {})
                    datavalue = mainsnak.get("datavalue", {})
                    value = datavalue.get("value")
                    if not isinstance(value, str) or not value.strip():
                        continue
                    existing = entity.active_identifier(identifier_type)
                    if existing == value:
                        continue
                    claim = AtlasClaim(
                        claim_id=_planned_claim_id(entity.atlas_id, "atlas:hasIdentifier", value),
                        subject=entity.atlas_id,
                        predicate="atlas:hasIdentifier",
                        object=AtlasClaimObject(kind="identifier", id_type=identifier_type, value=value),
                        layer="raw",
                        provenance=AtlasProvenance(
                            source="wikidata",
                            retrieval_method="atlas-maintenance-wikidata-enrichment",
                            confidence=0.99,
                            retrieved_at=full.get("retrieved_at"),
                            evidence_property=wikidata_property,
                        ),
                    )
                    report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
                    report["planned_identifier_claims"] += 1
                    report["planned_identifier_types"].append(identifier_type)
            # Type-specific enrichment: different entity kinds care about different fields.
            # We only plan claims for high-confidence public facts that are useful for
            # disambiguation and consolidation.
            type_plan = WIKIDATA_TYPE_FIELD_PLAN.get(entity.entity_type, {})
            for wikidata_property, (claim_type, _label) in type_plan.items():
                property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
                for claim_node in property_claims:
                    mainsnak = claim_node.get("mainsnak", {})
                    datavalue = mainsnak.get("datavalue", {})
                    value = datavalue.get("value")
                    if value in (None, "", {}):
                        continue
                    # For this first pass we capture these as literal payload claims;
                    # the exact ontology mapping can be tightened later.
                    if isinstance(value, dict):
                        # Entity, time, text, and quantity objects carry their payload
                        # under different keys; take the first usable string.
                        value = value.get("id") or value.get("time") or value.get("text") or value.get("amount")
                    if not isinstance(value, str):
                        continue
                    # "birth-date" -> "atlas:hasBirthDate", and so on.
                    predicate = "atlas:has" + claim_type.replace("-", " ").title().replace(" ", "")
                    claim = AtlasClaim(
                        claim_id=_planned_claim_id(entity.atlas_id, predicate, value),
                        subject=entity.atlas_id,
                        predicate=predicate,
                        object=AtlasClaimObject(kind="literal", value=value),
                        layer="raw",
                        provenance=AtlasProvenance(
                            source="wikidata",
                            retrieval_method="atlas-maintenance-wikidata-enrichment",
                            confidence=0.95,
                            retrieved_at=full.get("retrieved_at"),
                            evidence_property=wikidata_property,
                        ),
                    )
                    report["planned"].append({"action": "add_type_field_claim", "claim": asdict(claim)})
                    report["planned_type_field_claims"] += 1
    if dry_run:
        return report
    # The script currently only reports planned updates.
    # Once the claim update path is wired, this is where write-back will happen.
    report["written"] = False
    return report
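
# Illustrative per-subject report, as serialized by main() (values invented):
#   {
#     "subject": "Miles Davis",
#     "atlas_id": "ent_...",
#     "planned": [...],
#     "written": false,
#     "wikidata_status": "enriched",
#     "planned_identifier_claims": 2,
#     "planned_identifier_types": ["qid", "musicbrainz-artist-id"],
#     "planned_type_field_claims": 3
#   }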

async def main() -> int:
    parser = build_parser()
    args = parser.parse_args()
    checkpoint_path = Path(args.checkpoint_file)
    if args.clear_checkpoint:
        if checkpoint_path.exists():
            checkpoint_path.unlink()
        print(json.dumps({"checkpoint_cleared": True, "checkpoint_file": str(checkpoint_path)}, indent=2, ensure_ascii=False))
        return 0
    start_after = args.start_after.strip()
    if args.reset_checkpoint:
        start_after = ""
    elif not start_after and checkpoint_path.exists():
        start_after = checkpoint_path.read_text(encoding="utf-8").strip()
    subjects = await discover_subjects(args.page_size, start_after)
    summaries = []
    for subject in subjects:
        summaries.append(await maintain_subject(subject, args.dry_run))
    if subjects and not args.dry_run:
        # Persist the last processed label so the next run resumes after it.
        checkpoint_path.write_text(subjects[-1], encoding="utf-8")
    print(json.dumps({
        "dry_run": args.dry_run,
        "checkpoint_file": str(checkpoint_path),
        "checkpoint_start_after": start_after,
        "results": summaries,
    }, indent=2, ensure_ascii=False))
    return 0

if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))