maintain_entities.py 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912
  1. #!/usr/bin/env python3
  2. """Atlas maintenance script.
  3. Goal:
  4. - automatically revisit stored entities
  5. - enrich identifier coverage when Wikidata is present
  6. - keep the claim supersession model authoritative
  7. Operational rule:
  8. - no manual subject list is required for normal runs
  9. - --dry-run shows what would change, without writing
  10. """
  11. from __future__ import annotations
  12. import argparse
  13. import asyncio
  14. import json
  15. import os
  16. import re
  17. import sys
  18. from pathlib import Path
  19. from dataclasses import asdict
  20. from datetime import datetime, timezone
  21. from typing import Any
  22. import httpx
  23. ROOT = Path(__file__).resolve().parents[1]
  24. if str(ROOT) not in sys.path:
  25. sys.path.insert(0, str(ROOT))
  26. from app.ids import claim_hash
  27. from app.models import AtlasClaim, AtlasClaimObject, AtlasEntity, AtlasProvenance
  28. from app.storage_service import AtlasStorageService
  29. from app.type_classifier import WIKIDATA_CLASS_MAP
  30. from app.wikidata_lookup import fetch_wikidata_entity, lookup_wikidata
  31. from app.wikidata_type_reasoner import infer_atlas_type_from_p31
# High-confidence identifier properties we can mine from the full Wikidata entity.
# The goal is to enrich the entity with public identifiers and to reconcile the
# Google MID whenever Wikidata already exposes the same identity through another id.
#
# Note: provenance.retrieval_method describes the evidence/property source
# (for example "MusicBrainz artist ID"), not the name of this script.
# Mapping: Wikidata property id -> (Atlas identifier type, human-readable label).
WIKIDATA_IDENTIFIER_PROPERTIES: dict[str, tuple[str, str]] = {
    "P2671": ("mid", "Google Knowledge Graph ID"),
    "P434": ("musicbrainz-artist-id", "MusicBrainz artist ID"),
    "P435": ("musicbrainz-work-id", "MusicBrainz work ID"),
    "P436": ("musicbrainz-release-group-id", "MusicBrainz release group ID"),
    "P439": ("musicbrainz-release-id", "MusicBrainz release ID"),
    "P444": ("musicbrainz-recording-id", "MusicBrainz recording ID"),
    "P345": ("imdb-id", "IMDb ID"),
    "P214": ("viaf-id", "VIAF ID"),
    "P213": ("isni", "ISNI"),
    "P227": ("gnd-id", "GND ID"),
}
# Entity-type-specific Wikidata fields worth capturing early. Birth date and
# coordinates are strict because they are type-specific and should not be left
# as competing active claims.
# Mapping: Atlas entity type -> {Wikidata property id: (claim type, label)}.
WIKIDATA_TYPE_FIELD_PLAN: dict[str, dict[str, tuple[str, str]]] = {
    "Person": {
        "P569": ("birth-date", "date of birth"),
        "P19": ("birth-place", "place of birth"),
        "P27": ("citizenship", "country of citizenship"),
    },
    "Organization": {
        "P571": ("inception", "inception"),
        "P159": ("headquarters", "headquarters location"),
        "P452": ("industry", "industry"),
    },
    "Location": {
        "P571": ("inception", "inception"),
        "P17": ("country", "country"),
        "P131": ("located-in", "located in the administrative territorial entity"),
        "P625": ("coordinates", "coordinate location"),
    },
}
# Identifier / claim types that should have at most one active claim at a time.
UNITARY_IDENTIFIER_TYPES = {"mid", "qid", "wikidata-entity", "birth-date", "latitude", "longitude", "coordinates"}
# Shape of a Wikidata QID, e.g. "Q42".
QID_RE = re.compile(r"^Q\d+$")
# Candidate ranking step configuration (.env-controlled)
CANDIDATE_RANK_PROVIDER = os.getenv("ATLAS_CANDIDATE_RANK_PROVIDER", "auto")
CANDIDATE_RANK_MODEL = os.getenv("ATLAS_CANDIDATE_RANK_MODEL", "")
# Prompt file paths for LLM candidate ranking; env vars override the repo defaults.
CANDIDATE_RANK_SYSTEM_PROMPT = os.getenv(
    "ATLAS_CANDIDATE_RANK_SYSTEM_PROMPT",
    str(ROOT / "prompts" / "candidate_ranking" / "system.txt"),
)
CANDIDATE_RANK_USER_TEMPLATE = os.getenv(
    "ATLAS_CANDIDATE_RANK_USER_TEMPLATE",
    str(ROOT / "prompts" / "candidate_ranking" / "user_template.txt"),
)
# Provider credentials and model choices; the ATLAS_-prefixed variables win
# over the generic ones when both are set.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("ATLAS_OPENAI_MODEL", os.getenv("OPENAI_MODEL", "gpt-4o-mini"))
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_MODEL = os.getenv("ATLAS_GROQ_MODEL", os.getenv("GROQ_MODEL", "meta-llama/llama-4-scout-17b-16e-instruct"))
  88. def _planned_claim_id(subject: str, predicate: str, value: str, layer: str = "raw") -> str:
  89. created_at = datetime.now(timezone.utc).date().isoformat()
  90. return f"clm_{layer}_{claim_hash(subject, predicate, value, layer, created_at=created_at)}"
  91. def _infer_wikidata_canonical_type(entity_block: dict[str, Any]) -> str | None:
  92. claims = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}
  93. p31 = claims.get("P31", []) if isinstance(claims, dict) else []
  94. for claim in p31:
  95. mainsnak = claim.get("mainsnak", {})
  96. datavalue = mainsnak.get("datavalue", {})
  97. value = datavalue.get("value", {})
  98. wid = value.get("id") if isinstance(value, dict) else None
  99. mapped = WIKIDATA_CLASS_MAP.get(wid)
  100. if mapped:
  101. return mapped
  102. return None
  103. def _fallback_lookup_looks_compatible(current_type: str, lookup: dict[str, Any], inferred: str | None) -> bool:
  104. current_type = (current_type or "unknown").strip()
  105. if current_type == "unknown":
  106. return True
  107. if inferred and inferred != current_type:
  108. return False
  109. desc = str(lookup.get("description") or "").lower()
  110. if current_type == "Location" and "family name" in desc:
  111. return False
  112. if current_type == "Person" and "city" in desc:
  113. return False
  114. return True
  115. def _resolution_is_clear(current_label: str, current_mid: str | None, current_type: str, wikidata_label: str | None, wikidata_mid: str | None, wikidata_type: str | None) -> bool:
  116. if not wikidata_label or wikidata_label != current_label:
  117. return False
  118. if wikidata_mid and current_mid and wikidata_mid != current_mid:
  119. return False
  120. if wikidata_type and current_type and current_type != "unknown" and wikidata_type != current_type:
  121. return False
  122. return bool(wikidata_type)
  123. def _choose_trends_candidate_mid(candidates: list[dict[str, Any]], *, label: str, wikidata_label: str | None, entity_type: str) -> tuple[str | None, str | None]:
  124. """Pick the best Trends MID by label/type match.
  125. Exact label matches are preferred, and the candidate type must be compatible
  126. with the current Atlas entity type when possible.
  127. """
  128. if not candidates:
  129. return None, None
  130. wanted_labels = {label.strip().lower()}
  131. if wikidata_label:
  132. wanted_labels.add(wikidata_label.strip().lower())
  133. def compatible(candidate_type: str | None) -> bool:
  134. ct = (candidate_type or "").lower()
  135. et = (entity_type or "unknown").lower()
  136. if et == "location":
  137. return any(word in ct for word in ["city", "settlement", "place", "region", "location", "country", "town", "village"])
  138. if et == "person":
  139. return any(word in ct for word in ["person", "artist", "musician", "writer", "politician", "actor"])
  140. if et == "organization":
  141. return any(word in ct for word in ["organization", "company", "university", "school", "institution"])
  142. return True
  143. for c in candidates:
  144. mid = c.get("mid")
  145. title = (c.get("title") or "").strip().lower()
  146. ctype = c.get("type")
  147. if mid and title in wanted_labels and compatible(ctype):
  148. return mid, c.get("title")
  149. for c in candidates:
  150. mid = c.get("mid")
  151. title = (c.get("title") or "").strip().lower()
  152. if mid and title in wanted_labels:
  153. return mid, c.get("title")
  154. return None, None
  155. def _load_text(path: str) -> str:
  156. return Path(path).read_text(encoding="utf-8")
  157. def _pick_rank_provider() -> str | None:
  158. if CANDIDATE_RANK_PROVIDER == "openai" and OPENAI_API_KEY:
  159. return "openai"
  160. if CANDIDATE_RANK_PROVIDER == "groq" and GROQ_API_KEY:
  161. return "groq"
  162. if CANDIDATE_RANK_PROVIDER == "auto":
  163. if GROQ_API_KEY:
  164. return "groq"
  165. if OPENAI_API_KEY:
  166. return "openai"
  167. return None
