#!/usr/bin/env python3
"""Atlas maintenance script.

Goal:
- automatically revisit stored entities
- enrich identifier coverage when Wikidata is present
- keep the claim supersession model authoritative

Operational rule:
- no manual subject list is required for normal runs
- --dry-run shows what would change, without writing
"""

from __future__ import annotations

import argparse
import asyncio
import json
import os
import re
import sys
from dataclasses import asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import httpx

ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from app.ids import claim_hash
from app.models import AtlasClaim, AtlasClaimObject, AtlasEntity, AtlasProvenance
from app.storage_service import AtlasStorageService
from app.type_classifier import WIKIDATA_CLASS_MAP
from app.wikidata_lookup import fetch_wikidata_entity, lookup_wikidata
from app.wikidata_type_reasoner import infer_atlas_type_from_p31

# High-confidence identifier properties we can mine from the full Wikidata entity.
# The goal is to enrich the entity with public identifiers and to reconcile the
# Google MID whenever Wikidata already exposes the same identity through another id.
#
# Note: provenance.evidence_property records the Wikidata property that supplied
# each value; the human-readable labels here (for example "MusicBrainz artist ID")
# are kept for reporting, while retrieval_method names the enrichment step.
WIKIDATA_IDENTIFIER_PROPERTIES: dict[str, tuple[str, str]] = {
    "P2671": ("mid", "Google Knowledge Graph ID"),
    "P434": ("musicbrainz-artist-id", "MusicBrainz artist ID"),
    "P435": ("musicbrainz-work-id", "MusicBrainz work ID"),
    "P436": ("musicbrainz-release-group-id", "MusicBrainz release group ID"),
    "P439": ("musicbrainz-release-id", "MusicBrainz release ID"),
    "P444": ("musicbrainz-recording-id", "MusicBrainz recording ID"),
    "P345": ("imdb-id", "IMDb ID"),
    "P214": ("viaf-id", "VIAF ID"),
    "P213": ("isni", "ISNI"),
    "P227": ("gnd-id", "GND ID"),
}

# Entity-type-specific Wikidata fields worth capturing early. Birth date and
# coordinates are strict because they are type-specific and should not be left
# as competing active claims.
WIKIDATA_TYPE_FIELD_PLAN: dict[str, dict[str, tuple[str, str]]] = {
    "Person": {
        "P569": ("birth-date", "date of birth"),
        "P19": ("birth-place", "place of birth"),
        "P27": ("citizenship", "country of citizenship"),
    },
    "Organization": {
        "P571": ("inception", "inception"),
        "P159": ("headquarters", "headquarters location"),
        "P452": ("industry", "industry"),
    },
    "Location": {
        "P571": ("inception", "inception"),
        "P17": ("country", "country"),
        "P131": ("located-in", "located in the administrative territorial entity"),
        "P625": ("coordinates", "coordinate location"),
    },
}

UNITARY_IDENTIFIER_TYPES = {
    "mid",
    "qid",
    "wikidata-entity",
    "birth-date",
    "latitude",
    "longitude",
    "coordinates",
}

QID_RE = re.compile(r"^Q\d+$")

# Candidate ranking step configuration (.env-controlled)
CANDIDATE_RANK_PROVIDER = os.getenv("ATLAS_CANDIDATE_RANK_PROVIDER", "auto")
CANDIDATE_RANK_MODEL = os.getenv("ATLAS_CANDIDATE_RANK_MODEL", "")
CANDIDATE_RANK_SYSTEM_PROMPT = os.getenv(
    "ATLAS_CANDIDATE_RANK_SYSTEM_PROMPT",
    str(ROOT / "prompts" / "candidate_ranking" / "system.txt"),
)
CANDIDATE_RANK_USER_TEMPLATE = os.getenv(
    "ATLAS_CANDIDATE_RANK_USER_TEMPLATE",
    str(ROOT / "prompts" / "candidate_ranking" / "user_template.txt"),
)

# Type adjudication step configuration. _pick_type_provider() and
# _choose_type_with_llm() referenced these names without defining them; the
# env var names below follow the ATLAS_CANDIDATE_RANK_* pattern and are an
# assumption about the intended configuration keys.
TYPE_CLASSIFIER_PROVIDER = os.getenv("ATLAS_TYPE_CLASSIFIER_PROVIDER", "auto")
TYPE_CLASSIFIER_MODEL = os.getenv("ATLAS_TYPE_CLASSIFIER_MODEL", "")

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("ATLAS_OPENAI_MODEL", os.getenv("OPENAI_MODEL", "gpt-4o-mini"))
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_MODEL = os.getenv("ATLAS_GROQ_MODEL", os.getenv("GROQ_MODEL", "meta-llama/llama-4-scout-17b-16e-instruct"))


def _planned_claim_id(subject: str, predicate: str, value: str, layer: str = "raw") -> str:
    created_at = datetime.now(timezone.utc).date().isoformat()
    return f"clm_{layer}_{claim_hash(subject, predicate, value, layer, created_at=created_at)}"


def _infer_wikidata_canonical_type(entity_block: dict[str, Any]) -> str | None:
    claims = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}
    p31 = claims.get("P31", []) if isinstance(claims, dict) else []
    for claim in p31:
        mainsnak = claim.get("mainsnak", {})
        datavalue = mainsnak.get("datavalue", {})
        value = datavalue.get("value", {})
        wid = value.get("id") if isinstance(value, dict) else None
        mapped = WIKIDATA_CLASS_MAP.get(wid)
        if mapped:
            return mapped
    return None


def _fallback_lookup_looks_compatible(current_type: str, lookup: dict[str, Any], inferred: str | None) -> bool:
    current_type = (current_type or "unknown").strip()
    if current_type == "unknown":
        return True
    if inferred and inferred != current_type:
        return False
    desc = str(lookup.get("description") or "").lower()
    if current_type == "Location" and "family name" in desc:
        return False
    if current_type == "Person" and "city" in desc:
        return False
    return True


def _resolution_is_clear(
    current_label: str,
    current_mid: str | None,
    current_type: str,
    wikidata_label: str | None,
    wikidata_mid: str | None,
    wikidata_type: str | None,
) -> bool:
    if not wikidata_label or wikidata_label != current_label:
        return False
    if wikidata_mid and current_mid and wikidata_mid != current_mid:
        return False
    if wikidata_type and current_type and current_type != "unknown" and wikidata_type != current_type:
        return False
    return bool(wikidata_type)


def _choose_trends_candidate_mid(
    candidates: list[dict[str, Any]],
    *,
    label: str,
    wikidata_label: str | None,
    entity_type: str,
) -> tuple[str | None, str | None]:
    """Pick the best Trends MID by label/type match.

    Exact label matches are preferred, and the candidate type must be
    compatible with the current Atlas entity type when possible.
""" if not candidates: return None, None wanted_labels = {label.strip().lower()} if wikidata_label: wanted_labels.add(wikidata_label.strip().lower()) def compatible(candidate_type: str | None) -> bool: ct = (candidate_type or "").lower() et = (entity_type or "unknown").lower() if et == "location": return any(word in ct for word in ["city", "settlement", "place", "region", "location", "country", "town", "village"]) if et == "person": return any(word in ct for word in ["person", "artist", "musician", "writer", "politician", "actor"]) if et == "organization": return any(word in ct for word in ["organization", "company", "university", "school", "institution"]) return True for c in candidates: mid = c.get("mid") title = (c.get("title") or "").strip().lower() ctype = c.get("type") if mid and title in wanted_labels and compatible(ctype): return mid, c.get("title") for c in candidates: mid = c.get("mid") title = (c.get("title") or "").strip().lower() if mid and title in wanted_labels: return mid, c.get("title") return None, None def _load_text(path: str) -> str: return Path(path).read_text(encoding="utf-8") def _pick_rank_provider() -> str | None: if CANDIDATE_RANK_PROVIDER == "openai" and OPENAI_API_KEY: return "openai" if CANDIDATE_RANK_PROVIDER == "groq" and GROQ_API_KEY: return "groq" if CANDIDATE_RANK_PROVIDER == "auto": if GROQ_API_KEY: return "groq" if OPENAI_API_KEY: return "openai" return None def _pick_type_provider() -> str | None: if TYPE_CLASSIFIER_PROVIDER == "openai" and OPENAI_API_KEY: return "openai" if TYPE_CLASSIFIER_PROVIDER == "groq" and GROQ_API_KEY: return "groq" if TYPE_CLASSIFIER_PROVIDER == "auto": if GROQ_API_KEY: return "groq" if OPENAI_API_KEY: return "openai" return None async def _choose_type_with_llm(item: dict[str, Any], wikidata_hint: dict[str, Any], candidates: list[str]) -> str | None: # Only used when ontology reasoning can't produce a unique Atlas type. 
    provider = _pick_type_provider()
    if provider is None:
        return None
    model = TYPE_CLASSIFIER_MODEL or (GROQ_MODEL if provider == "groq" else OPENAI_MODEL)
    url = "https://api.groq.com/openai/v1/chat/completions" if provider == "groq" else "https://api.openai.com/v1/chat/completions"
    api_key = GROQ_API_KEY if provider == "groq" else OPENAI_API_KEY
    prompt = (
        "Classify this entity into one Atlas type only.\n"
        f"Label: {item.get('label', '')}\n"
        f"Description: {item.get('description', '')}\n"
        f"Wikidata label: {wikidata_hint.get('label', '')}\n"
        f"Wikidata description: {wikidata_hint.get('description', '')}\n"
        f"P31-derived Atlas candidates: {', '.join(candidates)}\n"
        "Allowed Atlas types: Person, Organization, Location, CreativeWork, Event, Product, Taxon, Other.\n"
        'Return strict JSON only: {"type":"...","confidence":0.0-1.0,"reason":"..."}'
    )
    payload = {
        "model": model,
        "temperature": 0,
        "messages": [
            {"role": "system", "content": "You are Atlas type adjudicator."},
            {"role": "user", "content": prompt},
        ],
    }
    try:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.post(
                url,
                json=payload,
                headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
            )
            resp.raise_for_status()
            content = resp.json().get("choices", [{}])[0].get("message", {}).get("content", "")
            if not content:
                return None
            try:
                parsed = json.loads(content)
            except Exception:
                # Tolerate a ```json fenced reply; anything else counts as invalid.
                parsed = (
                    json.loads(content.strip().strip("`").replace("json", "", 1).strip())
                    if content.strip().startswith("`")
                    else None
                )
            return parsed.get("type") if isinstance(parsed, dict) else None
    except Exception:
        return None


async def _rank_candidates_with_llm(
    item: dict[str, Any],
    wikidata_hint: dict[str, Any],
    candidates: list[dict[str, Any]],
) -> dict[str, Any] | None:
    provider = _pick_rank_provider()
    if provider is None:
        return None
    system_prompt = _load_text(CANDIDATE_RANK_SYSTEM_PROMPT)
    user_template = _load_text(CANDIDATE_RANK_USER_TEMPLATE)
    user_prompt = user_template.format(
        atlas_id=item.get("atlas_id", ""),
        canonical_label=item.get("label", ""),
        canonical_description=item.get("description", ""),
        canonical_type=item.get("entity_type", "unknown"),
        wikidata_qid=wikidata_hint.get("qid", ""),
        wikidata_label=wikidata_hint.get("label", ""),
        wikidata_description=wikidata_hint.get("description", ""),
        candidates_json=json.dumps(candidates, ensure_ascii=False),
    )
    model = CANDIDATE_RANK_MODEL or (GROQ_MODEL if provider == "groq" else OPENAI_MODEL)
    url = "https://api.groq.com/openai/v1/chat/completions" if provider == "groq" else "https://api.openai.com/v1/chat/completions"
    api_key = GROQ_API_KEY if provider == "groq" else OPENAI_API_KEY
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    payload = {
        "model": model,
        "temperature": 0,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    }
    try:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.post(url, json=payload, headers=headers)
            resp.raise_for_status()
            content = resp.json().get("choices", [{}])[0].get("message", {}).get("content", "")
            if not content:
                return None
            try:
                return json.loads(content)
            except Exception:
                text = content.strip()
                if text.startswith("```"):
                    text = text.strip("`")
                    if text.lower().startswith("json"):
                        text = text[4:].strip()
                try:
                    return json.loads(text)
                except Exception:
                    return None
    except Exception:
        return None


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Atlas maintenance / claim adjudication helper")
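    # Typical invocations (illustrative; the script path is an assumption):
    #   python atlas_maintenance.py --dry-run --page-size 25
    #   python atlas_maintenance.py --refresh-payloads --start-after ent_0042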
parser.add_argument("--dry-run", action="store_true", help="Show planned claim updates without writing") parser.add_argument("--page-size", type=int, default=50, help="How many entities to scan per page") parser.add_argument("--start-after", default="", help="Resume scanning after this canonical label") parser.add_argument("--checkpoint-file", default=".atlas-maintenance.checkpoint", help="File storing the last processed label") parser.add_argument("--reset-checkpoint", action="store_true", help="Ignore any saved checkpoint and start from the beginning") parser.add_argument("--clear-checkpoint", action="store_true", help="Delete the checkpoint file and exit") parser.add_argument("--refresh-payloads", action="store_true", help="Explicitly re-fetch online payloads during maintenance") return parser async def _sparql_bindings(query: str) -> list[dict[str, Any]]: svc = AtlasStorageService() result = await svc._call_tool("sparql_query", {"query": query}) if isinstance(result, list) and result: first = result[0] text = getattr(first, "text", None) result = json.loads(text) if text else {} return result.get("results", {}).get("bindings", []) if isinstance(result, dict) else [] def _claim_bindings_from_read(payload: dict[str, Any]) -> list[dict[str, Any]]: result = payload.get("result") if isinstance(result, list) and result: text = getattr(result[0], "text", None) if text: try: result = json.loads(text) except Exception: return [] if isinstance(result, dict): return result.get("results", {}).get("bindings", []) return [] async def discover_subjects(page_size: int, start_after: str = "") -> list[dict[str, Any]]: """Ask Virtuoso for known Atlas entities and return their immutable atlas_id values. This keeps the maintenance job automatic: we operate on the stored graph, not on a hand-entered subject list or mutable label text. """ start_marker = start_after.replace("\\", "\\\\").replace('"', '\\"') if start_after else "" filter_clause = f'FILTER(STR(COALESCE(?atlasId, ?label)) > "{start_marker}")' if start_marker else "" query = """ PREFIX atlas: SELECT DISTINCT ?entity ?atlasId ?label ?desc ?type ?mid ?qid ?rawWd ?rawTrends WHERE {{ GRAPH {{ ?entity a atlas:Entity ; atlas:canonicalLabel ?label . OPTIONAL {{ ?entity atlas:canonicalDescription ?desc . }} OPTIONAL {{ ?entity atlas:rawWikidataJson ?rawWd . }} OPTIONAL {{ ?entity atlas:rawTrendsJson ?rawTrends . }} OPTIONAL {{ ?entity atlas:hasCanonicalType ?type . }} OPTIONAL {{ ?entity atlas:hasIdentifier ?ident . ?ident atlas:identifierType atlas:Mid ; atlas:identifierValue ?mid . }} OPTIONAL {{ ?entity atlas:hasIdentifier ?identQid . ?identQid atlas:identifierType atlas:WikidataQID ; atlas:identifierValue ?qid . }} OPTIONAL {{ ?entity atlas:atlasId ?atlasId . 
            }}
            {filter_clause}
          }}
        }}
        ORDER BY COALESCE(?atlasId, ?label)
        LIMIT {page_size}
    """.format(filter_clause=filter_clause, page_size=page_size)
    bindings = await _sparql_bindings(query)
    return [
        {
            "entity_iri": b.get("entity", {}).get("value", ""),
            "atlas_id": b.get("atlasId", {}).get("value", "") or b.get("label", {}).get("value", ""),
            "label": b.get("label", {}).get("value", ""),
            "description": b.get("desc", {}).get("value", ""),
            "entity_type": (b.get("type", {}).get("value", "").split("#")[-1] if b.get("type", {}).get("value") else "unknown"),
            "mid": b.get("mid", {}).get("value"),
            "qid": b.get("qid", {}).get("value"),
            "raw_wikidata_json": b.get("rawWd", {}).get("value"),
            "raw_trends_json": b.get("rawTrends", {}).get("value"),
        }
        for b in bindings
        if b.get("label", {}).get("value")
    ]


async def maintain_subject(item: dict[str, Any], dry_run: bool, refresh_payloads: bool) -> dict[str, Any]:
    subject = item.get("label", "")
    atlas_id = item.get("atlas_id", "")
    entity = AtlasEntity(
        atlas_id=atlas_id,
        canonical_label=subject,
        canonical_description=item.get("description") or None,
        entity_type=item.get("entity_type") or "unknown",
        raw_payload={},
    )
    report: dict[str, Any] = {
        "subject": subject,
        "atlas_id": atlas_id,
        "planned": [],
        "written": False,
        "wikidata_status": "missing",
        "comparison": {},
        "planned_core_updates": [],
        "planned_identifier_claims": 0,
        "planned_identifier_types": [],
        "planned_type_field_claims": 0,
        "trends_candidates_count": 0,
        "ranking_attempted": False,
    }
    wikidata = None
    full = None
    raw_wd = item.get("raw_wikidata_json")
    if raw_wd:
        try:
            wikidata = json.loads(raw_wd)
        except Exception:
            wikidata = None
    if not isinstance(wikidata, dict):
        wikidata = {
            "wikidata_status": "hit" if item.get("qid") else "missing",
            "qid": item.get("qid"),
        }

    # Strong sync rule: if the persisted Wikidata payload already carries a better
    # canonical label/description, plan that core update immediately.
    wd_label = (wikidata.get("label") or "").strip() if isinstance(wikidata, dict) else ""
    wd_desc = (wikidata.get("description") or "").strip() if isinstance(wikidata, dict) else ""
    if wd_label and wd_label != subject:
        report["planned_core_updates"].append(
            {"field": "canonical_label", "from": subject, "to": wd_label, "source": "wikidata-payload"}
        )
    if wd_desc and wd_desc != (item.get("description") or ""):
        report["planned_core_updates"].append(
            {"field": "canonical_description", "from": item.get("description"), "to": wd_desc, "source": "wikidata-payload"}
        )

    # Fallback: when the graph row does not yet carry a stored Wikidata payload,
    # do a direct lookup from the current canonical label so maintenance still works.
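    # lookup_wikidata(subject) is expected to return a dict using at least these
    # fields (illustrative shape inferred from usage below): {"qid": "Q60",
    # "label": "...", "description": "...", "retrieved_at": "...", "entity": {...}};
    # only "qid" is treated as required.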
    if refresh_payloads and wikidata.get("wikidata_status") == "missing" and subject:
        lookup = await lookup_wikidata(subject)
        if isinstance(lookup, dict) and lookup.get("qid"):
            inferred = _infer_wikidata_canonical_type(lookup.get("entity") or {})
            entity_type = (item.get("entity_type") or "unknown").strip()
            if not _fallback_lookup_looks_compatible(entity_type, lookup, inferred):
                report["wikidata_status"] = "ambiguous_candidate"
                report["comparison"] = {
                    "fallback_qid": lookup.get("qid"),
                    "fallback_label": lookup.get("label"),
                    "fallback_type": inferred,
                    "current_type": entity_type,
                    "accepted": False,
                }
                candidates = []
                raw_trends = item.get("raw_trends_json")
                if raw_trends:
                    try:
                        trends = json.loads(raw_trends)
                        candidates = trends.get("candidates", []) if isinstance(trends, dict) else []
                    except Exception:
                        candidates = []
                report["trends_candidates_count"] = len(candidates)
                if candidates:
                    report["ranking_attempted"] = True
                    ranked = await _rank_candidates_with_llm(item, lookup, candidates)
                    if isinstance(ranked, dict) and ranked.get("selected_mid"):
                        provider = _pick_rank_provider()
                        model = CANDIDATE_RANK_MODEL or (GROQ_MODEL if provider == "groq" else OPENAI_MODEL)
                        report["planned"].append(
                            {
                                "action": "llm_ranked_candidate_mid",
                                "selected_mid": ranked.get("selected_mid"),
                                "selected_title": ranked.get("selected_title"),
                                "confidence": ranked.get("confidence"),
                                "reason": ranked.get("reason"),
                                "provider": provider,
                                "model": model,
                            }
                        )
                    else:
                        report["planned"].append(
                            {
                                "action": "candidate_ranking_no_selection",
                                "reason": "llm_returned_no_selected_mid_or_invalid_json",
                            }
                        )
            else:
                wikidata = {
                    "wikidata_status": "hit",
                    "qid": lookup.get("qid"),
                    "label": lookup.get("label"),
                    "description": lookup.get("description"),
                    "retrieved_at": lookup.get("retrieved_at"),
                }

    if isinstance(wikidata, dict) and wikidata.get("wikidata_status") == "hit" and wikidata.get("qid"):
        report["wikidata_status"] = "hit"
        # If Wikidata already knows the entity, fetch the full object and mine
        # any additional identifiers we can safely attach as claims.
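        # `full` is expected to look roughly like this (illustrative, inferred
        # from the accesses below; it mirrors a wbgetentities-style payload):
        #   {"qid": "Q42", "label": "...", "description": "...", "retrieved_at": "...",
        #    "entity": {"claims": {"P31": [...], "P2671": [...], ...}}}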
        full = await fetch_wikidata_entity(wikidata.get("qid"))
        if full and isinstance(full.get("entity"), dict):
            report["wikidata_status"] = "enriched"
            entity_block = full["entity"]
            claims_dict = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}
            p31_claims = claims_dict.get("P31", []) if isinstance(claims_dict, dict) else []
            p31_qids: list[str] = []
            for claim in p31_claims:
                mainsnak = claim.get("mainsnak", {})
                datavalue = mainsnak.get("datavalue", {})
                value = datavalue.get("value", {})
                wid = value.get("id") if isinstance(value, dict) else None
                if wid:
                    p31_qids.append(wid)
            inferred = infer_atlas_type_from_p31(tuple(p31_qids)) if p31_qids else None
            fallback_candidates = []
            if not inferred and p31_qids:
                fallback_candidates = [WIKIDATA_CLASS_MAP.get(qid) for qid in p31_qids if WIKIDATA_CLASS_MAP.get(qid)]
                fallback_candidates = [c for c in fallback_candidates if c]
                if len(fallback_candidates) == 1:
                    inferred = fallback_candidates[0]
                elif fallback_candidates:
                    inferred = await _choose_type_with_llm(item, full, fallback_candidates)
            wiki_label = (full.get("label") or "").strip()
            wiki_description = (full.get("description") or "").strip()
            current_label = subject.strip()
            current_mid = item.get("mid")
            qid = full.get("qid")
            wikidata_mid = None
            claims = claims_dict  # same mapping; kept under the name used by the loops below
            mid_claims = claims.get("P2671", []) if isinstance(claims, dict) else []
            for claim_node in mid_claims:
                mainsnak = claim_node.get("mainsnak", {})
                datavalue = mainsnak.get("datavalue", {})
                value = datavalue.get("value")
                if isinstance(value, str) and value.strip():
                    wikidata_mid = value.strip()
                    break
            report["comparison"] = {
                "canonical_label": {
                    "current": current_label,
                    "wikidata": wiki_label or None,
                    "matches": bool(wiki_label and wiki_label == current_label),
                },
                "mid": {
                    "current": current_mid,
                    "wikidata": wikidata_mid,
                    "matches": bool(current_mid and wikidata_mid and current_mid == wikidata_mid),
                },
            }
            if _resolution_is_clear(current_label, current_mid, item.get("entity_type") or "unknown", wiki_label, wikidata_mid, inferred):
                report["wikidata_status"] = "clear"
            # Recompute core updates from the authoritative full entity; this
            # supersedes the payload-based plan made earlier.
            report["planned_core_updates"] = []
            if inferred and inferred != (item.get("entity_type") or "unknown"):
                report["planned_core_updates"].append(
                    {
                        "field": "entity_type",
                        "from": item.get("entity_type"),
                        "to": inferred,
                        "source": "wikidata",
                    }
                )
            if wiki_label and wiki_label != current_label:
                report["planned_core_updates"].append(
                    {
                        "field": "canonical_label",
                        "from": current_label,
                        "to": wiki_label,
                        "source": "wikidata",
                    }
                )
            if wiki_description and wiki_description != (item.get("description") or ""):
                report["planned_core_updates"].append(
                    {
                        "field": "canonical_description",
                        "from": item.get("description"),
                        "to": wiki_description,
                        "source": "wikidata",
                    }
                )
            # If Wikidata exposes a MID and it differs from the Trends MID, that
            # is a strong signal for claim supersession / correction.
            if wikidata_mid and wikidata_mid != current_mid:
                report["planned_identifier_claims"] += 1
                report["planned_identifier_types"].append("mid")
                report["planned"].append(
                    {
                        "action": "supersede_mid_and_add_correct_mid",
                        "from": current_mid,
                        "to": wikidata_mid,
                        "source": "wikidata",
                    }
                )
            # If Wikidata does not provide a MID, Trends can be used only as a
            # last resort. If we already have a Wikidata QID hit, do NOT inject
            # a Trends MID (prevents Graza-style cross-entity contamination).
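            # Decision summary (illustrative):
            #   wikidata_mid present          -> plan supersession toward it (handled above)
            #   no wikidata_mid, qid present  -> no MID is injected; the QID anchors the entity
            #   no wikidata_mid and no qid    -> fall back to a label/type-matched Trends MID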
            if not wikidata_mid and not qid:
                trends_candidates = []
                raw_trends = item.get("raw_trends_json")
                if raw_trends:
                    try:
                        trends = json.loads(raw_trends)
                        trends_candidates = trends.get("candidates", []) if isinstance(trends, dict) else []
                    except Exception:
                        trends_candidates = []
                selected_mid, selected_title = _choose_trends_candidate_mid(
                    trends_candidates,
                    label=current_label,
                    wikidata_label=wiki_label,
                    entity_type=item.get("entity_type") or "unknown",
                )
                if selected_mid and selected_mid != current_mid:
                    report["planned_identifier_claims"] += 1
                    report["planned_identifier_types"].append("mid")
                    report["planned"].append(
                        {
                            "action": "supersede_mid_and_add_correct_mid",
                            "from": current_mid,
                            "to": selected_mid,
                            "source": "trends-candidate",
                            "title": selected_title,
                        }
                    )

            # QID is always a known cross-reference and acts as a stable anchor.
            existing_qid = item.get("qid")  # skip re-adding a QID the graph already carries
            if qid and qid != existing_qid:
                claim = AtlasClaim(
                    claim_id=_planned_claim_id(atlas_id, "atlas:hasIdentifier", qid),
                    subject=atlas_id,
                    predicate="atlas:hasIdentifier",
                    object=AtlasClaimObject(kind="identifier", id_type="qid", value=qid),
                    layer="raw",
                    provenance=AtlasProvenance(
                        source="wikidata",
                        retrieval_method="atlas-maintenance-wikidata-enrichment",
                        confidence=0.99,
                        retrieved_at=full.get("retrieved_at"),
                        evidence_property="qid",
                    ),
                )
                report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
                report["planned_identifier_claims"] += 1
                report["planned_identifier_types"].append("qid")

            for wikidata_property, (identifier_type, _label) in WIKIDATA_IDENTIFIER_PROPERTIES.items():
                property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
                for claim_node in property_claims:
                    mainsnak = claim_node.get("mainsnak", {})
                    datavalue = mainsnak.get("datavalue", {})
                    value = datavalue.get("value")
                    if not isinstance(value, str) or not value.strip():
                        continue
                    # discover_subjects only projects mid/qid, so there is no stored
                    # value to compare against for the other identifier types yet.
                    existing = None
                    if existing == value:
                        continue
                    claim = AtlasClaim(
                        claim_id=_planned_claim_id(atlas_id, "atlas:hasIdentifier", value),
                        subject=atlas_id,
                        predicate="atlas:hasIdentifier",
                        object=AtlasClaimObject(kind="identifier", id_type=identifier_type, value=value),
                        layer="raw",
                        provenance=AtlasProvenance(
                            source="wikidata",
                            retrieval_method="atlas-maintenance-wikidata-enrichment",
                            confidence=0.99,
                            retrieved_at=full.get("retrieved_at"),
                            evidence_property=wikidata_property,
                        ),
                    )
                    report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
                    report["planned_identifier_claims"] += 1
                    report["planned_identifier_types"].append(identifier_type)

            # Type-specific enrichment: different entity kinds care about different fields.
            # We only plan claims for high-confidence public facts that are useful for
            # disambiguation and consolidation.
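            # Example of a planned type field (illustrative): for a Person, P569 with a
            # Wikidata time datavalue like "+1965-01-27T00:00:00Z" becomes a literal claim
            # with predicate "atlas:hasBirthDate" and evidence_property "P569".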
            type_plan = WIKIDATA_TYPE_FIELD_PLAN.get(item.get("entity_type") or "unknown", {})
            for wikidata_property, (claim_type, _label) in type_plan.items():
                property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
                for claim_node in property_claims:
                    mainsnak = claim_node.get("mainsnak", {})
                    datavalue = mainsnak.get("datavalue", {})
                    value = datavalue.get("value")
                    if value in (None, "", {}):
                        continue
                    # Coordinates are split into separate latitude/longitude claims.
                    if wikidata_property == "P625" and isinstance(value, dict):
                        lat = value.get("latitude")
                        lon = value.get("longitude")
                        if lat is not None:
                            lat_text = str(lat)
                            lat_claim = AtlasClaim(
                                claim_id=_planned_claim_id(atlas_id, "atlas:hasLatitude", lat_text),
                                subject=atlas_id,
                                predicate="atlas:hasLatitude",
                                object=AtlasClaimObject(kind="literal", value=lat_text),
                                layer="raw",
                                provenance=AtlasProvenance(
                                    source="wikidata",
                                    retrieval_method="atlas-maintenance-wikidata-enrichment",
                                    confidence=0.95,
                                    retrieved_at=full.get("retrieved_at"),
                                    evidence_property=wikidata_property,
                                ),
                            )
                            report["planned"].append({"action": "add_type_field_claim", "claim": asdict(lat_claim)})
                            report["planned_type_field_claims"] += 1
                        if lon is not None:
                            lon_text = str(lon)
                            lon_claim = AtlasClaim(
                                claim_id=_planned_claim_id(atlas_id, "atlas:hasLongitude", lon_text),
                                subject=atlas_id,
                                predicate="atlas:hasLongitude",
                                object=AtlasClaimObject(kind="literal", value=lon_text),
                                layer="raw",
                                provenance=AtlasProvenance(
                                    source="wikidata",
                                    retrieval_method="atlas-maintenance-wikidata-enrichment",
                                    confidence=0.95,
                                    retrieved_at=full.get("retrieved_at"),
                                    evidence_property=wikidata_property,
                                ),
                            )
                            report["planned"].append({"action": "add_type_field_claim", "claim": asdict(lon_claim)})
                            report["planned_type_field_claims"] += 1
                        continue
                    # For this first pass we capture these as literal payload claims;
                    # the exact ontology mapping can be tightened later.
                    if isinstance(value, dict):
                        # entity / place / time / quantity datavalues carry their payload
                        # under different keys; take the first string-like one.
                        value = value.get("id") or value.get("time") or value.get("text") or value.get("amount")
                    if not isinstance(value, str):
                        continue
                    predicate = f"atlas:has{claim_type.replace('-', ' ').title().replace(' ', '')}"
                    claim = AtlasClaim(
                        claim_id=_planned_claim_id(atlas_id, predicate, value),
                        subject=atlas_id,
                        predicate=predicate,
                        object=(
                            AtlasClaimObject(kind="identifier", id_type="wikidata-entity", value=value)
                            if QID_RE.match(value)
                            else AtlasClaimObject(kind="literal", value=value)
                        ),
                        layer="raw",
                        provenance=AtlasProvenance(
                            source="wikidata",
                            retrieval_method="atlas-maintenance-wikidata-enrichment",
                            confidence=0.95,
                            retrieved_at=full.get("retrieved_at"),
                            evidence_property=wikidata_property,
                        ),
                    )
                    report["planned"].append({"action": "add_type_field_claim", "claim": asdict(claim)})
                    report["planned_type_field_claims"] += 1

    if dry_run:
        return report

    # Non-dry-run: write planned improvements back to the graph as an updated entity snapshot.
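    # planned_core_updates entries look like (illustrative):
    #   {"field": "canonical_label", "from": "NYC", "to": "New York City", "source": "wikidata"}
    # The next() calls below take the first planned value per field, defaulting to current values.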
    updated_label = next((u["to"] for u in report["planned_core_updates"] if u.get("field") == "canonical_label"), subject)
    updated_desc = next((u["to"] for u in report["planned_core_updates"] if u.get("field") == "canonical_description"), item.get("description"))
    updated_type = next((u["to"] for u in report["planned_core_updates"] if u.get("field") == "entity_type"), item.get("entity_type") or "unknown")

    # Note: "supersede_mid_and_add_correct_mid" entries are report-only in this pass;
    # only add_identifier_claim / add_type_field_claim / llm_ranked_candidate_mid
    # actions are materialized as written claims below.
    write_claims: list[AtlasClaim] = []
    for planned in report["planned"]:
        action = planned.get("action")
        if action in {"add_identifier_claim", "add_type_field_claim"} and isinstance(planned.get("claim"), dict):
            c = planned["claim"]
            obj = c.get("object", {})
            prov = c.get("provenance") or {}
            write_claims.append(
                AtlasClaim(
                    claim_id=c.get("claim_id"),
                    subject=atlas_id,
                    predicate=c.get("predicate"),
                    object=AtlasClaimObject(kind=obj.get("kind"), value=obj.get("value"), id_type=obj.get("id_type")),
                    layer=c.get("layer", "raw"),
                    status=c.get("status", "active"),
                    provenance=AtlasProvenance(
                        source=prov.get("source", "maintenance"),
                        retrieval_method=prov.get("retrieval_method", "atlas-maintenance"),
                        confidence=float(prov.get("confidence", 0.0) or 0.0),
                        retrieved_at=prov.get("retrieved_at"),
                        evidence_property=prov.get("evidence_property"),
                        provider=prov.get("provider"),
                        model=prov.get("model"),
                    ),
                    created_at=c.get("created_at"),
                )
            )
        if action == "llm_ranked_candidate_mid" and planned.get("selected_mid"):
            write_claims.append(
                AtlasClaim(
                    claim_id=_planned_claim_id(atlas_id, "atlas:hasIdentifier", planned["selected_mid"]),
                    subject=atlas_id,
                    predicate="atlas:hasIdentifier",
                    object=AtlasClaimObject(kind="identifier", id_type="mid", value=planned["selected_mid"]),
                    layer="raw",
                    provenance=AtlasProvenance(
                        source="maintenance-llm-ranking",
                        retrieval_method="atlas-maintenance-candidate-ranking",
                        confidence=float(planned.get("confidence", 0.0) or 0.0),
                        retrieved_at=datetime.now(timezone.utc).isoformat(),
                        provider=planned.get("provider"),
                        model=planned.get("model"),
                    ),
                )
            )

    updated_entity = AtlasEntity(
        atlas_id=atlas_id,
        canonical_label=updated_label,
        canonical_description=updated_desc,
        entity_type=updated_type,
        claims=write_claims,
        raw_payload={
            "wikidata": {
                "wikidata_status": report.get("wikidata_status", "missing"),
                "qid": wikidata.get("qid") if isinstance(wikidata, dict) else None,
                "label": updated_label,
                "description": updated_desc,
            },
            "wikidata_entity_json": (
                json.dumps(full.get("entity"), ensure_ascii=False)
                if isinstance(wikidata, dict) and isinstance(full, dict) and full.get("entity")
                else None
            ),
        },
    )

    svc = AtlasStorageService()
    # Supersede conflicting active claims before inserting new ones.
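    # Supersession example (illustrative MIDs): if a "mid" claim with value /m/0abc
    # is active and the plan adds mid=/m/0xyz, the /m/0abc claim is superseded so
    # unitary identifier types never carry two competing active claims.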
    existing = await svc.read_entity_claims(atlas_id, include_superseded=True)
    existing_bindings = _claim_bindings_from_read(existing)
    to_supersede: list[str] = []
    planned_types = {
        (p.get("claim", {}).get("object", {}).get("id_type"), p.get("claim", {}).get("object", {}).get("value"))
        for p in report["planned"]
        if p.get("action") == "add_identifier_claim"
    }
    for b in existing_bindings:
        status = b.get("status", {}).get("value")
        if status != "active":
            continue
        claim_uri = b.get("claim", {}).get("value")
        pred = b.get("pred", {}).get("value")
        id_type = (
            (b.get("idType", {}).get("value") or "")
            .rsplit("#", 1)[-1]
            .replace("WikidataQID", "qid")
            .replace("Mid", "mid")
            .lower()
        )
        id_val = b.get("idVal", {}).get("value")
        if pred == "atlas:hasCanonicalType" and any(u.get("field") == "entity_type" for u in report["planned_core_updates"]):
            if claim_uri:
                to_supersede.append(claim_uri)
        if (
            pred == "atlas:hasIdentifier"
            and id_type
            and id_type in UNITARY_IDENTIFIER_TYPES
            and any(pt[0] and pt[0].lower() == id_type and pt[1] != id_val for pt in planned_types)
        ):
            if claim_uri:
                to_supersede.append(claim_uri)
    if to_supersede:
        await svc.supersede_claims(sorted(set(to_supersede)))

    await svc.replace_entity_core(
        atlas_id,
        canonical_label=updated_label,
        canonical_description=updated_desc,
        canonical_type=updated_type,
    )
    write_result = await svc.write_entity(updated_entity)
    report["write_result"] = {
        "status": write_result.get("status"),
        "entity_id": write_result.get("entity_id"),
        "graph": write_result.get("graph"),
        "message": write_result.get("message"),
    }
    report["written"] = write_result.get("status") == "ok"
    return report


async def main() -> int:
    parser = build_parser()
    args = parser.parse_args()
    checkpoint_path = Path(args.checkpoint_file)

    if args.clear_checkpoint:
        if checkpoint_path.exists():
            checkpoint_path.unlink()
        print(json.dumps({"checkpoint_cleared": True, "checkpoint_file": str(checkpoint_path)}, indent=2, ensure_ascii=False))
        return 0

    start_after = args.start_after.strip()
    if args.reset_checkpoint:
        start_after = ""
    elif not start_after and checkpoint_path.exists():
        start_after = checkpoint_path.read_text(encoding="utf-8").strip()

    subjects = await discover_subjects(args.page_size, start_after)
    summaries = []
    for item in subjects:
        summaries.append(await maintain_subject(item, args.dry_run, args.refresh_payloads))

    if subjects and not args.dry_run:
        checkpoint_path.write_text(subjects[-1]["atlas_id"], encoding="utf-8")

    print(json.dumps({
        "dry_run": args.dry_run,
        "checkpoint_file": str(checkpoint_path),
        "checkpoint_start_after": start_after,
        "results": summaries,
    }, indent=2, ensure_ascii=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))