| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912 |
- #!/usr/bin/env python3
- """Atlas maintenance script.
- Goal:
- - automatically revisit stored entities
- - enrich identifier coverage when Wikidata is present
- - keep the claim supersession model authoritative
- Operational rule:
- - no manual subject list is required for normal runs
- - --dry-run shows what would change, without writing
- """
- from __future__ import annotations
- import argparse
- import asyncio
- import json
- import os
- import re
- import sys
- from pathlib import Path
- from dataclasses import asdict
- from datetime import datetime, timezone
- from typing import Any
- import httpx
- ROOT = Path(__file__).resolve().parents[1]
- if str(ROOT) not in sys.path:
- sys.path.insert(0, str(ROOT))
- from app.ids import claim_hash
- from app.models import AtlasClaim, AtlasClaimObject, AtlasEntity, AtlasProvenance
- from app.storage_service import AtlasStorageService
- from app.type_classifier import WIKIDATA_CLASS_MAP
- from app.wikidata_lookup import fetch_wikidata_entity, lookup_wikidata
- from app.wikidata_type_reasoner import infer_atlas_type_from_p31
# High-confidence identifier properties we can mine from the full Wikidata entity.
# The goal is to enrich the entity with public identifiers and to reconcile the
# Google MID whenever Wikidata already exposes the same identity through another id.
#
# Note: provenance.retrieval_method describes the evidence/property source
# (for example "MusicBrainz artist ID"), not the name of this script.
#
# Shape: Wikidata property id -> (Atlas identifier type slug, evidence label).
# Iteration order matters: planned identifier claims are emitted in this order.
WIKIDATA_IDENTIFIER_PROPERTIES: dict[str, tuple[str, str]] = {
    "P2671": ("mid", "Google Knowledge Graph ID"),
    "P434": ("musicbrainz-artist-id", "MusicBrainz artist ID"),
    "P435": ("musicbrainz-work-id", "MusicBrainz work ID"),
    "P436": ("musicbrainz-release-group-id", "MusicBrainz release group ID"),
    "P439": ("musicbrainz-release-id", "MusicBrainz release ID"),
    "P444": ("musicbrainz-recording-id", "MusicBrainz recording ID"),
    "P345": ("imdb-id", "IMDb ID"),
    "P214": ("viaf-id", "VIAF ID"),
    "P213": ("isni", "ISNI"),
    "P227": ("gnd-id", "GND ID"),
}
# Entity-type-specific Wikidata fields worth capturing early. Birth date and
# coordinates are strict because they are type-specific and should not be left
# as competing active claims.
# Shape: Atlas entity type -> {Wikidata property id: (claim slug, evidence label)}.
WIKIDATA_TYPE_FIELD_PLAN: dict[str, dict[str, tuple[str, str]]] = {
    "Person": {
        "P569": ("birth-date", "date of birth"),
        "P19": ("birth-place", "place of birth"),
        "P27": ("citizenship", "country of citizenship"),
    },
    "Organization": {
        "P571": ("inception", "inception"),
        "P159": ("headquarters", "headquarters location"),
        "P452": ("industry", "industry"),
    },
    "Location": {
        "P571": ("inception", "inception"),
        "P17": ("country", "country"),
        "P131": ("located-in", "located in the administrative territorial entity"),
        "P625": ("coordinates", "coordinate location"),
    },
}
# Identifier/claim types for which at most one active value should exist.
UNITARY_IDENTIFIER_TYPES = {"mid", "qid", "wikidata-entity", "birth-date", "latitude", "longitude", "coordinates"}
# Matches a bare Wikidata entity id such as "Q42".
QID_RE = re.compile(r"^Q\d+$")
# Candidate ranking step configuration (.env-controlled)
CANDIDATE_RANK_PROVIDER = os.getenv("ATLAS_CANDIDATE_RANK_PROVIDER", "auto")
CANDIDATE_RANK_MODEL = os.getenv("ATLAS_CANDIDATE_RANK_MODEL", "")
CANDIDATE_RANK_SYSTEM_PROMPT = os.getenv(
    "ATLAS_CANDIDATE_RANK_SYSTEM_PROMPT",
    str(ROOT / "prompts" / "candidate_ranking" / "system.txt"),
)
CANDIDATE_RANK_USER_TEMPLATE = os.getenv(
    "ATLAS_CANDIDATE_RANK_USER_TEMPLATE",
    str(ROOT / "prompts" / "candidate_ranking" / "user_template.txt"),
)
# Type adjudication step configuration (.env-controlled), mirroring the
# candidate-ranking knobs above.
# FIX: _pick_type_provider() and _choose_type_with_llm() reference these two
# names, but they were never defined anywhere in the file, so any call to the
# type-adjudication path raised NameError at runtime.
TYPE_CLASSIFIER_PROVIDER = os.getenv("ATLAS_TYPE_CLASSIFIER_PROVIDER", "auto")
TYPE_CLASSIFIER_MODEL = os.getenv("ATLAS_TYPE_CLASSIFIER_MODEL", "")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("ATLAS_OPENAI_MODEL", os.getenv("OPENAI_MODEL", "gpt-4o-mini"))
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_MODEL = os.getenv("ATLAS_GROQ_MODEL", os.getenv("GROQ_MODEL", "meta-llama/llama-4-scout-17b-16e-instruct"))
def _planned_claim_id(subject: str, predicate: str, value: str, layer: str = "raw") -> str:
    """Build a deterministic claim id for a claim planned on today's date."""
    today = datetime.now(timezone.utc).date().isoformat()
    digest = claim_hash(subject, predicate, value, layer, created_at=today)
    return "clm_{}_{}".format(layer, digest)
