refine atlas maintenance and stable claim identifiers

Lukas Goldschmidt 1 month ago
parent
commit
60894b8f23

+ 2 - 0
PROJECT.md

@@ -20,6 +20,7 @@
 3. **Documentation lineage**
    * `README.md` (human-facing) summarizes the architecture, today’s goals, folder layout, and the news/virtuoso collaboration strategy.
    * `PROJECT.md` (this file) tracks agent priorities and reminders about the manifest’s hard rules.
+   * Provenance & model configuration: any LLM used for *classification* must be configurable via `.env`, and provenance emitted by Atlas must explicitly identify **both** the `model` and the `provider` (e.g., `provider=openai`, `model=gpt-5.4-nano`) so downstream consumers can audit reproducibility.
 4. **Dependencies & housekeeping**
    * `requirements.txt` lists FastAPI, uvicorn, fastmcp, rdflib, httpx, and any enrichment helpers we’ll need in the canonical layer.
    * `.gitignore` covers Python artifacts, FastAPI logs, and typical OS noise.
@@ -40,3 +41,4 @@
 * Document `expand(entity, constraints, depth)` expectations, starting with rdflib-based stubs and SPARQL placeholders for future enrichment work.
 * Keep the implementation precise: no enrichment in news-mcp, no graph execution in Atlas, and no semantic interpretation in Virtuoso.
 * Add maintenance routines (script/cron) to re-check entities with missing source data (especially missing Wikidata), and to supersede stale claims without bloating the schema.
+* Next major Atlas refinement: transition `atlas_id` itself to a uniform opaque identifier format, and switch RDF node IRIs to opaque, collision-safe hash IRIs (Entity_<hash>, Claim_<hash>, Identifier_<hash>, etc.), keeping semantics exclusively in triples and not encoded in identifier strings.
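The provenance rule added above can be sketched as follows; the `.env` variable names are assumptions for illustration, not part of this commit:

```python
# Hedged sketch: classifier provider/model come from .env and are echoed into
# every emitted provenance record so consumers can audit reproducibility.
# ATLAS_CLASSIFIER_PROVIDER / ATLAS_CLASSIFIER_MODEL are assumed names.
import os

def classification_provenance() -> dict[str, str]:
    provider = os.getenv("ATLAS_CLASSIFIER_PROVIDER", "openai")
    model = os.getenv("ATLAS_CLASSIFIER_MODEL", "gpt-5.4-nano")
    return {"provider": provider, "model": model}
```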

+ 31 - 3
RESPONSE_SCHEMA.md

@@ -1,4 +1,4 @@
-# Atlas Response Schema v1
+# Atlas Response Schema v2
 
 This file defines the canonical response contract for `resolve_entity`.
 
@@ -14,7 +14,7 @@ This file defines the canonical response contract for `resolve_entity`.
 ```json
 {
   "entity": {
-    "entity_id": "atlas:mid:/m/0cqt90",
+    "entity_id": "atlas:1c7ce7c18db59332",
     "canonical_label": "Donald Trump",
     "canonical_description": "45th and 47th U.S. President",
     "canonical_type": "atlas:Person",
@@ -24,6 +24,22 @@ This file defines the canonical response contract for `resolve_entity`.
       {"type": "atlas:WikidataQID", "value": "Q22686"}
     ]
   },
+  "active_claims": [
+    {
+      "claim_id": "clm_raw_ident_mid_9f3a6d0c5b2e8147",
+      "layer": "raw",
+      "status": "active",
+      "subject": "atlas:1c7ce7c18db59332",
+      "predicate": "atlas:hasIdentifier",
+      "object": {"kind": "identifier", "id_type": "atlas:Mid", "value": "/m/0cqt90"},
+      "provenance": {
+        "source": "google-trends",
+        "method": "trends-resolution",
+        "confidence": 0.9,
+        "retrieved_at": "2026-04-03T18:00:00Z"
+      }
+    }
+  ],
   "summary": {
     "raw_claim_count": 5,
     "derived_claim_count": 1,
@@ -112,6 +128,7 @@ All three layers must align around the same `entity_id`.
 - `entity.canonical_type`
 - `entity.needs_curation`
 - `entity.identifiers[]`
+- `active_claims[]`
 
 ### Required in debug mode
 - `debug.raw_claims[]`
@@ -119,9 +136,20 @@ All three layers must align around the same `entity_id`.
 - `debug.source_payloads`
 - `debug.turtle`
 
+## 5) Maintenance model
+
+Atlas maintenance jobs may fetch the full Wikidata entity payload when a Wikidata hit exists.
+That payload can generate additional identifier claims; the adjudicator may activate or supersede
+claims based on identifier alignment (for example, MID vs Wikidata QID vs other external IDs).
+
+Recommended maintenance interface:
+- `scripts/maintain_entities.py` discovers stored entities automatically; no manual subject list is required
+- `--dry-run` prints planned claim changes without writing
+- when a Wikidata hit exists, the full Wikidata entity object is fetched for richer identifier claims
+
 ---
 
-## 5) Backward compatibility
+## 6) Backward compatibility
 
 Current implementation fields (`atlas_id`, `entity_type`, etc.) may remain temporarily,
 but target output should migrate to this schema to avoid ambiguity and drift.
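To make the supersession rule in section 5 concrete, here is a hedged sketch of the adjudicator; claim fields follow `app/models.py`, and the one-active-claim-per-`id_type` policy is an assumption, not shipped code:

```python
# Sketch: at most one active identifier claim per id_type. Conflicting new
# evidence supersedes the prior claim rather than deleting it, so history
# stays queryable.
from app.models import AtlasClaim

def adjudicate_identifiers(existing: list[AtlasClaim], incoming: list[AtlasClaim]) -> list[AtlasClaim]:
    active = {c.object.id_type: c for c in existing if c.status == "active"}
    for claim in incoming:
        prior = active.get(claim.object.id_type)
        if prior is not None and prior.object.value == claim.object.value:
            continue  # identical evidence, nothing to change
        if prior is not None:
            prior.status = "superseded"  # retire, never delete
        active[claim.object.id_type] = claim
    return list(active.values())
```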

+ 21 - 17
app/atlas.py

@@ -2,8 +2,11 @@
 
 from __future__ import annotations
 
+from datetime import datetime, timezone
+
 from app.cache import EntityCache
 from app.entity_normalize import normalize_entity
+from app.ids import claim_hash, entity_hash
 from app.models import (
     AtlasAlias,
     AtlasClaim,
@@ -23,6 +26,10 @@ _virtuoso_store = VirtuosoEntityStore(max_cache_entries=256)
 _storage = AtlasStorageService()
 
 
+def _now_date() -> str:
+    return datetime.now(timezone.utc).date().isoformat()
+
+
 async def resolve_entity(subject: str, context: str | None = None) -> AtlasEntity:
     normalized = normalize_entity(subject)
     token = normalized.strip().lower()
@@ -62,8 +69,6 @@ async def resolve_entity(subject: str, context: str | None = None) -> AtlasEntit
 
 
 def _entity_from_resolution(subject: str, resolution: dict, classification: TypeClassification, wikidata: dict | None = None) -> AtlasEntity:
-    import hashlib
-
     canonical_label = (
         resolution.get("canonical_label")
         or resolution.get("normalized")
@@ -77,15 +82,7 @@ def _entity_from_resolution(subject: str, resolution: dict, classification: Type
     )
 
     # atlas_id is opaque identity: hash-part only, never semantic content.
-    stable_key = "|".join(
-        [
-            (resolution.get("mid") or "").strip(),
-            (wikidata or {}).get("qid") or "",
-            canonical_label.strip().lower(),
-        ]
-    )
-    digest = hashlib.sha1(stable_key.encode("utf-8")).hexdigest()[:16]
-    atlas_id = f"atlas:{digest}"
+    atlas_id = "atlas:" + entity_hash(
+        (resolution.get("mid") or "").strip(),
+        (wikidata or {}).get("qid") or "",
+        canonical_label.strip().lower(),
+    )
 
     trends_prov = AtlasProvenance(
         source=resolution.get("source") or "resolver",
@@ -109,41 +106,44 @@ def _entity_from_resolution(subject: str, resolution: dict, classification: Type
     if mid:
         claims.append(
             AtlasClaim(
-                claim_id=f"clm_raw_ident_mid_{mid}",
+                claim_id=f"clm_raw_ident_mid_{claim_hash(atlas_id, 'atlas:hasIdentifier', mid, 'raw')}",
                 subject=atlas_id,
                 predicate="atlas:hasIdentifier",
                 object=AtlasClaimObject(kind="identifier", id_type="mid", value=mid),
                 layer="raw",
                 provenance=trends_prov,
+                created_at=_now_date(),
             )
         )
     if wikidata and wikidata.get("qid"):
         claims.append(
             AtlasClaim(
-                claim_id=f"clm_raw_ident_qid_{wikidata['qid']}",
+                claim_id=f"clm_raw_ident_qid_{claim_hash(atlas_id, 'atlas:hasIdentifier', wikidata['qid'], 'raw')}",
                 subject=atlas_id,
                 predicate="atlas:hasIdentifier",
                 object=AtlasClaimObject(kind="identifier", id_type="qid", value=wikidata["qid"]),
                 layer="raw",
                 provenance=wikidata_prov,
+                created_at=_now_date(),
             )
         )
 
     claims.append(
-        AtlasClaim(
-            claim_id="clm_drv_canonical_type",
+        AtlasClaim(
+            claim_id=f"clm_drv_canonical_type_{claim_hash(atlas_id, 'atlas:hasCanonicalType', canonical_type, 'derived')}",
             subject=atlas_id,
             predicate="atlas:hasCanonicalType",
             object=AtlasClaimObject(kind="type", value=f"atlas:{canonical_type}"),
             layer="derived",
             provenance=classification.provenance,
+            created_at=_now_date(),
         )
     )
 
     payload = dict(resolution)
     if wikidata:
         payload["wikidata"] = {
-            "status": "ok",
+            "wikidata_status": "hit",
             "source": "wikidata",
             "qid": wikidata.get("qid"),
             "label": wikidata.get("label"),
@@ -151,7 +151,7 @@ def _entity_from_resolution(subject: str, resolution: dict, classification: Type
             "retrieved_at": wikidata.get("retrieved_at"),
         }
     else:
-        payload["wikidata"] = {"status": "missing", "source": "wikidata", "retrieved_at": None}
+        payload["wikidata"] = {"wikidata_status": "missing", "source": "wikidata", "retrieved_at": None}
 
     return AtlasEntity(
         atlas_id=atlas_id,

+ 1 - 0
app/claims.py

@@ -15,6 +15,7 @@ def _prov_to_dict(p: AtlasProvenance | None) -> dict[str, Any] | None:
         "method": p.retrieval_method,
         "confidence": p.confidence,
         "retrieved_at": p.retrieved_at,
+        "evidence_property": p.evidence_property,
     }
 
 

+ 34 - 0
app/ids.py

@@ -0,0 +1,34 @@
+"""Stable ID helpers for Atlas.
+
+These functions keep entity and claim identifiers deterministic across the app.
+The same stable hash should be used for:
+- atlas_id / entity IRI fragments
+- claim IDs
+- claim IRIs in Turtle exports
+
+Semantics live in triples; the IDs themselves stay opaque.
+"""
+
+from __future__ import annotations
+
+import hashlib
+
+
+def stable_hash(*parts: str, length: int = 16) -> str:
+    material = "|".join((part or "").strip() for part in parts)
+    return hashlib.sha1(material.encode("utf-8")).hexdigest()[:length]
+
+
+def claim_hash(
+    subject: str,
+    predicate: str,
+    object_value: str,
+    layer: str,
+    status: str = "active",
+    created_at: str | None = None,
+) -> str:
+    return stable_hash(subject, predicate, object_value, layer, status, created_at or "")
+
+
+def entity_hash(*parts: str, length: int = 16) -> str:
+    return stable_hash(*parts, length=length)
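A quick determinism check for the new helpers (input values illustrative):

```python
# The same inputs always produce the same 16-hex fragment, so the entity IRI,
# the claim ID, and the Turtle claim node can share one identity.
from app.ids import claim_hash, entity_hash

atlas_id = f"atlas:{entity_hash('/m/0cqt90', 'Q22686', 'donald trump')}"
first = claim_hash(atlas_id, "atlas:hasIdentifier", "/m/0cqt90", "raw")
second = claim_hash(atlas_id, "atlas:hasIdentifier", "/m/0cqt90", "raw")
assert first == second and len(first) == 16
```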

+ 7 - 3
app/mcp_server.py

@@ -22,6 +22,7 @@ mcp = FastMCP(
 @mcp.tool(name="resolve_entity", description="Resolve a subject string to a canonical Atlas entity.")
 async def resolve_entity_tool(subject: str, context: str | None = None, debug: bool = False, debug_path: str | None = None):
     entity = await resolve_entity(subject, context)
+    raw_claims, derived_claims = build_claim_sets(entity)
     result = {
         "atlas_id": entity.atlas_id,
         "canonical_label": entity.canonical_label,
@@ -29,18 +30,22 @@ async def resolve_entity_tool(subject: str, context: str | None = None, debug: b
         "entity_type": entity.entity_type,
         "needs_curation": entity.needs_curation,
         "aliases": [alias.label for alias in entity.aliases],
+        "active_claims": raw_claims + derived_claims,
         "g_trends_payload": {k: v for k, v in entity.raw_payload.items() if k != "wikidata"},
         "wikidata_payload": (
             entity.raw_payload.get("wikidata")
             if entity.raw_payload.get("wikidata") is not None
-            else {"status": "missing"}
+            else {"wikidata_status": "missing"}
         ),
     }
     if debug:
-        raw_claims, derived_claims = build_claim_sets(entity)
         turtle = entity_to_turtle(entity)
         result["raw_claims"] = raw_claims
         result["derived_claims"] = derived_claims
+        result["source_payloads"] = {
+            "g_trends_payload": result["g_trends_payload"],
+            "wikidata_payload": result["wikidata_payload"],
+        }
         result["turtle"] = turtle
         if debug_path:
             path = Path(debug_path)
@@ -70,4 +75,3 @@ async def enrich_entity_tool(subject: str, depth: int = 1, context: str | None =
         "query_context": result.query_context,
         "depth": result.depth,
     }
-

+ 2 - 0
app/models.py

@@ -24,6 +24,7 @@ class AtlasProvenance:
     retrieval_method: str
     confidence: float = 0.0
     retrieved_at: Optional[str] = None
+    evidence_property: str | None = None
 
 
 @dataclass
@@ -42,6 +43,7 @@ class AtlasClaim:
     layer: str
     status: str = "active"
     provenance: AtlasProvenance | None = None
+    created_at: str | None = None
 
 
 @dataclass

+ 31 - 2
app/storage_service.py

@@ -9,6 +9,7 @@ from __future__ import annotations
 import json
 import logging
 import os
+import time
 from typing import Any, Awaitable, Callable
 
 from mcp import ClientSession
@@ -46,10 +47,35 @@ def entity_iri(entity_id: str) -> str:
 class AtlasStorageService:
     def __init__(self, call_tool: CallToolFn | None = None):
         self._call_tool_override = call_tool
+        self._tool_cache: dict[str, tuple[float, dict[str, Any]]] = {}
+        self._tool_cache_ttl_seconds = float(os.getenv("ATLAS_VIRTUOSO_CALL_CACHE_TTL", "30"))
+
+    def _cache_key(self, tool_name: str, payload: dict[str, Any]) -> str:
+        return f"{tool_name}:{json.dumps(payload, sort_keys=True, separators=(',', ':'))}"
+
+    def _cache_get(self, key: str) -> dict[str, Any] | None:
+        item = self._tool_cache.get(key)
+        if not item:
+            return None
+        expires_at, value = item
+        if expires_at < time.time():
+            self._tool_cache.pop(key, None)
+            return None
+        return value
+
+    def _cache_set(self, key: str, value: dict[str, Any]) -> None:
+        self._tool_cache[key] = (time.time() + self._tool_cache_ttl_seconds, value)
 
     async def _call_tool(self, tool_name: str, payload: dict[str, Any]) -> dict[str, Any]:
+        cache_key = self._cache_key(tool_name, payload)
+        cached = self._cache_get(cache_key)
+        if cached is not None:
+            return cached
+
         if self._call_tool_override:
-            return await self._call_tool_override(tool_name, payload)
+            result = await self._call_tool_override(tool_name, payload)
+            self._cache_set(cache_key, result)
+            return result
 
         try:
             async with sse_client(
@@ -62,7 +88,10 @@ class AtlasStorageService:
                     result = await session.call_tool(tool_name, {"input": payload})
                     if result.isError:
                         raise RuntimeError(f"Tool {tool_name} failed: {result.error}")
-                    return result.structuredContent if result.structuredContent is not None else result.content
+                    data = result.structuredContent if result.structuredContent is not None else result.content
+                    if isinstance(data, dict):
+                        self._cache_set(cache_key, data)
+                    return data
         except Exception as exc:
             raise RuntimeError(f"Virtuoso MCP call failed for {tool_name}: {exc}")
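The new per-call cache can be exercised in isolation by injecting a fake tool callable; the payload and names below are illustrative:

```python
# Two identical tool calls inside the TTL window (default 30s) should invoke
# the underlying callable only once; the second is served from the cache.
import asyncio
from app.storage_service import AtlasStorageService

async def demo() -> None:
    calls = {"n": 0}

    async def fake_tool(tool_name: str, payload: dict) -> dict:
        calls["n"] += 1
        return {"ok": True}

    svc = AtlasStorageService(call_tool=fake_tool)
    await svc._call_tool("sparql_query", {"query": "ASK {}"})
    await svc._call_tool("sparql_query", {"query": "ASK {}"})
    assert calls["n"] == 1  # second call hit the cache

asyncio.run(demo())
```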
 

+ 2 - 1
app/triple_export.py

@@ -35,7 +35,8 @@ def _alias_node(alias_label: str) -> str:
 
 
 def _claim_node(claim: AtlasClaim) -> str:
-    return f"atlas_data:claim_{_safe_fragment(claim.claim_id)}"
+    hash_part = claim.claim_id.rsplit("_", maxsplit=1)[-1]
+    return f"atlas_data:Claim_{_safe_fragment(hash_part)}"
 
 
 def _provenance_node(claim: AtlasClaim) -> str:
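Because `_claim_node` keeps only the trailing hash segment, claim IRIs stay opaque (`Claim_<hash>`), matching the PROJECT.md goal; a minimal check (values illustrative):

```python
# "clm_raw_ident_mid_<hash>" -> "Claim_<hash>": semantics stay in triples,
# not in the IRI string.
claim_id = "clm_raw_ident_mid_9f3a6d0c5b2e8147"
assert claim_id.rsplit("_", maxsplit=1)[-1] == "9f3a6d0c5b2e8147"
```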

+ 1 - 1
app/virtuoso_store.py

@@ -181,7 +181,7 @@ def _entity_from_binding(binding: dict) -> AtlasEntity:
             "source": "virtuoso",
             "raw": label or entity_uri,
             "normalized": (label or entity_uri),
-            "wikidata": (json.loads(raw_wd) if raw_wd else {"status": "missing"}),
+            "wikidata": (json.loads(raw_wd) if raw_wd else {"wikidata_status": "missing"}),
             **(json.loads(raw_trends) if raw_trends else {}),
         },
         needs_curation=(entity_type or "unknown") == "unknown",

+ 1 - 0
app/wikidata_lookup.py

@@ -56,5 +56,6 @@ async def lookup_wikidata(subject: str) -> Optional[dict[str, Any]]:
             "description": top.get("description"),
             "entity": entity_payload.get("entities", {}).get(qid, {}),
             "source": "wikidata",
+            "wikidata_status": "hit",
             "retrieved_at": datetime.now(timezone.utc).isoformat(),
         }

+ 4 - 0
config/entity_aliases.json

@@ -13,5 +13,9 @@
   "trump": "Donald Trump",
   "donald trump": "Donald Trump",
   "merz": "Friedrich Merz",
+  "Grace Latigo": "Grace Marta Latigo",
+  "grace latigo": "Grace Marta Latigo",
+  "rubio": "Marco Rubio",
+  "marco rubio": "Marco Rubio",
   "friedrich merz": "Friedrich Merz"
 }

+ 10 - 0
maintain_entities.sh

@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+if [[ -f "$ROOT_DIR/.venv/bin/activate" ]]; then
+  # shellcheck disable=SC1091
+  source "$ROOT_DIR/.venv/bin/activate"
+fi
+
+exec python "$ROOT_DIR/scripts/maintain_entities.py" "$@"

+ 297 - 0
scripts/maintain_entities.py

@@ -0,0 +1,296 @@
+#!/usr/bin/env python3
+"""Atlas maintenance script.
+
+Goal:
+- automatically revisit stored entities
+- enrich identifier coverage when Wikidata is present
+- keep the claim supersession model authoritative
+
+Operational rule:
+- no manual subject list is required for normal runs
+- --dry-run shows what would change, without writing
+"""
+
+from __future__ import annotations
+
+import argparse
+import asyncio
+import json
+import sys
+from pathlib import Path
+from dataclasses import asdict
+from datetime import datetime, timezone
+from typing import Any
+
+
+ROOT = Path(__file__).resolve().parents[1]
+if str(ROOT) not in sys.path:
+    sys.path.insert(0, str(ROOT))
+
+import app.atlas as atlas_module
+from app.atlas import resolve_entity
+from app.ids import claim_hash
+from app.models import AtlasClaim, AtlasClaimObject, AtlasEntity, AtlasProvenance
+from app.storage_service import AtlasStorageService
+from app.wikidata_lookup import lookup_wikidata
+
+# High-confidence identifier properties we can mine from the full Wikidata entity.
+# The goal is to enrich the entity with public identifiers and to reconcile the
+# Google MID whenever Wikidata already exposes the same identity through another id.
+#
+# Note: provenance.retrieval_method describes the evidence/property source
+# (for example "MusicBrainz artist ID"), not the name of this script.
+WIKIDATA_IDENTIFIER_PROPERTIES: dict[str, tuple[str, str]] = {
+    "P2671": ("mid", "Google Knowledge Graph ID"),
+    "P434": ("musicbrainz-artist-id", "MusicBrainz artist ID"),
+    "P435": ("musicbrainz-work-id", "MusicBrainz work ID"),
+    "P436": ("musicbrainz-release-group-id", "MusicBrainz release group ID"),
+    "P439": ("musicbrainz-release-id", "MusicBrainz release ID"),
+    "P444": ("musicbrainz-recording-id", "MusicBrainz recording ID"),
+    "P345": ("imdb-id", "IMDb ID"),
+    "P214": ("viaf-id", "VIAF ID"),
+    "P213": ("isni", "ISNI"),
+    "P227": ("gnd-id", "GND ID"),
+}
+
+# Entity-type-specific Wikidata fields worth capturing early. These are the
+# fields that help the most with disambiguation and downstream consolidation.
+WIKIDATA_TYPE_FIELD_PLAN: dict[str, dict[str, tuple[str, str]]] = {
+    "Person": {
+        "P569": ("birth-date", "date of birth"),
+        "P19": ("birth-place", "place of birth"),
+        "P27": ("citizenship", "country of citizenship"),
+    },
+    "Organization": {
+        "P571": ("inception", "inception"),
+        "P159": ("headquarters", "headquarters location"),
+        "P452": ("industry", "industry"),
+    },
+    "Location": {
+        "P571": ("inception", "inception"),
+        "P17": ("country", "country"),
+        "P131": ("located-in", "located in the administrative territorial entity"),
+    },
+}
+
+
+def _planned_claim_id(subject: str, predicate: str, value: str, layer: str = "raw") -> str:
+    created_at = datetime.now(timezone.utc).date().isoformat()
+    return f"clm_{layer}_{claim_hash(subject, predicate, value, layer, created_at=created_at)}"
+
+
+def build_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(description="Atlas maintenance / claim adjudication helper")
+    parser.add_argument("--dry-run", action="store_true", help="Show planned claim updates without writing")
+    parser.add_argument("--page-size", type=int, default=50, help="How many entities to scan per page")
+    parser.add_argument("--start-after", default="", help="Resume scanning after this canonical label")
+    parser.add_argument("--checkpoint-file", default=".atlas-maintenance.checkpoint", help="File storing the last processed label")
+    parser.add_argument("--reset-checkpoint", action="store_true", help="Ignore any saved checkpoint and start from the beginning")
+    parser.add_argument("--clear-checkpoint", action="store_true", help="Delete the checkpoint file and exit")
+    return parser
+
+
+async def _sparql_bindings(query: str) -> list[dict[str, Any]]:
+    svc = AtlasStorageService()
+    result = await svc._call_tool("sparql_query", {"query": query})
+    if isinstance(result, list) and result:
+        first = result[0]
+        text = getattr(first, "text", None)
+        result = json.loads(text) if text else {}
+    return result.get("results", {}).get("bindings", []) if isinstance(result, dict) else []
+
+
+async def discover_subjects(page_size: int, start_after: str = "") -> list[str]:
+    """Ask Virtuoso for known Atlas entities and return their labels.
+
+    This keeps the maintenance job automatic: we operate on the stored graph,
+    not on a hand-entered subject list.
+    """
+    escaped = start_after.replace("\\", "\\\\").replace('"', '\\"')
+    filter_clause = f'FILTER(STR(?label) > "{escaped}")' if start_after else ""
+    query = """
+PREFIX atlas: <http://world.eu.org/atlas_ontology#>
+SELECT DISTINCT ?label WHERE {{
+  GRAPH <http://world.eu.org/atlas_data#> {{
+    ?entity a atlas:Entity ;
+            atlas:canonicalLabel ?label .
+    {filter_clause}
+  }}
+}}
+ORDER BY ?label
+LIMIT {page_size}
+""".format(filter_clause=filter_clause, page_size=page_size)
+    bindings = await _sparql_bindings(query)
+    return [b.get("label", {}).get("value", "") for b in bindings if b.get("label", {}).get("value")]
+
+
+async def maintain_subject(subject: str, dry_run: bool) -> dict[str, Any]:
+    # We resolve first so the maintenance run always starts from the current
+    # canonical entity shape, then we layer on any new evidence.
+    if dry_run:
+        original_write = atlas_module._storage.write_entity
+
+        async def _noop_write(entity):
+            return {"status": "dry-run", "entity_id": entity.atlas_id}
+
+        atlas_module._storage.write_entity = _noop_write
+        try:
+            entity = await resolve_entity(subject)
+        finally:
+            atlas_module._storage.write_entity = original_write
+    else:
+        entity = await resolve_entity(subject)
+    report: dict[str, Any] = {
+        "subject": subject,
+        "atlas_id": entity.atlas_id,
+        "planned": [],
+        "written": False,
+        "wikidata_status": "missing",
+        "planned_identifier_claims": 0,
+        "planned_identifier_types": [],
+        "planned_type_field_claims": 0,
+    }
+
+    wikidata = entity.raw_payload.get("wikidata") if isinstance(entity.raw_payload, dict) else None
+    if isinstance(wikidata, dict) and wikidata.get("wikidata_status") == "hit" and wikidata.get("qid"):
+        report["wikidata_status"] = "hit"
+        # If Wikidata already knows the entity, fetch the full object and mine
+        # any additional identifiers we can safely attach as claims.
+        full = await lookup_wikidata(subject)
+        if full and isinstance(full.get("entity"), dict):
+            report["wikidata_status"] = "enriched"
+            entity_block = full["entity"]
+            claims = entity_block.get("claims", {}) if isinstance(entity_block, dict) else {}
+
+            # QID is always a known cross-reference and acts as a stable anchor.
+            qid = full.get("qid")
+            existing_qid = entity.active_identifier("qid")
+            if qid and qid != existing_qid:
+                claim = AtlasClaim(
+                    claim_id=_planned_claim_id(entity.atlas_id, "atlas:hasIdentifier", qid),
+                    subject=entity.atlas_id,
+                    predicate="atlas:hasIdentifier",
+                    object=AtlasClaimObject(kind="identifier", id_type="qid", value=qid),
+                    layer="raw",
+                    provenance=AtlasProvenance(
+                        source="wikidata",
+                        retrieval_method="atlas-maintenance-wikidata-enrichment",
+                        confidence=0.99,
+                        retrieved_at=full.get("retrieved_at"),
+                        evidence_property="qid",
+                    ),
+                )
+                report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
+                report["planned_identifier_claims"] += 1
+                report["planned_identifier_types"].append("qid")
+
+            for wikidata_property, (identifier_type, label) in WIKIDATA_IDENTIFIER_PROPERTIES.items():
+                property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
+                for claim_node in property_claims:
+                    mainsnak = claim_node.get("mainsnak", {})
+                    datavalue = mainsnak.get("datavalue", {})
+                    value = datavalue.get("value")
+                    if not isinstance(value, str) or not value.strip():
+                        continue
+                    existing = entity.active_identifier(identifier_type)
+                    if existing == value:
+                        continue
+                    claim = AtlasClaim(
+                        claim_id=_planned_claim_id(entity.atlas_id, "atlas:hasIdentifier", value),
+                        subject=entity.atlas_id,
+                        predicate="atlas:hasIdentifier",
+                        object=AtlasClaimObject(kind="identifier", id_type=identifier_type, value=value),
+                        layer="raw",
+                        provenance=AtlasProvenance(
+                            source="wikidata",
+                            retrieval_method="atlas-maintenance-wikidata-enrichment",
+                            confidence=0.99,
+                            retrieved_at=full.get("retrieved_at"),
+                            evidence_property=wikidata_property,
+                        ),
+                    )
+                    report["planned"].append({"action": "add_identifier_claim", "claim": asdict(claim)})
+                    report["planned_identifier_claims"] += 1
+                    report["planned_identifier_types"].append(identifier_type)
+
+            # Type-specific enrichment: different entity kinds care about different fields.
+            # We only plan claims for high-confidence public facts that are useful for
+            # disambiguation and consolidation.
+            type_plan = WIKIDATA_TYPE_FIELD_PLAN.get(entity.entity_type, {})
+            for wikidata_property, (claim_type, label) in type_plan.items():
+                property_claims = claims.get(wikidata_property, []) if isinstance(claims, dict) else []
+                for claim_node in property_claims:
+                    mainsnak = claim_node.get("mainsnak", {})
+                    datavalue = mainsnak.get("datavalue", {})
+                    value = datavalue.get("value")
+                    if value in (None, "", {}):
+                        continue
+                    # For this first pass we capture these as literal payload claims;
+                    # the exact ontology mapping can be tightened later.
+                    if isinstance(value, dict):
+                        # entity / place objects often carry an id and label
+                        value = value.get("id") or value.get("time") or value.get("text") or value.get("amount")
+                    if not isinstance(value, str):
+                        continue
+                    predicate = f"atlas:has{claim_type.replace('-', ' ').title().replace(' ', '')}"
+                    claim = AtlasClaim(
+                        claim_id=_planned_claim_id(entity.atlas_id, predicate, value),
+                        subject=entity.atlas_id,
+                        predicate=predicate,
+                        object=AtlasClaimObject(kind="literal", value=value),
+                        layer="raw",
+                        provenance=AtlasProvenance(
+                            source="wikidata",
+                            retrieval_method="atlas-maintenance-wikidata-enrichment",
+                            confidence=0.95,
+                            retrieved_at=full.get("retrieved_at"),
+                            evidence_property=wikidata_property,
+                        ),
+                    )
+                    report["planned"].append({"action": "add_type_field_claim", "claim": asdict(claim)})
+                    report["planned_type_field_claims"] += 1
+    if dry_run:
+        return report
+
+    # The script currently only reports planned updates.
+    # Once the claim update path is wired, this is where write-back will happen.
+    report["written"] = False
+    return report
+
+
+async def main() -> int:
+    parser = build_parser()
+    args = parser.parse_args()
+
+    checkpoint_path = Path(args.checkpoint_file)
+    if args.clear_checkpoint:
+        if checkpoint_path.exists():
+            checkpoint_path.unlink()
+        print(json.dumps({"checkpoint_cleared": True, "checkpoint_file": str(checkpoint_path)}, indent=2, ensure_ascii=False))
+        return 0
+
+    start_after = args.start_after.strip()
+    if args.reset_checkpoint:
+        start_after = ""
+    elif not start_after:
+        if checkpoint_path.exists():
+            start_after = checkpoint_path.read_text(encoding="utf-8").strip()
+
+    subjects = await discover_subjects(args.page_size, start_after)
+    summaries = []
+    for subject in subjects:
+        summaries.append(await maintain_subject(subject, args.dry_run))
+
+    if subjects and not args.dry_run:
+        checkpoint_path.write_text(subjects[-1], encoding="utf-8")
+
+    print(json.dumps({
+        "dry_run": args.dry_run,
+        "checkpoint_file": str(checkpoint_path),
+        "checkpoint_start_after": start_after,
+        "results": summaries,
+    }, indent=2, ensure_ascii=False))
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(asyncio.run(main()))
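A hedged smoke test for one subject (dry run, no writes); it assumes `scripts/` is importable from the repo root and that the resolver and Virtuoso MCP services are reachable:

```python
# Resolve one stored entity and print what the maintenance pass would change.
# Nothing is written in dry-run mode; the subject label is illustrative.
import asyncio
import json

from scripts.maintain_entities import maintain_subject

report = asyncio.run(maintain_subject("Donald Trump", dry_run=True))
print(json.dumps(
    {key: report[key] for key in ("atlas_id", "wikidata_status", "planned_identifier_claims")},
    indent=2,
))
```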