#!/usr/bin/env python3 """Atlas maintenance script. Goal: - automatically revisit stored entities - enrich identifier coverage when Wikidata is present - keep the claim supersession model authoritative Operational rule: - no manual subject list is required for normal runs - --dry-run shows what would change, without writing """ from __future__ import annotations import argparse import asyncio import json import sys from pathlib import Path from dataclasses import asdict from datetime import datetime, timezone from typing import Any ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) import app.atlas as atlas_module from app.atlas import resolve_entity from app.ids import claim_hash from app.models import AtlasClaim, AtlasClaimObject, AtlasEntity, AtlasProvenance from app.storage_service import AtlasStorageService from app.wikidata_lookup import lookup_wikidata # High-confidence identifier properties we can mine from the full Wikidata entity. # The goal is to enrich the entity with public identifiers and to reconcile the # Google MID whenever Wikidata already exposes the same identity through another id. # # Note: provenance.retrieval_method describes the evidence/property source # (for example "MusicBrainz artist ID"), not the name of this script. WIKIDATA_IDENTIFIER_PROPERTIES: dict[str, tuple[str, str]] = { "P2671": ("mid", "Google Knowledge Graph ID"), "P434": ("musicbrainz-artist-id", "MusicBrainz artist ID"), "P435": ("musicbrainz-work-id", "MusicBrainz work ID"), "P436": ("musicbrainz-release-group-id", "MusicBrainz release group ID"), "P439": ("musicbrainz-release-id", "MusicBrainz release ID"), "P444": ("musicbrainz-recording-id", "MusicBrainz recording ID"), "P345": ("imdb-id", "IMDb ID"), "P214": ("viaf-id", "VIAF ID"), "P213": ("isni", "ISNI"), "P227": ("gnd-id", "GND ID"), } # Entity-type-specific Wikidata fields worth capturing early. These are the # fields that help the most with disambiguation and downstream consolidation. 
WIKIDATA_TYPE_FIELD_PLAN: dict[str, dict[str, tuple[str, str]]] = {
    "Person": {
        "P569": ("birth-date", "date of birth"),
        "P19": ("birth-place", "place of birth"),
        "P27": ("citizenship", "country of citizenship"),
    },
    "Organization": {
        "P571": ("inception", "inception"),
        "P159": ("headquarters", "headquarters location"),
        "P452": ("industry", "industry"),
    },
    "Location": {
        "P571": ("inception", "inception"),
        "P17": ("country", "country"),
        "P131": ("located-in", "located in the administrative territorial entity"),
    },
}


def _planned_claim_id(subject: str, predicate: str, value: str, layer: str = "raw") -> str:
    created_at = datetime.now(timezone.utc).date().isoformat()
    return f"clm_{layer}_{claim_hash(subject, predicate, value, layer, created_at=created_at)}"


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Atlas maintenance / claim adjudication helper")
    parser.add_argument("--dry-run", action="store_true", help="Show planned claim updates without writing")
    parser.add_argument("--page-size", type=int, default=50, help="How many entities to scan per page")
    parser.add_argument("--start-after", default="", help="Resume scanning after this canonical label")
    parser.add_argument("--checkpoint-file", default=".atlas-maintenance.checkpoint", help="File storing the last processed label")
    parser.add_argument("--reset-checkpoint", action="store_true", help="Ignore any saved checkpoint and start from the beginning")
    parser.add_argument("--clear-checkpoint", action="store_true", help="Delete the checkpoint file and exit")
    return parser


async def _sparql_bindings(query: str) -> list[dict[str, Any]]:
    svc = AtlasStorageService()
    result = await svc._call_tool("sparql_query", {"query": query})
    if isinstance(result, list) and result:
        # The tool may return a list whose first element carries the JSON text payload.
        first = result[0]
        text = getattr(first, "text", None)
        result = json.loads(text) if text else {}
    return result.get("results", {}).get("bindings", []) if isinstance(result, dict) else []


async def discover_subjects(page_size: int, start_after: str = "") -> list[str]:
    """Ask Virtuoso for known Atlas entities and return their labels.

    This keeps the maintenance job automatic: we operate on the stored graph,
    not on a hand-entered subject list.
    """
    if start_after:
        # Escape backslashes and quotes before embedding the label in the query.
        escaped = start_after.replace("\\", "\\\\").replace('"', '\\"')
        filter_clause = f'FILTER(STR(?label) > "{escaped}")'
    else:
        filter_clause = ""
    query = """
    PREFIX atlas:
    SELECT DISTINCT ?label
    WHERE {{
      GRAPH {{
        ?entity a atlas:Entity ;
                atlas:canonicalLabel ?label .
        {filter_clause}
      }}
    }}
    ORDER BY ?label
    LIMIT {page_size}
    """.format(filter_clause=filter_clause, page_size=page_size)
    bindings = await _sparql_bindings(query)
    return [b.get("label", {}).get("value", "") for b in bindings if b.get("label", {}).get("value")]


async def maintain_subject(subject: str, dry_run: bool) -> dict[str, Any]:
    # We resolve first so the maintenance run always starts from the current
    # canonical entity shape, then we layer on any new evidence.
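    # In dry-run mode we temporarily swap the storage write method for a no-op so
    # resolve_entity can run end-to-end without persisting anything; the original
    # method is restored in the finally block.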
    if dry_run:
        original_write = atlas_module._storage.write_entity

        async def _noop_write(entity):
            return {"status": "dry-run", "entity_id": entity.atlas_id}

        atlas_module._storage.write_entity = _noop_write
        try:
            entity = await resolve_entity(subject)
        finally:
            atlas_module._storage.write_entity = original_write
    else:
        entity = await resolve_entity(subject)

    report: dict[str, Any] = {
        "subject": subject,
        "atlas_id": entity.atlas_id,
        "planned": [],
        "written": False,
        "wikidata_status": "missing",
        "planned_identifier_claims": 0,
        "planned_identifier_types": [],
        "planned_type_field_claims": 0,
    }

    wikidata = entity.raw_payload.get("wikidata") if isinstance(entity.raw_payload, dict) else None
    if isinstance(wikidata, dict) and wikidata.get("wikidata_status") == "hit" and wikidata.get("qid"):
        report["wikidata_status"] = "hit"
        # If Wikidata already knows the entity, fetch the full object and mine
        # any additional identifiers we can safely attach as claims.
        full = await lookup_wikidata(subject)
        if full and isinstance(full.get("entity"), dict):
            report["wikidata_status"] = "enriched"
            entity_block = full["entity"]
            claims = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}

            # QID is always a known cross-reference and acts as a stable anchor.
            qid = full.get("qid")
            existing_qid = entity.active_identifier("qid")
            if qid and qid != existing_qid:
                claim = AtlasClaim(
                    claim_id=_planned_claim_id(entity.atlas_id, "atlas:hasIdentifier", qid),
                    subject=entity.atlas_id,
                    predicate="atlas:hasIdentifier",
                    object=AtlasClaimObject(kind="identifier", id_type="qid", value=qid),
                    layer="raw",
                    provenance=AtlasProvenance(
                        source="wikidata",
                        retrieval_method="atlas-maintenance-wikidata-enrichment",
                        confidence=0.99,
                        retrieved_at=full.get("retrieved_at"),
                        evidence_property="qid",
                    ),
                )
                report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
                report["planned_identifier_claims"] += 1
                report["planned_identifier_types"].append("qid")

            for wikidata_property, (identifier_type, label) in WIKIDATA_IDENTIFIER_PROPERTIES.items():
                property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
                for claim_node in property_claims:
                    mainsnak = claim_node.get("mainsnak", {})
                    datavalue = mainsnak.get("datavalue", {})
                    value = datavalue.get("value")
                    if not isinstance(value, str) or not value.strip():
                        continue
                    existing = entity.active_identifier(identifier_type)
                    if existing == value:
                        continue
                    claim = AtlasClaim(
                        claim_id=_planned_claim_id(entity.atlas_id, "atlas:hasIdentifier", value),
                        subject=entity.atlas_id,
                        predicate="atlas:hasIdentifier",
                        object=AtlasClaimObject(kind="identifier", id_type=identifier_type, value=value),
                        layer="raw",
                        provenance=AtlasProvenance(
                            source="wikidata",
                            retrieval_method="atlas-maintenance-wikidata-enrichment",
                            confidence=0.99,
                            retrieved_at=full.get("retrieved_at"),
                            evidence_property=wikidata_property,
                        ),
                    )
                    report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
                    report["planned_identifier_claims"] += 1
                    report["planned_identifier_types"].append(identifier_type)

            # Type-specific enrichment: different entity kinds care about different fields.
            # We only plan claims for high-confidence public facts that are useful for
            # disambiguation and consolidation.
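            # The claim-type slug is turned into a predicate by CamelCasing it,
            # e.g. "birth-date" -> "atlas:hasBirthDate", "located-in" -> "atlas:hasLocatedIn".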
            type_plan = WIKIDATA_TYPE_FIELD_PLAN.get(entity.entity_type, {})
            for wikidata_property, (claim_type, label) in type_plan.items():
                property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
                for claim_node in property_claims:
                    mainsnak = claim_node.get("mainsnak", {})
                    datavalue = mainsnak.get("datavalue", {})
                    value = datavalue.get("value")
                    if value in (None, "", {}):
                        continue
                    # For this first pass we capture these as literal payload claims;
                    # the exact ontology mapping can be tightened later.
                    if isinstance(value, dict):
                        # entity / place objects often carry an id and label
                        value = value.get("id") or value.get("time") or value.get("text") or value.get("amount")
                    if not isinstance(value, str):
                        continue
                    predicate = f"atlas:has{claim_type.replace('-', ' ').title().replace(' ', '')}"
                    claim = AtlasClaim(
                        claim_id=_planned_claim_id(entity.atlas_id, predicate, value),
                        subject=entity.atlas_id,
                        predicate=predicate,
                        object=AtlasClaimObject(kind="literal", value=value),
                        layer="raw",
                        provenance=AtlasProvenance(
                            source="wikidata",
                            retrieval_method="atlas-maintenance-wikidata-enrichment",
                            confidence=0.95,
                            retrieved_at=full.get("retrieved_at"),
                            evidence_property=wikidata_property,
                        ),
                    )
                    report["planned"].append({"action": "add_type_field_claim", "claim": asdict(claim)})
                    report["planned_type_field_claims"] += 1

    if dry_run:
        return report

    # The script currently only reports planned updates.
    # Once the claim update path is wired, this is where write-back will happen.
    report["written"] = False
    return report


async def main() -> int:
    parser = build_parser()
    args = parser.parse_args()
    checkpoint_path = Path(args.checkpoint_file)

    if args.clear_checkpoint:
        if checkpoint_path.exists():
            checkpoint_path.unlink()
        print(json.dumps(
            {"checkpoint_cleared": True, "checkpoint_file": str(checkpoint_path)},
            indent=2,
            ensure_ascii=False,
        ))
        return 0

    start_after = args.start_after.strip()
    if args.reset_checkpoint:
        start_after = ""
    elif not start_after and checkpoint_path.exists():
        start_after = checkpoint_path.read_text(encoding="utf-8").strip()

    subjects = await discover_subjects(args.page_size, start_after)
    summaries = []
    for subject in subjects:
        summaries.append(await maintain_subject(subject, args.dry_run))

    if subjects and not args.dry_run:
        checkpoint_path.write_text(subjects[-1], encoding="utf-8")

    print(json.dumps({
        "dry_run": args.dry_run,
        "checkpoint_file": str(checkpoint_path),
        "checkpoint_start_after": start_after,
        "results": summaries,
    }, indent=2, ensure_ascii=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))
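# Example invocations (assuming this file is saved as scripts/atlas_maintenance.py,
# one directory below the repo root, as ROOT = parents[1] implies):
#   python scripts/atlas_maintenance.py --dry-run --page-size 25
#   python scripts/atlas_maintenance.py --reset-checkpoint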