def _infer_wikidata_canonical_type(entity_block: dict[str, Any]) -> str | None:
    """Return the Atlas type implied by the first mapped P31 (instance-of) class.

    Walks the raw Wikidata claim structure defensively; returns None when no
    P31 value maps to a known Atlas type.
    """
    claims = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}
    p31_nodes = claims.get("P31", []) if isinstance(claims, dict) else []
    for node in p31_nodes:
        snak_value = node.get("mainsnak", {}).get("datavalue", {}).get("value", {})
        qid = snak_value.get("id") if isinstance(snak_value, dict) else None
        atlas_type = WIKIDATA_CLASS_MAP.get(qid)
        if atlas_type:
            return atlas_type
    return None
- def _fallback_lookup_looks_compatible(current_type: str, lookup: dict[str, Any], inferred: str | None) -> bool:
- current_type = (current_type or "unknown").strip()
- if current_type == "unknown":
- return True
- if inferred and inferred != current_type:
- return False
- desc = str(lookup.get("description") or "").lower()
- if current_type == "Location" and "family name" in desc:
- return False
- if current_type == "Person" and "city" in desc:
- return False
- return True
- def _resolution_is_clear(current_label: str, current_mid: str | None, current_type: str, wikidata_label: str | None, wikidata_mid: str | None, wikidata_type: str | None) -> bool:
- if not wikidata_label or wikidata_label != current_label:
- return False
- if wikidata_mid and current_mid and wikidata_mid != current_mid:
- return False
- if wikidata_type and current_type and current_type != "unknown" and wikidata_type != current_type:
- return False
- return bool(wikidata_type)
- def _choose_trends_candidate_mid(candidates: list[dict[str, Any]], *, label: str, wikidata_label: str | None, entity_type: str) -> tuple[str | None, str | None]:
- """Pick the best Trends MID by label/type match.
- Exact label matches are preferred, and the candidate type must be compatible
- with the current Atlas entity type when possible.
- """
- if not candidates:
- return None, None
- wanted_labels = {label.strip().lower()}
- if wikidata_label:
- wanted_labels.add(wikidata_label.strip().lower())
- def compatible(candidate_type: str | None) -> bool:
- ct = (candidate_type or "").lower()
- et = (entity_type or "unknown").lower()
- if et == "location":
- return any(word in ct for word in ["city", "settlement", "place", "region", "location", "country", "town", "village"])
- if et == "person":
- return any(word in ct for word in ["person", "artist", "musician", "writer", "politician", "actor"])
- if et == "organization":
- return any(word in ct for word in ["organization", "company", "university", "school", "institution"])
- return True
- for c in candidates:
- mid = c.get("mid")
- title = (c.get("title") or "").strip().lower()
- ctype = c.get("type")
- if mid and title in wanted_labels and compatible(ctype):
- return mid, c.get("title")
- for c in candidates:
- mid = c.get("mid")
- title = (c.get("title") or "").strip().lower()
- if mid and title in wanted_labels:
- return mid, c.get("title")
- return None, None
- def _load_text(path: str) -> str:
- return Path(path).read_text(encoding="utf-8")
def _pick_rank_provider() -> str | None:
    """Resolve which LLM backend should rank Trends candidates.

    Honors the explicit .env preference when its API key is present; "auto"
    prefers Groq over OpenAI. Returns None when no usable key exists.
    """
    preference = CANDIDATE_RANK_PROVIDER
    if preference == "openai":
        return "openai" if OPENAI_API_KEY else None
    if preference == "groq":
        return "groq" if GROQ_API_KEY else None
    if preference == "auto":
        if GROQ_API_KEY:
            return "groq"
        if OPENAI_API_KEY:
            return "openai"
    return None
def _pick_type_provider() -> str | None:
    """Resolve which LLM backend should adjudicate entity types.

    Honors the explicit .env preference when its API key is present; "auto"
    prefers Groq over OpenAI. Returns None when no usable key exists.
    """
    preference = TYPE_CLASSIFIER_PROVIDER
    if preference == "openai":
        return "openai" if OPENAI_API_KEY else None
    if preference == "groq":
        return "groq" if GROQ_API_KEY else None
    if preference == "auto":
        if GROQ_API_KEY:
            return "groq"
        if OPENAI_API_KEY:
            return "openai"
    return None