def _pick_type_provider() -> str | None:
    """Select the LLM provider for type adjudication.

    Mirrors _pick_rank_provider, keyed off TYPE_CLASSIFIER_PROVIDER instead.
    NOTE(review): TYPE_CLASSIFIER_PROVIDER is not defined anywhere in the
    visible part of this file — presumably an env-driven module constant
    defined elsewhere; confirm it exists before this function is called,
    otherwise this raises NameError at runtime.
    """
    if TYPE_CLASSIFIER_PROVIDER == "openai" and OPENAI_API_KEY:
        return "openai"
    if TYPE_CLASSIFIER_PROVIDER == "groq" and GROQ_API_KEY:
        return "groq"
    if TYPE_CLASSIFIER_PROVIDER == "auto":
        # "auto" prefers Groq when both keys are configured.
        if GROQ_API_KEY:
            return "groq"
        if OPENAI_API_KEY:
            return "openai"
    return None
  179. async def _choose_type_with_llm(item: dict[str, Any], wikidata_hint: dict[str, Any], candidates: list[str]) -> str | None:
  180. # Only used when ontology reasoning can't produce a unique Atlas type.
  181. provider = _pick_type_provider()
  182. if provider is None:
  183. return None
  184. model = TYPE_CLASSIFIER_MODEL or (GROQ_MODEL if provider == "groq" else OPENAI_MODEL)
  185. url = "https://api.groq.com/openai/v1/chat/completions" if provider == "groq" else "https://api.openai.com/v1/chat/completions"
  186. api_key = GROQ_API_KEY if provider == "groq" else OPENAI_API_KEY
  187. prompt = (
  188. "Classify this entity into one Atlas type only.\n"
  189. f"Label: {item.get('label', '')}\n"
  190. f"Description: {item.get('description', '')}\n"
  191. f"Wikidata label: {wikidata_hint.get('label', '')}\n"
  192. f"Wikidata description: {wikidata_hint.get('description', '')}\n"
  193. f"P31-derived Atlas candidates: {', '.join(candidates)}\n"
  194. "Allowed Atlas types: Person, Organization, Location, CreativeWork, Event, Product, Taxon, Other.\n"
  195. 'Return strict JSON only: {"type":"...","confidence":0.0-1.0,"reason":"..."}'
  196. )
  197. payload = {
  198. "model": model,
  199. "temperature": 0,
  200. "messages": [
  201. {"role": "system", "content": "You are Atlas type adjudicator."},
  202. {"role": "user", "content": prompt},
  203. ],
  204. }
  205. try:
  206. async with httpx.AsyncClient(timeout=20) as client:
  207. resp = await client.post(url, json=payload, headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"})
  208. resp.raise_for_status()
  209. content = resp.json().get("choices", [{}])[0].get("message", {}).get("content", "")
  210. if not content:
  211. return None
  212. try:
  213. parsed = json.loads(content)
  214. except Exception:
  215. parsed = json.loads(content.strip().strip("`").replace("json", "", 1).strip()) if content.strip().startswith("`") else None
  216. return parsed.get("type") if isinstance(parsed, dict) else None
  217. except Exception:
  218. return None
  219. async def _rank_candidates_with_llm(item: dict[str, Any], wikidata_hint: dict[str, Any], candidates: list[dict[str, Any]]) -> dict[str, Any] | None:
  220. provider = _pick_rank_provider()
  221. if provider is None:
  222. return None
  223. system_prompt = _load_text(CANDIDATE_RANK_SYSTEM_PROMPT)
  224. user_template = _load_text(CANDIDATE_RANK_USER_TEMPLATE)
  225. user_prompt = user_template.format(
  226. atlas_id=item.get("atlas_id", ""),
  227. canonical_label=item.get("label", ""),
  228. canonical_description=item.get("description", ""),
  229. canonical_type=item.get("entity_type", "unknown"),
  230. wikidata_qid=wikidata_hint.get("qid", ""),
  231. wikidata_label=wikidata_hint.get("label", ""),
  232. wikidata_description=wikidata_hint.get("description", ""),
  233. candidates_json=json.dumps(candidates, ensure_ascii=False),
  234. )
  235. model = CANDIDATE_RANK_MODEL or (GROQ_MODEL if provider == "groq" else OPENAI_MODEL)
  236. url = "https://api.groq.com/openai/v1/chat/completions" if provider == "groq" else "https://api.openai.com/v1/chat/completions"
  237. api_key = GROQ_API_KEY if provider == "groq" else OPENAI_API_KEY
  238. headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
  239. payload = {
  240. "model": model,
  241. "temperature": 0,
  242. "messages": [
  243. {"role": "system", "content": system_prompt},
  244. {"role": "user", "content": user_prompt},
  245. ],
  246. }
  247. try:
  248. async with httpx.AsyncClient(timeout=20) as client:
  249. resp = await client.post(url, json=payload, headers=headers)
  250. resp.raise_for_status()
  251. content = resp.json().get("choices", [{}])[0].get("message", {}).get("content", "")
  252. if not content:
  253. return None
  254. try:
  255. return json.loads(content)
  256. except Exception:
  257. text = content.strip()
  258. if text.startswith("```"):
  259. text = text.strip("`")
  260. if text.lower().startswith("json"):
  261. text = text[4:].strip()
  262. try:
  263. return json.loads(text)
  264. except Exception:
  265. return None
  266. except Exception:
  267. return None
  268. def build_parser() -> argparse.ArgumentParser:
  269. parser = argparse.ArgumentParser(description="Atlas maintenance / claim adjudication helper")
  270. parser.add_argument("--dry-run", action="store_true", help="Show planned claim updates without writing")
  271. parser.add_argument("--page-size", type=int, default=50, help="How many entities to scan per page")
  272. parser.add_argument("--start-after", default="", help="Resume scanning after this canonical label")
  273. parser.add_argument("--checkpoint-file", default=".atlas-maintenance.checkpoint", help="File storing the last processed label")
  274. parser.add_argument("--reset-checkpoint", action="store_true", help="Ignore any saved checkpoint and start from the beginning")
  275. parser.add_argument("--clear-checkpoint", action="store_true", help="Delete the checkpoint file and exit")
  276. parser.add_argument("--refresh-payloads", action="store_true", help="Explicitly re-fetch online payloads during maintenance")
  277. return parser
  278. async def _sparql_bindings(query: str) -> list[dict[str, Any]]:
  279. svc = AtlasStorageService()
  280. result = await svc._call_tool("sparql_query", {"query": query})
  281. if isinstance(result, list) and result:
  282. first = result[0]
  283. text = getattr(first, "text", None)
  284. result = json.loads(text) if text else {}
  285. return result.get("results", {}).get("bindings", []) if isinstance(result, dict) else []
  286. def _claim_bindings_from_read(payload: dict[str, Any]) -> list[dict[str, Any]]:
  287. result = payload.get("result")
  288. if isinstance(result, list) and result:
  289. text = getattr(result[0], "text", None)
  290. if text:
  291. try:
  292. result = json.loads(text)
  293. except Exception:
  294. return []
  295. if isinstance(result, dict):
  296. return result.get("results", {}).get("bindings", [])
  297. return []
async def discover_subjects(page_size: int, start_after: str = "") -> list[dict[str, Any]]:
    """Ask Virtuoso for known Atlas entities and return their immutable atlas_id values.

    This keeps the maintenance job automatic: we operate on the stored graph,
    not on a hand-entered subject list or mutable label text.

    Args:
        page_size: maximum number of entities returned (SPARQL LIMIT).
        start_after: resume marker; only rows whose COALESCE(?atlasId, ?label)
            sorts strictly after this string are returned.
    """
    # Escape backslashes and double quotes so the marker can be embedded in a
    # SPARQL string literal safely.
    start_marker = start_after.replace("\\", "\\\\").replace('"', '\\"') if start_after else ""
    filter_clause = f'FILTER(STR(COALESCE(?atlasId, ?label)) > "{start_marker}")' if start_marker else ""
    # Braces are doubled because the query text goes through str.format below.
    query = """
    PREFIX atlas: <http://world.eu.org/atlas_ontology#>
    SELECT DISTINCT ?entity ?atlasId ?label ?desc ?type ?mid ?qid ?rawWd ?rawTrends WHERE {{
      GRAPH <http://world.eu.org/atlas_data#> {{
        ?entity a atlas:Entity ;
            atlas:canonicalLabel ?label .
        OPTIONAL {{ ?entity atlas:canonicalDescription ?desc . }}
        OPTIONAL {{ ?entity atlas:rawWikidataJson ?rawWd . }}
        OPTIONAL {{ ?entity atlas:rawTrendsJson ?rawTrends . }}
        OPTIONAL {{ ?entity atlas:hasCanonicalType ?type . }}
        OPTIONAL {{
          ?entity atlas:hasIdentifier ?ident .
          ?ident atlas:identifierType atlas:Mid ;
              atlas:identifierValue ?mid .
        }}
        OPTIONAL {{
          ?entity atlas:hasIdentifier ?identQid .
          ?identQid atlas:identifierType atlas:WikidataQID ;
              atlas:identifierValue ?qid .
        }}
        OPTIONAL {{ ?entity atlas:atlasId ?atlasId . }}
        {filter_clause}
      }}
    }}
    ORDER BY COALESCE(?atlasId, ?label)
    LIMIT {page_size}
    """.format(filter_clause=filter_clause, page_size=page_size)
    bindings = await _sparql_bindings(query)
    # Rows without a label are unusable downstream, so they are dropped.
    # atlas_id falls back to the label when the graph has no atlasId triple.
    return [
        {
            "entity_iri": b.get("entity", {}).get("value", ""),
            "atlas_id": b.get("atlasId", {}).get("value", "") or b.get("label", {}).get("value", ""),
            "label": b.get("label", {}).get("value", ""),
            "description": b.get("desc", {}).get("value", ""),
            # Strip the ontology namespace, keeping only the local type name.
            "entity_type": (b.get("type", {}).get("value", "").split("#")[-1] if b.get("type", {}).get("value") else "unknown"),
            "mid": b.get("mid", {}).get("value"),
            "qid": b.get("qid", {}).get("value"),
            "raw_wikidata_json": b.get("rawWd", {}).get("value"),
            "raw_trends_json": b.get("rawTrends", {}).get("value"),
        }
        for b in bindings
        if b.get("label", {}).get("value")
    ]
  348. async def maintain_subject(item: dict[str, Any], dry_run: bool, refresh_payloads: bool) -> dict[str, Any]:
  349. subject = item.get("label", "")
  350. atlas_id = item.get("atlas_id", "")
  351. entity = AtlasEntity(
  352. atlas_id=atlas_id,
  353. canonical_label=subject,
  354. canonical_description=item.get("description") or None,
  355. entity_type=item.get("entity_type") or "unknown",
  356. raw_payload={},
  357. )
  358. report: dict[str, Any] = {
  359. "subject": subject,
  360. "atlas_id": atlas_id,
  361. "planned": [],
  362. "written": False,
  363. "wikidata_status": "missing",
  364. "comparison": {},
  365. "planned_core_updates": [],
  366. "planned_identifier_claims": 0,
  367. "planned_identifier_types": [],
  368. "planned_type_field_claims": 0,
  369. "trends_candidates_count": 0,
  370. "ranking_attempted": False,
  371. }
  372. wikidata = None
  373. full = None
  374. raw_wd = item.get("raw_wikidata_json")
  375. if raw_wd:
  376. try:
  377. wikidata = json.loads(raw_wd)
  378. except Exception:
  379. wikidata = None
  380. if not isinstance(wikidata, dict):
  381. wikidata = {
  382. "wikidata_status": "hit" if item.get("qid") else "missing",
  383. "qid": item.get("qid"),
  384. }
  385. # Strong sync rule: if persisted Wikidata payload already carries a better
  386. # canonical label/description, plan that core update immediately.
  387. wd_label = (wikidata.get("label") or "").strip() if isinstance(wikidata, dict) else ""
  388. wd_desc = (wikidata.get("description") or "").strip() if isinstance(wikidata, dict) else ""
  389. if wd_label and wd_label != subject:
  390. report["planned_core_updates"].append(
  391. {"field": "canonical_label", "from": subject, "to": wd_label, "source": "wikidata-payload"}
  392. )
  393. if wd_desc and wd_desc != (item.get("description") or ""):
  394. report["planned_core_updates"].append(
  395. {"field": "canonical_description", "from": item.get("description"), "to": wd_desc, "source": "wikidata-payload"}
  396. )
  397. # Fallback: when the graph row does not yet carry a stored Wikidata payload,
  398. # do a direct lookup from the current canonical label so maintenance still works.
  399. if refresh_payloads and wikidata.get("wikidata_status") == "missing" and subject:
  400. lookup = await lookup_wikidata(subject)
  401. if isinstance(lookup, dict) and lookup.get("qid"):
  402. inferred = _infer_wikidata_canonical_type(lookup.get("entity") or {})
  403. entity_type = (item.get("entity_type") or "unknown").strip()
  404. if not _fallback_lookup_looks_compatible(entity_type, lookup, inferred):
  405. report["wikidata_status"] = "ambiguous_candidate"
  406. report["comparison"] = {
  407. "fallback_qid": lookup.get("qid"),
  408. "fallback_label": lookup.get("label"),
  409. "fallback_type": inferred,
  410. "current_type": entity_type,
  411. "accepted": False,
  412. }
  413. candidates = []
  414. raw_trends = item.get("raw_trends_json")
  415. if raw_trends:
  416. try:
  417. trends = json.loads(raw_trends)
  418. candidates = trends.get("candidates", []) if isinstance(trends, dict) else []
  419. except Exception:
  420. candidates = []
  421. report["trends_candidates_count"] = len(candidates)
  422. if candidates:
  423. report["ranking_attempted"] = True
  424. ranked = await _rank_candidates_with_llm(item, lookup, candidates)
  425. if isinstance(ranked, dict) and ranked.get("selected_mid"):
  426. provider = _pick_rank_provider()
  427. model = CANDIDATE_RANK_MODEL or (GROQ_MODEL if provider == "groq" else OPENAI_MODEL)
  428. report["planned"].append(
  429. {
  430. "action": "llm_ranked_candidate_mid",
  431. "selected_mid": ranked.get("selected_mid"),
  432. "selected_title": ranked.get("selected_title"),
  433. "confidence": ranked.get("confidence"),
  434. "reason": ranked.get("reason"),
  435. "provider": provider,
  436. "model": model,
  437. }
  438. )
  439. else:
  440. report["planned"].append(
  441. {
  442. "action": "candidate_ranking_no_selection",
  443. "reason": "llm_returned_no_selected_mid_or_invalid_json",
  444. }
  445. )
  446. else:
  447. wikidata = {
  448. "wikidata_status": "hit",
  449. "qid": lookup.get("qid"),
  450. "label": lookup.get("label"),
  451. "description": lookup.get("description"),
  452. "retrieved_at": lookup.get("retrieved_at"),
  453. }
  454. if isinstance(wikidata, dict) and wikidata.get("wikidata_status") == "hit" and wikidata.get("qid"):
  455. report["wikidata_status"] = "hit"
  456. # If Wikidata already knows the entity, fetch the full object and mine
  457. # any additional identifiers we can safely attach as claims.
  458. full = await fetch_wikidata_entity(wikidata.get("qid"))
  459. if full and isinstance(full.get("entity"), dict):
  460. report["wikidata_status"] = "enriched"
  461. entity_block = full["entity"]
  462. claims_dict = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}
  463. p31_claims = claims_dict.get("P31", []) if isinstance(claims_dict, dict) else []
  464. p31_qids: list[str] = []
  465. for claim in p31_claims:
  466. mainsnak = claim.get("mainsnak", {})
  467. datavalue = mainsnak.get("datavalue", {})
  468. value = datavalue.get("value", {})
  469. wid = value.get("id") if isinstance(value, dict) else None
  470. if wid:
  471. p31_qids.append(wid)
  472. inferred = infer_atlas_type_from_p31(tuple(p31_qids)) if p31_qids else None
  473. fallback_candidates = []
  474. if not inferred and p31_qids:
  475. fallback_candidates = [WIKIDATA_CLASS_MAP.get(qid) for qid in p31_qids if WIKIDATA_CLASS_MAP.get(qid)]
  476. fallback_candidates = [c for c in fallback_candidates if c]
  477. if len(fallback_candidates) == 1:
  478. inferred = fallback_candidates[0]
  479. elif fallback_candidates:
  480. inferred = await _choose_type_with_llm(item, full, fallback_candidates)
  481. wiki_label = (full.get("label") or "").strip()
  482. wiki_description = (full.get("description") or "").strip()
  483. current_label = subject.strip()
  484. current_mid = item.get("mid")
  485. qid = full.get("qid")
  486. wikidata_mid = None
  487. claims = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}
  488. mid_claims = claims.get("P2671", []) if isinstance(claims, dict) else []
  489. for claim_node in mid_claims:
  490. mainsnak = claim_node.get("mainsnak", {})
  491. datavalue = mainsnak.get("datavalue", {})
  492. value = datavalue.get("value")
  493. if isinstance(value, str) and value.strip():
  494. wikidata_mid = value.strip()
  495. break
  496. report["comparison"] = {
  497. "canonical_label": {"current": current_label, "wikidata": wiki_label or None, "matches": bool(wiki_label and wiki_label == current_label)},
  498. "mid": {"current": current_mid, "wikidata": wikidata_mid, "matches": bool(current_mid and wikidata_mid and current_mid == wikidata_mid)},
  499. }
  500. if _resolution_is_clear(current_label, current_mid, item.get("entity_type") or "unknown", wiki_label, wikidata_mid, inferred):
  501. report["wikidata_status"] = "clear"
  502. report["planned_core_updates"] = []
  503. if inferred and inferred != (item.get("entity_type") or "unknown"):
  504. report["planned_core_updates"].append(
  505. {
  506. "field": "entity_type",
  507. "from": item.get("entity_type"),
  508. "to": inferred,
  509. "source": "wikidata",
  510. }
  511. )
  512. if wiki_label and wiki_label != current_label:
  513. report["planned_core_updates"].append(
  514. {
  515. "field": "canonical_label",
  516. "from": current_label,
  517. "to": wiki_label,
  518. "source": "wikidata",
  519. }
  520. )
  521. if wiki_description and wiki_description != (item.get("description") or ""):
  522. report["planned_core_updates"].append(
  523. {
  524. "field": "canonical_description",
  525. "from": item.get("description"),
  526. "to": wiki_description,
  527. "source": "wikidata",
  528. }
  529. )
  530. # If Wikidata exposes a MID and it differs from the Trends MID, that
  531. # is a strong signal for claim supersession / correction.
  532. if wikidata_mid and wikidata_mid != current_mid:
  533. report["planned_identifier_claims"] += 1
  534. report["planned_identifier_types"].append("mid")
  535. report["planned"].append(
  536. {
  537. "action": "supersede_mid_and_add_correct_mid",
  538. "from": current_mid,
  539. "to": wikidata_mid,
  540. "source": "wikidata",
  541. }
  542. )
  543. # If Wikidata does not provide a MID, Trends can be used only as a
  544. # last resort. If we already have a Wikidata QID hit, do NOT inject
  545. # a Trends MID (prevents Graza-style cross-entity contamination).
  546. if not wikidata_mid and not qid:
  547. trends_candidates = []
  548. raw_trends = item.get("raw_trends_json")
  549. if raw_trends:
  550. try:
  551. trends = json.loads(raw_trends)
  552. trends_candidates = trends.get("candidates", []) if isinstance(trends, dict) else []
  553. except Exception:
  554. trends_candidates = []
  555. selected_mid, selected_title = _choose_trends_candidate_mid(
  556. trends_candidates,
  557. label=current_label,
  558. wikidata_label=wiki_label,
  559. entity_type=item.get("entity_type") or "unknown",
  560. )
  561. if selected_mid and selected_mid != current_mid:
  562. report["planned_identifier_claims"] += 1
  563. report["planned_identifier_types"].append("mid")
  564. report["planned"].append(
  565. {
  566. "action": "supersede_mid_and_add_correct_mid",
  567. "from": current_mid,
  568. "to": selected_mid,
  569. "source": "trends-candidate",
  570. "title": selected_title,
  571. }
  572. )
  573. # QID is always a known cross-reference and acts as a stable anchor.
  574. existing_qid = None
  575. if qid and qid != existing_qid:
  576. claim = AtlasClaim(
  577. claim_id=_planned_claim_id(atlas_id, "atlas:hasIdentifier", qid),
  578. subject=atlas_id,
  579. predicate="atlas:hasIdentifier",
  580. object=AtlasClaimObject(kind="identifier", id_type="qid", value=qid),
  581. layer="raw",
  582. provenance=AtlasProvenance(
  583. source="wikidata",
  584. retrieval_method="atlas-maintenance-wikidata-enrichment",
  585. confidence=0.99,
  586. retrieved_at=full.get("retrieved_at"),
  587. evidence_property="qid",
  588. ),
  589. )
  590. report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
  591. report["planned_identifier_claims"] += 1
  592. report["planned_identifier_types"].append("qid")
  593. for wikidata_property, (identifier_type, label) in WIKIDATA_IDENTIFIER_PROPERTIES.items():
  594. property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
  595. for claim_node in property_claims:
  596. mainsnak = claim_node.get("mainsnak", {})
  597. datavalue = mainsnak.get("datavalue", {})
  598. value = datavalue.get("value")
  599. if not isinstance(value, str) or not value.strip():
  600. continue
  601. existing = None
  602. if existing == value:
  603. continue
  604. claim = AtlasClaim(
  605. claim_id=_planned_claim_id(atlas_id, "atlas:hasIdentifier", value),
  606. subject=atlas_id,
  607. predicate="atlas:hasIdentifier",
  608. object=AtlasClaimObject(kind="identifier", id_type=identifier_type, value=value),
  609. layer="raw",
  610. provenance=AtlasProvenance(
  611. source="wikidata",
  612. retrieval_method="atlas-maintenance-wikidata-enrichment",
  613. confidence=0.99,
  614. retrieved_at=full.get("retrieved_at"),
  615. evidence_property=wikidata_property,
  616. ),
  617. )
  618. report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
  619. report["planned_identifier_claims"] += 1
  620. report["planned_identifier_types"].append(identifier_type)
  621. # Type-specific enrichment: different entity kinds care about different fields.
  622. # We only plan claims for high-confidence public facts that are useful for
  623. # disambiguation and consolidation.
  624. type_plan = WIKIDATA_TYPE_FIELD_PLAN.get(item.get("entity_type") or "unknown", {})
  625. for wikidata_property, (claim_type, label) in type_plan.items():
  626. property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
  627. for claim_node in property_claims:
  628. mainsnak = claim_node.get("mainsnak", {})
  629. datavalue = mainsnak.get("datavalue", {})
  630. value = datavalue.get("value")
  631. if value in (None, "", {}):
  632. continue
  633. if wikidata_property == "P625" and isinstance(value, dict):
  634. lat = value.get("latitude")
  635. lon = value.get("longitude")
  636. if lat is not None:
  637. lat_text = str(lat)
  638. lat_claim = AtlasClaim(
  639. claim_id=_planned_claim_id(atlas_id, "atlas:hasLatitude", lat_text),
  640. subject=atlas_id,
  641. predicate="atlas:hasLatitude",
  642. object=AtlasClaimObject(kind="literal", value=lat_text),
  643. layer="raw",
  644. provenance=AtlasProvenance(
  645. source="wikidata",
  646. retrieval_method="atlas-maintenance-wikidata-enrichment",
  647. confidence=0.95,
  648. retrieved_at=full.get("retrieved_at"),
  649. evidence_property=wikidata_property,
  650. ),
  651. )
  652. report["planned"].append({"action": "add_type_field_claim", "claim": asdict(lat_claim)})
  653. report["planned_type_field_claims"] += 1
  654. if lon is not None:
  655. lon_text = str(lon)
  656. lon_claim = AtlasClaim(
  657. claim_id=_planned_claim_id(atlas_id, "atlas:hasLongitude", lon_text),
  658. subject=atlas_id,
  659. predicate="atlas:hasLongitude",
  660. object=AtlasClaimObject(kind="literal", value=lon_text),
  661. layer="raw",
  662. provenance=AtlasProvenance(
  663. source="wikidata",
  664. retrieval_method="atlas-maintenance-wikidata-enrichment",
  665. confidence=0.95,
  666. retrieved_at=full.get("retrieved_at"),
  667. evidence_property=wikidata_property,
  668. ),
  669. )
  670. report["planned"].append({"action": "add_type_field_claim", "claim": asdict(lon_claim)})
  671. report["planned_type_field_claims"] += 1
  672. continue
  673. # For this first pass we capture these as literal payload claims;
  674. # the exact ontology mapping can be tightened later.
  675. if isinstance(value, dict):
  676. # entity / place objects often carry an id and label
  677. value = value.get("id") or value.get("time") or value.get("text") or value.get("amount")
  678. if not isinstance(value, str):
  679. continue
  680. claim = AtlasClaim(
  681. claim_id=_planned_claim_id(atlas_id, f"atlas:has{claim_type.replace('-', ' ').title().replace(' ', '')}", value),
  682. subject=atlas_id,
  683. predicate=f"atlas:has{claim_type.replace('-', ' ').title().replace(' ', '')}",
  684. object=(
  685. AtlasClaimObject(kind="identifier", id_type="wikidata-entity", value=value)
  686. if QID_RE.match(value)
  687. else AtlasClaimObject(kind="literal", value=value)
  688. ),
  689. layer="raw",
  690. provenance=AtlasProvenance(
  691. source="wikidata",
  692. retrieval_method="atlas-maintenance-wikidata-enrichment",
  693. confidence=0.95,
  694. retrieved_at=full.get("retrieved_at"),
  695. evidence_property=wikidata_property,
  696. ),
  697. )
  698. report["planned"].append({"action": "add_type_field_claim", "claim": asdict(claim)})
  699. report["planned_type_field_claims"] += 1
  700. if dry_run:
  701. return report
  702. # Non-dry-run: write planned improvements back to graph as an updated entity snapshot.
  703. updated_label = next((u["to"] for u in report["planned_core_updates"] if u.get("field") == "canonical_label"), subject)
  704. updated_desc = next((u["to"] for u in report["planned_core_updates"] if u.get("field") == "canonical_description"), item.get("description"))
  705. updated_type = next((u["to"] for u in report["planned_core_updates"] if u.get("field") == "entity_type"), item.get("entity_type") or "unknown")
  706. write_claims: list[AtlasClaim] = []
  707. for planned in report["planned"]:
  708. action = planned.get("action")
  709. if action in {"add_identifier_claim", "add_type_field_claim"} and isinstance(planned.get("claim"), dict):
  710. c = planned["claim"]
  711. obj = c.get("object", {})
  712. prov = c.get("provenance") or {}
  713. write_claims.append(
  714. AtlasClaim(
  715. claim_id=c.get("claim_id"),
  716. subject=atlas_id,
  717. predicate=c.get("predicate"),
  718. object=AtlasClaimObject(kind=obj.get("kind"), value=obj.get("value"), id_type=obj.get("id_type")),
  719. layer=c.get("layer", "raw"),
  720. status=c.get("status", "active"),
  721. provenance=AtlasProvenance(
  722. source=prov.get("source", "maintenance"),
  723. retrieval_method=prov.get("retrieval_method", "atlas-maintenance"),
  724. confidence=float(prov.get("confidence", 0.0) or 0.0),
  725. retrieved_at=prov.get("retrieved_at"),
  726. evidence_property=prov.get("evidence_property"),
  727. provider=prov.get("provider"),
  728. model=prov.get("model"),
  729. ),
  730. created_at=c.get("created_at"),
  731. )
  732. )
  733. if action == "llm_ranked_candidate_mid" and planned.get("selected_mid"):
  734. write_claims.append(
  735. AtlasClaim(
  736. claim_id=_planned_claim_id(atlas_id, "atlas:hasIdentifier", planned["selected_mid"]),
  737. subject=atlas_id,
  738. predicate="atlas:hasIdentifier",
  739. object=AtlasClaimObject(kind="identifier", id_type="mid", value=planned["selected_mid"]),
  740. layer="raw",
  741. provenance=AtlasProvenance(
  742. source="maintenance-llm-ranking",
  743. retrieval_method="atlas-maintenance-candidate-ranking",
  744. confidence=float(planned.get("confidence", 0.0) or 0.0),
  745. retrieved_at=datetime.now(timezone.utc).isoformat(),
  746. provider=planned.get("provider"),
  747. model=planned.get("model"),
  748. ),
  749. )
  750. )
  751. updated_entity = AtlasEntity(
  752. atlas_id=atlas_id,
  753. canonical_label=updated_label,
  754. canonical_description=updated_desc,
  755. entity_type=updated_type,
  756. claims=write_claims,
  757. raw_payload={
  758. "wikidata": {
  759. "wikidata_status": report.get("wikidata_status", "missing"),
  760. "qid": wikidata.get("qid") if isinstance(wikidata, dict) else None,
  761. "label": updated_label,
  762. "description": updated_desc,
  763. },
  764. "wikidata_entity_json": json.dumps(full.get("entity"), ensure_ascii=False) if isinstance(wikidata, dict) and isinstance(full, dict) and full.get("entity") else None,
  765. },
  766. )
  767. svc = AtlasStorageService()
  768. # Supersede conflicting active claims before inserting new ones.
  769. existing = await svc.read_entity_claims(atlas_id, include_superseded=True)
  770. existing_bindings = _claim_bindings_from_read(existing)
  771. to_supersede: list[str] = []
  772. planned_types = {
  773. (p.get("claim", {}).get("object", {}).get("id_type"), p.get("claim", {}).get("object", {}).get("value"))
  774. for p in report["planned"]
  775. if p.get("action") == "add_identifier_claim"
  776. }
  777. for b in existing_bindings:
  778. status = b.get("status", {}).get("value")
  779. if status != "active":
  780. continue
  781. claim_uri = b.get("claim", {}).get("value")
  782. pred = b.get("pred", {}).get("value")
  783. id_type = (b.get("idType", {}).get("value") or "").rsplit("#", 1)[-1].replace("WikidataQID", "qid").replace("Mid", "mid").lower()
  784. id_val = b.get("idVal", {}).get("value")
  785. if pred == "atlas:hasCanonicalType" and any(u.get("field") == "entity_type" for u in report["planned_core_updates"]):
  786. if claim_uri:
  787. to_supersede.append(claim_uri)
  788. if pred == "atlas:hasIdentifier" and id_type and id_type in UNITARY_IDENTIFIER_TYPES and any(pt[0] and pt[0].lower() == id_type and pt[1] != id_val for pt in planned_types):
  789. if claim_uri:
  790. to_supersede.append(claim_uri)
  791. if to_supersede:
  792. await svc.supersede_claims(sorted(set(to_supersede)))
  793. await svc.replace_entity_core(
  794. atlas_id,
  795. canonical_label=updated_label,
  796. canonical_description=updated_desc,
  797. canonical_type=updated_type,
  798. )
  799. write_result = await svc.write_entity(updated_entity)
  800. report["write_result"] = {
  801. "status": write_result.get("status"),
  802. "entity_id": write_result.get("entity_id"),
  803. "graph": write_result.get("graph"),
  804. "message": write_result.get("message"),
  805. }
  806. report["written"] = write_result.get("status") == "ok"
  807. return report
  808. async def main() -> int:
  809. parser = build_parser()
  810. args = parser.parse_args()
  811. checkpoint_path = Path(args.checkpoint_file)
  812. if args.clear_checkpoint:
  813. if checkpoint_path.exists():
  814. checkpoint_path.unlink()
  815. print(json.dumps({"checkpoint_cleared": True, "checkpoint_file": str(checkpoint_path)}, indent=2, ensure_ascii=False))
  816. return 0
  817. start_after = args.start_after.strip()
  818. if args.reset_checkpoint:
  819. start_after = ""
  820. elif not start_after:
  821. checkpoint_path = Path(args.checkpoint_file)
  822. if checkpoint_path.exists():
  823. start_after = checkpoint_path.read_text(encoding="utf-8").strip()
  824. subjects = await discover_subjects(args.page_size, start_after)
  825. summaries = []
  826. for item in subjects:
  827. summaries.append(await maintain_subject(item, args.dry_run, args.refresh_payloads))
  828. if subjects and not args.dry_run:
  829. checkpoint_path.write_text(subjects[-1]["atlas_id"], encoding="utf-8")
  830. print(json.dumps({
  831. "dry_run": args.dry_run,
  832. "checkpoint_file": str(checkpoint_path),
  833. "checkpoint_start_after": start_after,
  834. "results": summaries,
  835. }, indent=2, ensure_ascii=False))
  836. return 0
# Script entry point: drive the async main() on a fresh event loop and
# propagate its integer return value to the shell as the process exit code.
if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))