from __future__ import annotations

import argparse
import asyncio
import json
import logging
import os
from typing import Any, Iterable, Optional

import rdflib

from .atlas_store import _sparql_select, _sparql_update
from .wikidata import WikidataSearch

ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
WIKIDATA_SUBCLASS_TTL = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "ontology",
    "wikidata_subclassof.ttl",
)

logger = logging.getLogger(__name__)


def _setup_logging(log_level: str) -> None:
    logging.basicConfig(level=getattr(logging, log_level.upper(), logging.INFO), force=True)


def _extract_wikidata_qids_from_entity_dump(wd_dump: dict[str, Any]) -> list[str]:
    """Extract candidate type/class QIDs from a Wikidata EntityData dump."""
    # Dumps from Special:EntityData/{qid}.json look like:
    #   { "entities": { "Q...": { "claims": {...} } } }
    entities = wd_dump.get("entities") or {}
    if not entities:
        return []
    entity = next(iter(entities.values()))
    claims = entity.get("claims") or {}
    candidates: list[str] = []
    # P31 = "instance of", P279 = "subclass of".
    for prop in ("P31", "P279"):
        for claim in claims.get(prop, []) or []:
            try:
                mainsnak = claim.get("mainsnak", {})
                datavalue = mainsnak.get("datavalue", {})
                value = datavalue.get("value") or {}
                if isinstance(value, dict) and "id" in value:
                    candidates.append(value["id"])
            except Exception:
                continue
    # De-dupe while preserving order.
    seen = set()
    out: list[str] = []
    for q in candidates:
        if q and q not in seen:
            seen.add(q)
            out.append(q)
    return out
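

# A minimal usage sketch of the extractor above. The dump below is a
# hand-abbreviated, illustrative EntityData payload, not real API output:
#
#   dump = {"entities": {"Q42": {"claims": {"P31": [
#       {"mainsnak": {"datavalue": {"value": {"id": "Q5"}}}}]}}}}
#   _extract_wikidata_qids_from_entity_dump(dump)  # -> ["Q5"]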


def _infer_atlas_type_from_qids(qids: Iterable[str], subclass_graph: rdflib.Graph) -> str:
    """Map Wikidata classes to Atlas ontology types using wikidata_subclassof.ttl."""
    # The ttl maps Wikidata resources to DBpedia ontology classes.
    # We then bucket DBpedia classes by keyword into atlas types.

    def _bucket_from_obj(obj_str: str) -> Optional[str]:
        lower = obj_str.lower()
        if "person" in lower or "politician" in lower:
            return "atlas:Person"
        if "human" in lower or "bio" in lower or "biography" in lower:
            return "atlas:Person"
        if "organisation" in lower or "organization" in lower or "governmentagency" in lower or "agency" in lower:
            return "atlas:Organization"
        if any(k in lower for k in [
            "location", "place", "city", "town", "village",
            "populatedplace", "settlement", "country", "region", "river",
        ]):
            return "atlas:Location"
        # Creative works: keep broad for now; refine later using debug logs.
        if any(k in lower for k in ["creativework", "creative work", "work", "album", "film", "book", "musicalwork", "song", "novel"]):
            return "atlas:CreativeWork"
        # Events.
        if any(k in lower for k in ["event", "tournament", "competition", "meeting", "conference", "festival", "match"]):
            return "atlas:Event"
        # Products.
        if any(k in lower for k in ["product", "software", "device", "instrument", "hardware"]):
            return "atlas:Product"
        return None

    for q in qids:
        # Support both namespace styles used in tests and data sources.
        start_nodes = [
            rdflib.URIRef(f"http://wikidata.dbpedia.org/resource/{q}"),
            rdflib.URIRef(f"http://www.wikidata.org/entity/{q}"),
        ]
        # Walk the rdfs:subClassOf closure breadth-first until an object IRI
        # matches one of the keyword buckets above.
        queue = list(start_nodes)
        visited: set[rdflib.term.Node] = set()
        while queue:
            cur = queue.pop(0)
            if cur in visited:
                continue
            visited.add(cur)
            for _s, _p, obj in subclass_graph.triples((cur, rdflib.RDFS.subClassOf, None)):
                obj_str = str(obj)
                logger.info("infer_type: qid=%s via=%s subClassOf=%s", q, str(cur), obj_str)
                bucket = _bucket_from_obj(obj_str)
                if bucket:
                    logger.info("infer_type: mapped to %s via %s", bucket, obj_str)
                    return bucket
                # Continue walking if the object is also a class resource (best-effort).
                if isinstance(obj, rdflib.URIRef) and obj not in visited:
                    queue.append(obj)
    logger.info("infer_type: no mapping matched in closure, default atlas:Other")
    return "atlas:Other"


def _extract_description_and_type_specific_details(qid: str, wd_dump: dict[str, Any], atlas_type: str) -> dict[str, Any]:
    entities = wd_dump.get("entities") or {}
    if not entities:
        return {}
    entity = next(iter(entities.values()))
    # Labels/descriptions: use the English values; other languages are ignored for now.
    labels = entity.get("labels") or {}
    descs = entity.get("descriptions") or {}
    label_en = labels.get("en", {}).get("value")
    desc_en = descs.get("en", {}).get("value")
    out: dict[str, Any] = {}
    if desc_en:
        out["description"] = desc_en
    if label_en:
        out["label"] = label_en
    claims = entity.get("claims") or {}
    if atlas_type == "atlas:Person":
        # Birth date P569, birth place P19.
        birth_date = _extract_time_literal_from_claims(claims.get("P569", []) or [])
        birth_place_qid = _extract_entity_qid_from_claims(claims.get("P19", []) or [])
        if birth_date:
            out["birthDate"] = birth_date
        if birth_place_qid:
            out["birthPlaceQid"] = birth_place_qid
    if atlas_type == "atlas:Location":
        # Coordinates P625.
        coords = _extract_coordinates_from_claims(claims.get("P625", []) or [])
        if coords:
            out["latitude"] = coords[0]
            out["longitude"] = coords[1]
    return out


def _extract_entity_qid_from_claims(claim_list: list[dict[str, Any]]) -> Optional[str]:
    for claim in claim_list:
        try:
            mainsnak = claim.get("mainsnak", {})
            datavalue = mainsnak.get("datavalue", {})
            value = datavalue.get("value") or {}
            if isinstance(value, dict) and "id" in value:
                return value["id"]
        except Exception:
            continue
    return None


def _extract_time_literal_from_claims(claim_list: list[dict[str, Any]]) -> Optional[str]:
    # Return the raw Wikidata time string (e.g. "+1952-03-11T00:00:00Z") if present.
    for claim in claim_list:
        try:
            mainsnak = claim.get("mainsnak", {})
            datavalue = mainsnak.get("datavalue", {})
            value = datavalue.get("value") or {}
            if isinstance(value, dict) and "time" in value:
                return value["time"]
        except Exception:
            continue
    return None


def _extract_coordinates_from_claims(claim_list: list[dict[str, Any]]) -> Optional[tuple[float, float]]:
    for claim in claim_list:
        try:
            mainsnak = claim.get("mainsnak", {})
            datavalue = mainsnak.get("datavalue", {})
            value = datavalue.get("value") or {}
            if isinstance(value, dict) and "latitude" in value and "longitude" in value:
                return (float(value["latitude"]), float(value["longitude"]))
        except Exception:
            continue
    return None


async def _load_wikidata_entity_data(qid: str) -> dict[str, Any]:
    # Reuse the WikidataSearch helper.
    search = WikidataSearch({"search": "", "limit": 1})
    return await search.get_entity_data(qid)


def _spatial_placeholder_update(entity_iri: str, new_fields: dict[str, Any]) -> str:
    # Scaffold only: we'll wire the exact triple mappings in the next iteration.
    # For now we just return a comment string.
    return f"""# planned update for {entity_iri}: {json.dumps(new_fields, ensure_ascii=False)}"""
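

# A minimal sketch of the eventual write, assuming the planned fields map
# one-to-one onto atlas properties (illustrative only: the property names
# below are assumptions, and the _sparql_update wiring is deliberately left
# for the next iteration):
#
#   INSERT DATA {
#     GRAPH <http://world.eu.org/atlas_data#> {
#       <entity_iri> a atlas:Person ;
#                    atlas:description "English description from Wikidata" .
#     }
#   }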
return f"""# planned update for {entity_iri}: {json.dumps(new_fields, ensure_ascii=False)}""" async def run(args: argparse.Namespace) -> None: _setup_logging(args.log_level) logger.info("maintenance start dry_run=%s batch_size=%s", args.dry_run, args.batch_size) subclass_graph = rdflib.Graph() subclass_graph.parse(WIKIDATA_SUBCLASS_TTL, format="turtle") # 1) Fetch needsCuration=true entities. # We only need label + wikidata-qid identifier to proceed. select_query = f""" PREFIX atlas: SELECT ?entity ?atlasId ?label ?qid WHERE {{ GRAPH <{ATLAS_GRAPH_IRI}> {{ ?entity a atlas:Entity ; atlas:atlasId ?atlasId ; atlas:canonicalLabel ?label ; atlas:needsCuration true . OPTIONAL {{ ?entity atlas:hasIdentifier ?ident . ?ident atlas:scheme "wikidata-qid" ; atlas:value ?qid . }} }} }} LIMIT {args.batch_size} """.strip() rows = await _sparql_select(VIRTUOSO_MCP_SSE_URL, select_query) candidates = rows or [] logger.info("maintenance candidates=%s", len(candidates)) processed = 0 planned_updates = 0 skipped = 0 for r in candidates: processed += 1 entity_iri = r.get("entity", {}).get("value") atlas_id = r.get("atlasId", {}).get("value") label = r.get("label", {}).get("value") qid = (r.get("qid", {}) or {}).get("value") logger.info("candidate atlas_id=%s label=%s qid=%s", atlas_id, label, qid) if not qid: # Per your current reality this should not happen; we still keep it safe. skipped += 1 logger.info("skip: missing wikidata-qid (unexpected) atlas_id=%s", atlas_id) continue wd_dump = await _load_wikidata_entity_data(qid) qids = _extract_wikidata_qids_from_entity_dump(wd_dump) logger.info("type inference: atlas_id=%s qids=%s", atlas_id, qids) atlas_type = _infer_atlas_type_from_qids(qids, subclass_graph) details = _extract_description_and_type_specific_details(qid, wd_dump, atlas_type) # Always plan type update + type-specific details. new_fields = { "type": atlas_type, **details, } if args.dry_run: logger.info("DRY RUN would update: %s", new_fields) else: # Non-dry-run wiring intentionally left for next step. # We only scaffold the planned update for now. logger.info("APPLY scaffold only (writes not enabled yet): %s", new_fields) planned_updates += 1 logger.info( "maintenance done processed=%s planned_updates=%s skipped=%s dry_run=%s", processed, planned_updates, skipped, args.dry_run, ) def main() -> None: parser = argparse.ArgumentParser(description="atlas2-mcp maintenance (needsCuration=true) — dry-run first.") parser.add_argument("--dry-run", action="store_true", help="Do not write updates") parser.add_argument("--batch-size", type=int, default=10) parser.add_argument("--log-level", type=str, default="INFO") parser.add_argument("--limit", type=int, default=None, help="Optional max entities (overrides batch-size)") args = parser.parse_args() if args.limit is not None: args.batch_size = args.limit asyncio.run(run(args)) if __name__ == "__main__": main()