async def _choose_type_with_llm(item: dict[str, Any], wikidata_hint: dict[str, Any], candidates: list[str]) -> str | None:
    """Adjudicate a single Atlas type via the configured LLM provider.

    Only used when ontology reasoning can't produce a unique Atlas type from
    the P31-derived candidates. Returns the chosen type string, or None when
    no provider is configured, the HTTP call fails, or the model reply cannot
    be parsed as JSON carrying a "type" key.
    """
    provider = _pick_type_provider()
    if provider is None:
        return None
    model = TYPE_CLASSIFIER_MODEL or (GROQ_MODEL if provider == "groq" else OPENAI_MODEL)
    url = "https://api.groq.com/openai/v1/chat/completions" if provider == "groq" else "https://api.openai.com/v1/chat/completions"
    api_key = GROQ_API_KEY if provider == "groq" else OPENAI_API_KEY
    prompt = (
        "Classify this entity into one Atlas type only.\n"
        f"Label: {item.get('label', '')}\n"
        f"Description: {item.get('description', '')}\n"
        f"Wikidata label: {wikidata_hint.get('label', '')}\n"
        f"Wikidata description: {wikidata_hint.get('description', '')}\n"
        f"P31-derived Atlas candidates: {', '.join(candidates)}\n"
        "Allowed Atlas types: Person, Organization, Location, CreativeWork, Event, Product, Taxon, Other.\n"
        'Return strict JSON only: {"type":"...","confidence":0.0-1.0,"reason":"..."}'
    )
    payload = {
        "model": model,
        "temperature": 0,
        "messages": [
            {"role": "system", "content": "You are Atlas type adjudicator."},
            {"role": "user", "content": prompt},
        ],
    }
    try:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.post(url, json=payload, headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"})
            resp.raise_for_status()
            content = resp.json().get("choices", [{}])[0].get("message", {}).get("content", "")
        if not content:
            return None
        try:
            parsed = json.loads(content)
        except Exception:
            # FIX: the old fallback used content.replace("json", "", 1), which
            # deleted the first "json" occurrence anywhere in the reply — when
            # the code fence had no language tag this could corrupt the payload
            # itself. Strip only the fence markers and an optional leading
            # "json" tag, mirroring _rank_candidates_with_llm.
            text = content.strip()
            if text.startswith("`"):
                text = text.strip("`").strip()
                if text.lower().startswith("json"):
                    text = text[4:].strip()
            try:
                parsed = json.loads(text)
            except Exception:
                parsed = None
        return parsed.get("type") if isinstance(parsed, dict) else None
    except Exception:
        # Best-effort adjudication: any failure simply yields "no decision".
        return None
async def _rank_candidates_with_llm(item: dict[str, Any], wikidata_hint: dict[str, Any], candidates: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Ask the configured LLM provider to rank Trends MID candidates.

    Returns the parsed JSON ranking object, or None when no provider is
    configured, the HTTP call fails, or the model output cannot be parsed
    as JSON (even after stripping a markdown code fence).
    """
    provider = _pick_rank_provider()
    if provider is None:
        return None
    # Prompts live on disk so operators can tune them without code changes.
    system_prompt = _load_text(CANDIDATE_RANK_SYSTEM_PROMPT)
    user_prompt = _load_text(CANDIDATE_RANK_USER_TEMPLATE).format(
        atlas_id=item.get("atlas_id", ""),
        canonical_label=item.get("label", ""),
        canonical_description=item.get("description", ""),
        canonical_type=item.get("entity_type", "unknown"),
        wikidata_qid=wikidata_hint.get("qid", ""),
        wikidata_label=wikidata_hint.get("label", ""),
        wikidata_description=wikidata_hint.get("description", ""),
        candidates_json=json.dumps(candidates, ensure_ascii=False),
    )
    if provider == "groq":
        url = "https://api.groq.com/openai/v1/chat/completions"
        api_key = GROQ_API_KEY
        model = CANDIDATE_RANK_MODEL or GROQ_MODEL
    else:
        url = "https://api.openai.com/v1/chat/completions"
        api_key = OPENAI_API_KEY
        model = CANDIDATE_RANK_MODEL or OPENAI_MODEL
    request_body = {
        "model": model,
        "temperature": 0,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    }
    request_headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    try:
        async with httpx.AsyncClient(timeout=20) as client:
            response = await client.post(url, json=request_body, headers=request_headers)
            response.raise_for_status()
            content = response.json().get("choices", [{}])[0].get("message", {}).get("content", "")
    except Exception:
        return None
    if not content:
        return None
    try:
        return json.loads(content)
    except Exception:
        pass
    # Some models wrap their JSON in a markdown fence; strip it and retry once.
    text = content.strip()
    if text.startswith("```"):
        text = text.strip("`")
        if text.lower().startswith("json"):
            text = text[4:].strip()
    try:
        return json.loads(text)
    except Exception:
        return None
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the maintenance / adjudication helper."""
    parser = argparse.ArgumentParser(description="Atlas maintenance / claim adjudication helper")
    add = parser.add_argument
    add("--dry-run", action="store_true", help="Show planned claim updates without writing")
    add("--page-size", type=int, default=50, help="How many entities to scan per page")
    add("--start-after", default="", help="Resume scanning after this canonical label")
    add("--checkpoint-file", default=".atlas-maintenance.checkpoint", help="File storing the last processed label")
    add("--reset-checkpoint", action="store_true", help="Ignore any saved checkpoint and start from the beginning")
    add("--clear-checkpoint", action="store_true", help="Delete the checkpoint file and exit")
    add("--refresh-payloads", action="store_true", help="Explicitly re-fetch online payloads during maintenance")
    return parser
async def _sparql_bindings(query: str) -> list[dict[str, Any]]:
    """Run a SPARQL query through the storage MCP tool and return its bindings.

    Tool results arrive as a list of content parts whose first element carries
    the JSON body in its .text attribute; anything unexpected yields [].
    """
    service = AtlasStorageService()
    raw = await service._call_tool("sparql_query", {"query": query})
    if isinstance(raw, list) and raw:
        part_text = getattr(raw[0], "text", None)
        raw = json.loads(part_text) if part_text else {}
    if not isinstance(raw, dict):
        return []
    return raw.get("results", {}).get("bindings", [])
- def _claim_bindings_from_read(payload: dict[str, Any]) -> list[dict[str, Any]]:
- result = payload.get("result")
- if isinstance(result, list) and result:
- text = getattr(result[0], "text", None)
- if text:
- try:
- result = json.loads(text)
- except Exception:
- return []
- if isinstance(result, dict):
- return result.get("results", {}).get("bindings", [])
- return []
async def discover_subjects(page_size: int, start_after: str = "") -> list[dict[str, Any]]:
    """Ask Virtuoso for known Atlas entities and return their immutable atlas_id values.

    This keeps the maintenance job automatic: we operate on the stored graph,
    not on a hand-entered subject list or mutable label text.

    Args:
        page_size: maximum number of rows returned (becomes the SPARQL LIMIT).
        start_after: resume marker; only rows whose atlas_id (or label, when no
            atlas_id exists) sorts strictly after this string are returned.

    Returns:
        One dict per labelled entity with its IRI, atlas_id (falling back to
        the label for legacy rows), label, description, entity type, optional
        MID/QID identifiers, and any stored raw Wikidata/Trends payload JSON.
    """
    # Escape backslashes and double quotes so the marker is safe to embed in
    # the SPARQL string literal below.
    start_marker = start_after.replace("\\", "\\\\").replace('"', '\\"') if start_after else ""
    filter_clause = f'FILTER(STR(COALESCE(?atlasId, ?label)) > "{start_marker}")' if start_marker else ""
    # NOTE: braces are doubled because the template goes through str.format;
    # {filter_clause} and {page_size} are the only substitution points.
    query = """
    PREFIX atlas: <http://world.eu.org/atlas_ontology#>
    SELECT DISTINCT ?entity ?atlasId ?label ?desc ?type ?mid ?qid ?rawWd ?rawTrends WHERE {{
        GRAPH <http://world.eu.org/atlas_data#> {{
            ?entity a atlas:Entity ;
                atlas:canonicalLabel ?label .
            OPTIONAL {{ ?entity atlas:canonicalDescription ?desc . }}
            OPTIONAL {{ ?entity atlas:rawWikidataJson ?rawWd . }}
            OPTIONAL {{ ?entity atlas:rawTrendsJson ?rawTrends . }}
            OPTIONAL {{ ?entity atlas:hasCanonicalType ?type . }}
            OPTIONAL {{
                ?entity atlas:hasIdentifier ?ident .
                ?ident atlas:identifierType atlas:Mid ;
                    atlas:identifierValue ?mid .
            }}
            OPTIONAL {{
                ?entity atlas:hasIdentifier ?identQid .
                ?identQid atlas:identifierType atlas:WikidataQID ;
                    atlas:identifierValue ?qid .
            }}
            OPTIONAL {{ ?entity atlas:atlasId ?atlasId . }}
            {filter_clause}
        }}
    }}
    ORDER BY COALESCE(?atlasId, ?label)
    LIMIT {page_size}
    """.format(filter_clause=filter_clause, page_size=page_size)
    bindings = await _sparql_bindings(query)
    # Rows without a label are unusable and are skipped; the entity type IRI is
    # reduced to its local name after the '#'.
    return [
        {
            "entity_iri": b.get("entity", {}).get("value", ""),
            "atlas_id": b.get("atlasId", {}).get("value", "") or b.get("label", {}).get("value", ""),
            "label": b.get("label", {}).get("value", ""),
            "description": b.get("desc", {}).get("value", ""),
            "entity_type": (b.get("type", {}).get("value", "").split("#")[-1] if b.get("type", {}).get("value") else "unknown"),
            "mid": b.get("mid", {}).get("value"),
            "qid": b.get("qid", {}).get("value"),
            "raw_wikidata_json": b.get("rawWd", {}).get("value"),
            "raw_trends_json": b.get("rawTrends", {}).get("value"),
        }
        for b in bindings
        if b.get("label", {}).get("value")
    ]
- async def maintain_subject(item: dict[str, Any], dry_run: bool, refresh_payloads: bool) -> dict[str, Any]:
- subject = item.get("label", "")
- atlas_id = item.get("atlas_id", "")
- entity = AtlasEntity(
- atlas_id=atlas_id,
- canonical_label=subject,
- canonical_description=item.get("description") or None,
- entity_type=item.get("entity_type") or "unknown",
- raw_payload={},
- )
- report: dict[str, Any] = {
- "subject": subject,
- "atlas_id": atlas_id,
- "planned": [],
- "written": False,
- "wikidata_status": "missing",
- "comparison": {},
- "planned_core_updates": [],
- "planned_identifier_claims": 0,
- "planned_identifier_types": [],
- "planned_type_field_claims": 0,
- "trends_candidates_count": 0,
- "ranking_attempted": False,
- }
- wikidata = None
- full = None
- raw_wd = item.get("raw_wikidata_json")
- if raw_wd:
- try:
- wikidata = json.loads(raw_wd)
- except Exception:
- wikidata = None
- if not isinstance(wikidata, dict):
- wikidata = {
- "wikidata_status": "hit" if item.get("qid") else "missing",
- "qid": item.get("qid"),
- }
- # Strong sync rule: if persisted Wikidata payload already carries a better
- # canonical label/description, plan that core update immediately.
- wd_label = (wikidata.get("label") or "").strip() if isinstance(wikidata, dict) else ""
- wd_desc = (wikidata.get("description") or "").strip() if isinstance(wikidata, dict) else ""
- if wd_label and wd_label != subject:
- report["planned_core_updates"].append(
- {"field": "canonical_label", "from": subject, "to": wd_label, "source": "wikidata-payload"}
- )
- if wd_desc and wd_desc != (item.get("description") or ""):
- report["planned_core_updates"].append(
- {"field": "canonical_description", "from": item.get("description"), "to": wd_desc, "source": "wikidata-payload"}
- )
- # Fallback: when the graph row does not yet carry a stored Wikidata payload,
- # do a direct lookup from the current canonical label so maintenance still works.
- if refresh_payloads and wikidata.get("wikidata_status") == "missing" and subject:
- lookup = await lookup_wikidata(subject)
- if isinstance(lookup, dict) and lookup.get("qid"):
- inferred = _infer_wikidata_canonical_type(lookup.get("entity") or {})
- entity_type = (item.get("entity_type") or "unknown").strip()
- if not _fallback_lookup_looks_compatible(entity_type, lookup, inferred):
- report["wikidata_status"] = "ambiguous_candidate"
- report["comparison"] = {
- "fallback_qid": lookup.get("qid"),
- "fallback_label": lookup.get("label"),
- "fallback_type": inferred,
- "current_type": entity_type,
- "accepted": False,
- }
- candidates = []
- raw_trends = item.get("raw_trends_json")
- if raw_trends:
- try:
- trends = json.loads(raw_trends)
- candidates = trends.get("candidates", []) if isinstance(trends, dict) else []
- except Exception:
- candidates = []
- report["trends_candidates_count"] = len(candidates)
- if candidates:
- report["ranking_attempted"] = True
- ranked = await _rank_candidates_with_llm(item, lookup, candidates)
- if isinstance(ranked, dict) and ranked.get("selected_mid"):
- provider = _pick_rank_provider()
- model = CANDIDATE_RANK_MODEL or (GROQ_MODEL if provider == "groq" else OPENAI_MODEL)
- report["planned"].append(
- {
- "action": "llm_ranked_candidate_mid",
- "selected_mid": ranked.get("selected_mid"),
- "selected_title": ranked.get("selected_title"),
- "confidence": ranked.get("confidence"),
- "reason": ranked.get("reason"),
- "provider": provider,
- "model": model,
- }
- )
- else:
- report["planned"].append(
- {
- "action": "candidate_ranking_no_selection",
- "reason": "llm_returned_no_selected_mid_or_invalid_json",
- }
- )
- else:
- wikidata = {
- "wikidata_status": "hit",
- "qid": lookup.get("qid"),
- "label": lookup.get("label"),
- "description": lookup.get("description"),
- "retrieved_at": lookup.get("retrieved_at"),
- }
- if isinstance(wikidata, dict) and wikidata.get("wikidata_status") == "hit" and wikidata.get("qid"):
- report["wikidata_status"] = "hit"
- # If Wikidata already knows the entity, fetch the full object and mine
- # any additional identifiers we can safely attach as claims.
- full = await fetch_wikidata_entity(wikidata.get("qid"))
- if full and isinstance(full.get("entity"), dict):
- report["wikidata_status"] = "enriched"
- entity_block = full["entity"]
- claims_dict = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}
- p31_claims = claims_dict.get("P31", []) if isinstance(claims_dict, dict) else []
- p31_qids: list[str] = []
- for claim in p31_claims:
- mainsnak = claim.get("mainsnak", {})
- datavalue = mainsnak.get("datavalue", {})
- value = datavalue.get("value", {})
- wid = value.get("id") if isinstance(value, dict) else None
- if wid:
- p31_qids.append(wid)
- inferred = infer_atlas_type_from_p31(tuple(p31_qids)) if p31_qids else None
- fallback_candidates = []
- if not inferred and p31_qids:
- fallback_candidates = [WIKIDATA_CLASS_MAP.get(qid) for qid in p31_qids if WIKIDATA_CLASS_MAP.get(qid)]
- fallback_candidates = [c for c in fallback_candidates if c]
- if len(fallback_candidates) == 1:
- inferred = fallback_candidates[0]
- elif fallback_candidates:
- inferred = await _choose_type_with_llm(item, full, fallback_candidates)
- wiki_label = (full.get("label") or "").strip()
- wiki_description = (full.get("description") or "").strip()
- current_label = subject.strip()
- current_mid = item.get("mid")
- qid = full.get("qid")
- wikidata_mid = None
- claims = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}
- mid_claims = claims.get("P2671", []) if isinstance(claims, dict) else []
- for claim_node in mid_claims:
- mainsnak = claim_node.get("mainsnak", {})
- datavalue = mainsnak.get("datavalue", {})
- value = datavalue.get("value")
- if isinstance(value, str) and value.strip():
- wikidata_mid = value.strip()
- break
- report["comparison"] = {
- "canonical_label": {"current": current_label, "wikidata": wiki_label or None, "matches": bool(wiki_label and wiki_label == current_label)},
- "mid": {"current": current_mid, "wikidata": wikidata_mid, "matches": bool(current_mid and wikidata_mid and current_mid == wikidata_mid)},
- }
- if _resolution_is_clear(current_label, current_mid, item.get("entity_type") or "unknown", wiki_label, wikidata_mid, inferred):
- report["wikidata_status"] = "clear"
- report["planned_core_updates"] = []
- if inferred and inferred != (item.get("entity_type") or "unknown"):
- report["planned_core_updates"].append(
- {
- "field": "entity_type",
- "from": item.get("entity_type"),
- "to": inferred,
- "source": "wikidata",
- }
- )
- if wiki_label and wiki_label != current_label:
- report["planned_core_updates"].append(
- {
- "field": "canonical_label",
- "from": current_label,
- "to": wiki_label,
- "source": "wikidata",
- }
- )
- if wiki_description and wiki_description != (item.get("description") or ""):
- report["planned_core_updates"].append(
- {
- "field": "canonical_description",
- "from": item.get("description"),
- "to": wiki_description,
- "source": "wikidata",
- }
- )
- # If Wikidata exposes a MID and it differs from the Trends MID, that
- # is a strong signal for claim supersession / correction.
- if wikidata_mid and wikidata_mid != current_mid:
- report["planned_identifier_claims"] += 1
- report["planned_identifier_types"].append("mid")
- report["planned"].append(
- {
- "action": "supersede_mid_and_add_correct_mid",
- "from": current_mid,
- "to": wikidata_mid,
- "source": "wikidata",
- }
- )
- # If Wikidata does not provide a MID, Trends can be used only as a
- # last resort. If we already have a Wikidata QID hit, do NOT inject
- # a Trends MID (prevents Graza-style cross-entity contamination).
- if not wikidata_mid and not qid:
- trends_candidates = []
- raw_trends = item.get("raw_trends_json")
- if raw_trends:
- try:
- trends = json.loads(raw_trends)
- trends_candidates = trends.get("candidates", []) if isinstance(trends, dict) else []
- except Exception:
- trends_candidates = []
- selected_mid, selected_title = _choose_trends_candidate_mid(
- trends_candidates,
- label=current_label,
- wikidata_label=wiki_label,
- entity_type=item.get("entity_type") or "unknown",
- )
- if selected_mid and selected_mid != current_mid:
- report["planned_identifier_claims"] += 1
- report["planned_identifier_types"].append("mid")
- report["planned"].append(
- {
- "action": "supersede_mid_and_add_correct_mid",
- "from": current_mid,
- "to": selected_mid,
- "source": "trends-candidate",
- "title": selected_title,
- }
- )
- # QID is always a known cross-reference and acts as a stable anchor.
- existing_qid = None
- if qid and qid != existing_qid:
- claim = AtlasClaim(
- claim_id=_planned_claim_id(atlas_id, "atlas:hasIdentifier", qid),
- subject=atlas_id,
- predicate="atlas:hasIdentifier",
- object=AtlasClaimObject(kind="identifier", id_type="qid", value=qid),
- layer="raw",
- provenance=AtlasProvenance(
- source="wikidata",
- retrieval_method="atlas-maintenance-wikidata-enrichment",
- confidence=0.99,
- retrieved_at=full.get("retrieved_at"),
- evidence_property="qid",
- ),
- )
- report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
- report["planned_identifier_claims"] += 1
- report["planned_identifier_types"].append("qid")
- for wikidata_property, (identifier_type, label) in WIKIDATA_IDENTIFIER_PROPERTIES.items():
- property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
- for claim_node in property_claims:
- mainsnak = claim_node.get("mainsnak", {})
- datavalue = mainsnak.get("datavalue", {})
- value = datavalue.get("value")
- if not isinstance(value, str) or not value.strip():
- continue
- existing = None
- if existing == value:
- continue
- claim = AtlasClaim(
- claim_id=_planned_claim_id(atlas_id, "atlas:hasIdentifier", value),
- subject=atlas_id,
- predicate="atlas:hasIdentifier",
- object=AtlasClaimObject(kind="identifier", id_type=identifier_type, value=value),
- layer="raw",
- provenance=AtlasProvenance(
- source="wikidata",
- retrieval_method="atlas-maintenance-wikidata-enrichment",
- confidence=0.99,
- retrieved_at=full.get("retrieved_at"),
- evidence_property=wikidata_property,
- ),
- )
- report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
- report["planned_identifier_claims"] += 1
- report["planned_identifier_types"].append(identifier_type)
- # Type-specific enrichment: different entity kinds care about different fields.
- # We only plan claims for high-confidence public facts that are useful for
- # disambiguation and consolidation.
- type_plan = WIKIDATA_TYPE_FIELD_PLAN.get(item.get("entity_type") or "unknown", {})
- for wikidata_property, (claim_type, label) in type_plan.items():
- property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
- for claim_node in property_claims:
- mainsnak = claim_node.get("mainsnak", {})
- datavalue = mainsnak.get("datavalue", {})
- value = datavalue.get("value")
- if value in (None, "", {}):
- continue
- if wikidata_property == "P625" and isinstance(value, dict):
- lat = value.get("latitude")
- lon = value.get("longitude")
- if lat is not None:
- lat_text = str(lat)
- lat_claim = AtlasClaim(
- claim_id=_planned_claim_id(atlas_id, "atlas:hasLatitude", lat_text),
- subject=atlas_id,
- predicate="atlas:hasLatitude",
- object=AtlasClaimObject(kind="literal", value=lat_text),
- layer="raw",
- provenance=AtlasProvenance(
- source="wikidata",
- retrieval_method="atlas-maintenance-wikidata-enrichment",
- confidence=0.95,
- retrieved_at=full.get("retrieved_at"),
- evidence_property=wikidata_property,
- ),
- )
- report["planned"].append({"action": "add_type_field_claim", "claim": asdict(lat_claim)})
- report["planned_type_field_claims"] += 1
- if lon is not None:
- lon_text = str(lon)
- lon_claim = AtlasClaim(
- claim_id=_planned_claim_id(atlas_id, "atlas:hasLongitude", lon_text),
- subject=atlas_id,
- predicate="atlas:hasLongitude",
- object=AtlasClaimObject(kind="literal", value=lon_text),
- layer="raw",
- provenance=AtlasProvenance(
- source="wikidata",
- retrieval_method="atlas-maintenance-wikidata-enrichment",
- confidence=0.95,
- retrieved_at=full.get("retrieved_at"),
- evidence_property=wikidata_property,
- ),
- )
- report["planned"].append({"action": "add_type_field_claim", "claim": asdict(lon_claim)})
- report["planned_type_field_claims"] += 1
- continue
- # For this first pass we capture these as literal payload claims;
- # the exact ontology mapping can be tightened later.
- if isinstance(value, dict):
- # entity / place objects often carry an id and label
- value = value.get("id") or value.get("time") or value.get("text") or value.get("amount")
- if not isinstance(value, str):
- continue
- claim = AtlasClaim(
- claim_id=_planned_claim_id(atlas_id, f"atlas:has{claim_type.replace('-', ' ').title().replace(' ', '')}", value),
- subject=atlas_id,
- predicate=f"atlas:has{claim_type.replace('-', ' ').title().replace(' ', '')}",
- object=(
- AtlasClaimObject(kind="identifier", id_type="wikidata-entity", value=value)
- if QID_RE.match(value)
- else AtlasClaimObject(kind="literal", value=value)
- ),
- layer="raw",
- provenance=AtlasProvenance(
- source="wikidata",
- retrieval_method="atlas-maintenance-wikidata-enrichment",
- confidence=0.95,
- retrieved_at=full.get("retrieved_at"),
- evidence_property=wikidata_property,
- ),
- )
- report["planned"].append({"action": "add_type_field_claim", "claim": asdict(claim)})
- report["planned_type_field_claims"] += 1
- if dry_run:
- return report
- # Non-dry-run: write planned improvements back to graph as an updated entity snapshot.
- updated_label = next((u["to"] for u in report["planned_core_updates"] if u.get("field") == "canonical_label"), subject)
- updated_desc = next((u["to"] for u in report["planned_core_updates"] if u.get("field") == "canonical_description"), item.get("description"))
- updated_type = next((u["to"] for u in report["planned_core_updates"] if u.get("field") == "entity_type"), item.get("entity_type") or "unknown")
- write_claims: list[AtlasClaim] = []
- for planned in report["planned"]:
- action = planned.get("action")
- if action in {"add_identifier_claim", "add_type_field_claim"} and isinstance(planned.get("claim"), dict):
- c = planned["claim"]
- obj = c.get("object", {})
- prov = c.get("provenance") or {}
- write_claims.append(
- AtlasClaim(
- claim_id=c.get("claim_id"),
- subject=atlas_id,
- predicate=c.get("predicate"),
- object=AtlasClaimObject(kind=obj.get("kind"), value=obj.get("value"), id_type=obj.get("id_type")),
- layer=c.get("layer", "raw"),
- status=c.get("status", "active"),
- provenance=AtlasProvenance(
- source=prov.get("source", "maintenance"),
- retrieval_method=prov.get("retrieval_method", "atlas-maintenance"),
- confidence=float(prov.get("confidence", 0.0) or 0.0),
- retrieved_at=prov.get("retrieved_at"),
- evidence_property=prov.get("evidence_property"),
- provider=prov.get("provider"),
- model=prov.get("model"),
- ),
- created_at=c.get("created_at"),
- )
- )
- if action == "llm_ranked_candidate_mid" and planned.get("selected_mid"):
- write_claims.append(
- AtlasClaim(
- claim_id=_planned_claim_id(atlas_id, "atlas:hasIdentifier", planned["selected_mid"]),
- subject=atlas_id,
- predicate="atlas:hasIdentifier",
- object=AtlasClaimObject(kind="identifier", id_type="mid", value=planned["selected_mid"]),
- layer="raw",
- provenance=AtlasProvenance(
- source="maintenance-llm-ranking",
- retrieval_method="atlas-maintenance-candidate-ranking",
- confidence=float(planned.get("confidence", 0.0) or 0.0),
- retrieved_at=datetime.now(timezone.utc).isoformat(),
- provider=planned.get("provider"),
- model=planned.get("model"),
- ),
- )
- )
- updated_entity = AtlasEntity(
- atlas_id=atlas_id,
- canonical_label=updated_label,
- canonical_description=updated_desc,
- entity_type=updated_type,
- claims=write_claims,
- raw_payload={
- "wikidata": {
- "wikidata_status": report.get("wikidata_status", "missing"),
- "qid": wikidata.get("qid") if isinstance(wikidata, dict) else None,
- "label": updated_label,
- "description": updated_desc,
- },
- "wikidata_entity_json": json.dumps(full.get("entity"), ensure_ascii=False) if isinstance(wikidata, dict) and isinstance(full, dict) and full.get("entity") else None,
- },
- )
- svc = AtlasStorageService()
- # Supersede conflicting active claims before inserting new ones.
- existing = await svc.read_entity_claims(atlas_id, include_superseded=True)
- existing_bindings = _claim_bindings_from_read(existing)
- to_supersede: list[str] = []
- planned_types = {
- (p.get("claim", {}).get("object", {}).get("id_type"), p.get("claim", {}).get("object", {}).get("value"))
- for p in report["planned"]
- if p.get("action") == "add_identifier_claim"
- }
- for b in existing_bindings:
- status = b.get("status", {}).get("value")
- if status != "active":
- continue
- claim_uri = b.get("claim", {}).get("value")
- pred = b.get("pred", {}).get("value")
- id_type = (b.get("idType", {}).get("value") or "").rsplit("#", 1)[-1].replace("WikidataQID", "qid").replace("Mid", "mid").lower()
- id_val = b.get("idVal", {}).get("value")
- if pred == "atlas:hasCanonicalType" and any(u.get("field") == "entity_type" for u in report["planned_core_updates"]):
- if claim_uri:
- to_supersede.append(claim_uri)
- if pred == "atlas:hasIdentifier" and id_type and id_type in UNITARY_IDENTIFIER_TYPES and any(pt[0] and pt[0].lower() == id_type and pt[1] != id_val for pt in planned_types):
- if claim_uri:
- to_supersede.append(claim_uri)
- if to_supersede:
- await svc.supersede_claims(sorted(set(to_supersede)))
- await svc.replace_entity_core(
- atlas_id,
- canonical_label=updated_label,
- canonical_description=updated_desc,
- canonical_type=updated_type,
- )
- write_result = await svc.write_entity(updated_entity)
- report["write_result"] = {
- "status": write_result.get("status"),
- "entity_id": write_result.get("entity_id"),
- "graph": write_result.get("graph"),
- "message": write_result.get("message"),
- }
- report["written"] = write_result.get("status") == "ok"
- return report
async def main() -> int:
    """CLI entry point: maintain one page of Atlas subjects with resumable checkpointing.

    Resolves the resume cursor (explicit flag, reset, or stored checkpoint),
    discovers a page of subjects, runs maintenance on each, advances the
    checkpoint after non-dry-run work, and prints a JSON summary.

    Returns:
        Process exit code; 0 on normal completion.
    """
    parser = build_parser()
    args = parser.parse_args()
    checkpoint_path = Path(args.checkpoint_file)

    # --clear-checkpoint is a standalone action: remove the file and exit.
    if args.clear_checkpoint:
        if checkpoint_path.exists():
            checkpoint_path.unlink()
        print(json.dumps(
            {"checkpoint_cleared": True, "checkpoint_file": str(checkpoint_path)},
            indent=2,
            ensure_ascii=False,
        ))
        return 0

    # Resume cursor precedence: explicit --start-after > --reset-checkpoint
    # (forces a fresh start) > previously stored checkpoint file.
    start_after = args.start_after.strip()
    if args.reset_checkpoint:
        start_after = ""
    elif not start_after and checkpoint_path.exists():
        # Fixed: dropped the redundant re-assignment of checkpoint_path here
        # (it was already computed from the same argument above).
        start_after = checkpoint_path.read_text(encoding="utf-8").strip()

    subjects = await discover_subjects(args.page_size, start_after)
    summaries = [
        await maintain_subject(item, args.dry_run, args.refresh_payloads)
        for item in subjects
    ]

    # Only advance the checkpoint when real work was written (not a dry run),
    # so an interrupted or simulated run can be safely repeated.
    if subjects and not args.dry_run:
        checkpoint_path.write_text(subjects[-1]["atlas_id"], encoding="utf-8")

    print(json.dumps({
        "dry_run": args.dry_run,
        "checkpoint_file": str(checkpoint_path),
        "checkpoint_start_after": start_after,
        "results": summaries,
    }, indent=2, ensure_ascii=False))
    return 0
if __name__ == "__main__":
    # Drive the async entry point and surface its return value as the
    # process exit status.
    exit_code = asyncio.run(main())
    raise SystemExit(exit_code)
|