- #!/usr/bin/env python3
- """Atlas maintenance script.
- Goals:
- - automatically revisit stored entities
- - enrich identifier coverage when a Wikidata match is present
- - keep the claim supersession model authoritative
- Operational rules:
- - no manual subject list is required for normal runs
- - --dry-run shows what would change, without writing anything
- """
- from __future__ import annotations
- import argparse
- import asyncio
- import json
- import sys
- from pathlib import Path
- from dataclasses import asdict
- from datetime import datetime, timezone
- from typing import Any
- ROOT = Path(__file__).resolve().parents[1]
- if str(ROOT) not in sys.path:
- sys.path.insert(0, str(ROOT))
- import app.atlas as atlas_module
- from app.atlas import resolve_entity
- from app.ids import claim_hash
- from app.models import AtlasClaim, AtlasClaimObject, AtlasEntity, AtlasProvenance
- from app.storage_service import AtlasStorageService
- from app.wikidata_lookup import lookup_wikidata
- # High-confidence identifier properties we can mine from the full Wikidata entity.
- # The goal is to enrich the entity with public identifiers and to reconcile the
- # Google MID whenever Wikidata already exposes the same identity through another id.
- #
- # Note: provenance.retrieval_method describes the evidence/property source
- # (for example "MusicBrainz artist ID"), not the name of this script.
- WIKIDATA_IDENTIFIER_PROPERTIES: dict[str, tuple[str, str]] = {
- "P2671": ("mid", "Google Knowledge Graph ID"),
- "P434": ("musicbrainz-artist-id", "MusicBrainz artist ID"),
- "P435": ("musicbrainz-work-id", "MusicBrainz work ID"),
- "P436": ("musicbrainz-release-group-id", "MusicBrainz release group ID"),
- "P439": ("musicbrainz-release-id", "MusicBrainz release ID"),
- "P444": ("musicbrainz-recording-id", "MusicBrainz recording ID"),
- "P345": ("imdb-id", "IMDb ID"),
- "P214": ("viaf-id", "VIAF ID"),
- "P213": ("isni", "ISNI"),
- "P227": ("gnd-id", "GND ID"),
- }
- # Entity-type-specific Wikidata fields worth capturing early. These are the
- # fields that help the most with disambiguation and downstream consolidation.
- WIKIDATA_TYPE_FIELD_PLAN: dict[str, dict[str, tuple[str, str]]] = {
- "Person": {
- "P569": ("birth-date", "date of birth"),
- "P19": ("birth-place", "place of birth"),
- "P27": ("citizenship", "country of citizenship"),
- },
- "Organization": {
- "P571": ("inception", "inception"),
- "P159": ("headquarters", "headquarters location"),
- "P452": ("industry", "industry"),
- },
- "Location": {
- "P571": ("inception", "inception"),
- "P17": ("country", "country"),
- "P131": ("located-in", "located in the administrative territorial entity"),
- },
- }
- def _planned_claim_id(subject: str, predicate: str, value: str, layer: str = "raw") -> str:
- created_at = datetime.now(timezone.utc).date().isoformat()
- return f"clm_{layer}_{claim_hash(subject, predicate, value, layer, created_at=created_at)}"
- def build_parser() -> argparse.ArgumentParser:
- parser = argparse.ArgumentParser(description="Atlas maintenance / claim adjudication helper")
- parser.add_argument("--dry-run", action="store_true", help="Show planned claim updates without writing")
- parser.add_argument("--page-size", type=int, default=50, help="How many entities to scan per page")
- parser.add_argument("--start-after", default="", help="Resume scanning after this canonical label")
- parser.add_argument("--checkpoint-file", default=".atlas-maintenance.checkpoint", help="File storing the last processed label")
- parser.add_argument("--reset-checkpoint", action="store_true", help="Ignore any saved checkpoint and start from the beginning")
- parser.add_argument("--clear-checkpoint", action="store_true", help="Delete the checkpoint file and exit")
- return parser
- async def _sparql_bindings(query: str) -> list[dict[str, Any]]:
- svc = AtlasStorageService()
- result = await svc._call_tool("sparql_query", {"query": query})
- if isinstance(result, list) and result:
- first = result[0]
- text = getattr(first, "text", None)
- result = json.loads(text) if text else {}
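- # The decoded payload is expected to follow the standard SPARQL 1.1 JSON results
- # layout (values here are illustrative):
- #   {"results": {"bindings": [{"label": {"type": "literal", "value": "Ada Lovelace"}}]}}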
- return result.get("results", {}).get("bindings", []) if isinstance(result, dict) else []
- async def discover_subjects(page_size: int, start_after: str = "") -> list[str]:
- """Ask Virtuoso for known Atlas entities and return their labels.
- This keeps the maintenance job automatic: we operate on the stored graph,
- not on a hand-entered subject list.
- """
- # Escape outside the f-string so the expression stays backslash-free (required before Python 3.12).
- escaped = start_after.replace("\\", "\\\\").replace('"', '\\"')
- filter_clause = f'FILTER(STR(?label) > "{escaped}")' if start_after else ""
- query = """
- PREFIX atlas: <http://world.eu.org/atlas_ontology#>
- SELECT DISTINCT ?label WHERE {{
- GRAPH <http://world.eu.org/atlas_data#> {{
- ?entity a atlas:Entity ;
- atlas:canonicalLabel ?label .
- {filter_clause}
- }}
- }}
- ORDER BY ?label
- LIMIT {page_size}
- """.format(filter_clause=filter_clause, page_size=page_size)
- bindings = await _sparql_bindings(query)
- return [b.get("label", {}).get("value", "") for b in bindings if b.get("label", {}).get("value")]
- async def maintain_subject(subject: str, dry_run: bool) -> dict[str, Any]:
- # We resolve first so the maintenance run always starts from the current
- # canonical entity shape, then we layer on any new evidence.
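- # In dry-run mode we temporarily swap the storage write for an async no-op so
- # resolve_entity can run end to end without persisting anything.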
- if dry_run:
- original_write = atlas_module._storage.write_entity
- async def _noop_write(entity):
- return {"status": "dry-run", "entity_id": entity.atlas_id}
- atlas_module._storage.write_entity = _noop_write
- try:
- entity = await resolve_entity(subject)
- finally:
- atlas_module._storage.write_entity = original_write
- else:
- entity = await resolve_entity(subject)
- report: dict[str, Any] = {
- "subject": subject,
- "atlas_id": entity.atlas_id,
- "planned": [],
- "written": False,
- "wikidata_status": "missing",
- "planned_identifier_claims": 0,
- "planned_identifier_types": [],
- "planned_type_field_claims": 0,
- }
- wikidata = entity.raw_payload.get("wikidata") if isinstance(entity.raw_payload, dict) else None
- if isinstance(wikidata, dict) and wikidata.get("wikidata_status") == "hit" and wikidata.get("qid"):
- report["wikidata_status"] = "hit"
- # If Wikidata already knows the entity, fetch the full object and mine
- # any additional identifiers we can safely attach as claims.
- full = await lookup_wikidata(subject)
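- # The payload is assumed to mirror Wikidata's wbgetentities JSON (abridged, values
- # illustrative; the exact envelope is whatever lookup_wikidata returns):
- #   {"qid": "Q42", "retrieved_at": "...", "entity": {"claims": {
- #       "P345": [{"mainsnak": {"datavalue": {"value": "nm0000123", "type": "string"}}}]}}}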
- if full and isinstance(full.get("entity"), dict):
- report["wikidata_status"] = "enriched"
- entity_block = full["entity"]
- claims = entity_block.get("claims", {})
- # QID is always a known cross-reference and acts as a stable anchor.
- qid = full.get("qid")
- existing_qid = entity.active_identifier("qid")
- if qid and qid != existing_qid:
- claim = AtlasClaim(
- claim_id=_planned_claim_id(entity.atlas_id, "atlas:hasIdentifier", qid),
- subject=entity.atlas_id,
- predicate="atlas:hasIdentifier",
- object=AtlasClaimObject(kind="identifier", id_type="qid", value=qid),
- layer="raw",
- provenance=AtlasProvenance(
- source="wikidata",
- retrieval_method="atlas-maintenance-wikidata-enrichment",
- confidence=0.99,
- retrieved_at=full.get("retrieved_at"),
- evidence_property="qid",
- ),
- )
- report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
- report["planned_identifier_claims"] += 1
- report["planned_identifier_types"].append("qid")
- for wikidata_property, (identifier_type, label) in WIKIDATA_IDENTIFIER_PROPERTIES.items():
- property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
- for claim_node in property_claims:
- mainsnak = claim_node.get("mainsnak", {})
- datavalue = mainsnak.get("datavalue", {})
- value = datavalue.get("value")
- if not isinstance(value, str) or not value.strip():
- continue
- existing = entity.active_identifier(identifier_type)
- if existing == value:
- continue
- claim = AtlasClaim(
- claim_id=_planned_claim_id(entity.atlas_id, "atlas:hasIdentifier", value),
- subject=entity.atlas_id,
- predicate="atlas:hasIdentifier",
- object=AtlasClaimObject(kind="identifier", id_type=identifier_type, value=value),
- layer="raw",
- provenance=AtlasProvenance(
- source="wikidata",
- retrieval_method="atlas-maintenance-wikidata-enrichment",
- confidence=0.99,
- retrieved_at=full.get("retrieved_at"),
- evidence_property=wikidata_property,
- ),
- )
- report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
- report["planned_identifier_claims"] += 1
- report["planned_identifier_types"].append(identifier_type)
- # Type-specific enrichment: different entity kinds care about different fields.
- # We only plan claims for high-confidence public facts that are useful for
- # disambiguation and consolidation.
- type_plan = WIKIDATA_TYPE_FIELD_PLAN.get(entity.entity_type, {})
- for wikidata_property, (claim_type, label) in type_plan.items():
- property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
- for claim_node in property_claims:
- mainsnak = claim_node.get("mainsnak", {})
- datavalue = mainsnak.get("datavalue", {})
- value = datavalue.get("value")
- if value in (None, "", {}):
- continue
- # For this first pass we capture these as literal payload claims;
- # the exact ontology mapping can be tightened later.
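- # Illustrative datavalue shapes this branch has to cope with (per Wikidata's data model):
- #   time values   -> {"time": "+1952-03-11T00:00:00Z", "precision": 11, ...}
- #   item values   -> {"entity-type": "item", "id": "Q84", ...}
- #   plain strings -> passed through unchanged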
- if isinstance(value, dict):
- # entity / place objects often carry an id and label
- value = value.get("id") or value.get("time") or value.get("text") or value.get("amount")
- if not isinstance(value, str):
- continue
- predicate = f"atlas:has{claim_type.replace('-', ' ').title().replace(' ', '')}"
- claim = AtlasClaim(
- claim_id=_planned_claim_id(entity.atlas_id, predicate, value),
- subject=entity.atlas_id,
- predicate=predicate,
- object=AtlasClaimObject(kind="literal", value=value),
- layer="raw",
- provenance=AtlasProvenance(
- source="wikidata",
- retrieval_method="atlas-maintenance-wikidata-enrichment",
- confidence=0.95,
- retrieved_at=full.get("retrieved_at"),
- evidence_property=wikidata_property,
- ),
- )
- report["planned"].append({"action": "add_type_field_claim", "claim": asdict(claim)})
- report["planned_type_field_claims"] += 1
- if dry_run:
- return report
- # The script currently only reports planned updates.
- # Once the claim update path is wired, this is where write-back will happen.
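- # A possible write-back sketch for when that path exists (hypothetical method name,
- # shown only to mark the hook; the real AtlasStorageService API may differ):
- #   for planned in report["planned"]:
- #       await atlas_module._storage.write_claim(planned["claim"])  # hypothetical
- #   report["written"] = True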
- report["written"] = False
- return report
- async def main() -> int:
- parser = build_parser()
- args = parser.parse_args()
- checkpoint_path = Path(args.checkpoint_file)
- if args.clear_checkpoint:
- if checkpoint_path.exists():
- checkpoint_path.unlink()
- print(json.dumps({"checkpoint_cleared": True, "checkpoint_file": str(checkpoint_path)}, indent=2, ensure_ascii=False))
- return 0
- start_after = args.start_after.strip()
- if args.reset_checkpoint:
- start_after = ""
- elif not start_after:
- if checkpoint_path.exists():
- start_after = checkpoint_path.read_text(encoding="utf-8").strip()
- subjects = await discover_subjects(args.page_size, start_after)
- summaries = []
- for subject in subjects:
- summaries.append(await maintain_subject(subject, args.dry_run))
- if subjects and not args.dry_run:
- checkpoint_path.write_text(subjects[-1], encoding="utf-8")
- print(json.dumps({
- "dry_run": args.dry_run,
- "checkpoint_file": str(checkpoint_path),
- "checkpoint_start_after": start_after,
- "results": summaries,
- }, indent=2, ensure_ascii=False))
- return 0
- if __name__ == "__main__":
- raise SystemExit(asyncio.run(main()))