| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323 |
- from __future__ import annotations
- import argparse
- import asyncio
- import json
- import logging
- import os
- from typing import Any, Dict, Iterable, List, Optional, Tuple
- import rdflib
- from .atlas_store import _sparql_select, _sparql_update
- from .wikidata import WikidataSearch
# Named graph that holds Atlas entity data; overridable via environment.
ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
# SSE endpoint of the Virtuoso MCP server used for SPARQL select/update calls.
VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
# Bundled Wikidata -> DBpedia subclass-of mapping (Turtle), resolved relative
# to the package root: <package>/../ontology/wikidata_subclassof.ttl.
WIKIDATA_SUBCLASS_TTL = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "ontology",
    "wikidata_subclassof.ttl",
)
# Module-level logger; level is configured later by _setup_logging().
logger = logging.getLogger(__name__)
- def _setup_logging(log_level: str) -> None:
- logging.basicConfig(level=getattr(logging, log_level.upper(), logging.INFO), force=True)
- def _extract_wikidata_qids_from_entity_dump(wd_dump: dict[str, Any]) -> list[str]:
- """Extract candidate type/class QIDs from a Wikidata EntityData dump."""
- # dumps from Special:EntityData/{qid}.json look like:
- # { "entities": { "Q...": { "claims": {...} } } }
- entities = wd_dump.get("entities") or {}
- if not entities:
- return []
- entity = next(iter(entities.values()))
- claims = entity.get("claims") or {}
- # P31 = instance of, P279 = subclass of
- candidates: list[str] = []
- for prop, key in [("P31", "instance of"), ("P279", "subclass of")]:
- for claim_list in claims.get(prop, []) or []:
- try:
- mainsnak = claim_list.get("mainsnak", {})
- datavalue = mainsnak.get("datavalue", {})
- value = datavalue.get("value") or {}
- if isinstance(value, dict) and "id" in value:
- candidates.append(value["id"])
- except Exception:
- continue
- # de-dupe while preserving order
- seen = set()
- out: list[str] = []
- for q in candidates:
- if q and q not in seen:
- seen.add(q)
- out.append(q)
- return out
def _infer_atlas_type_from_qids(qids: Iterable[str], subclass_graph: rdflib.Graph) -> str:
    """Map Wikidata classes to Atlas ontology types using wikidata_subclassof.ttl.

    The ttl maps Wikidata resources to DBpedia ontology classes; DBpedia class
    IRIs are then bucketed by keyword into Atlas types.  The rdfs:subClassOf
    closure of each QID is walked breadth-first; the first IRI that matches a
    bucket wins.  Returns "atlas:Other" when nothing matches.
    """

    def _bucket_from_obj(obj_str: str) -> Optional[str]:
        # Keyword buckets, checked in priority order on the lower-cased IRI.
        lower = obj_str.lower()
        if "person" in lower or "politician" in lower:
            return "atlas:Person"
        if "human" in lower or "bio" in lower or "biography" in lower:
            return "atlas:Person"
        if "organisation" in lower or "organization" in lower or "governmentagency" in lower or "agency" in lower:
            return "atlas:Organization"
        if any(k in lower for k in [
            "location",
            "place",
            "city",
            "town",
            "village",
            "populatedplace",
            "settlement",
            "country",
            "region",
            "river",
        ]):
            return "atlas:Location"
        # creative works: keep broad for now; refine later using debug logs.
        if any(k in lower for k in ["creativework", "creative work", "work", "album", "film", "book", "musicalwork", "song", "novel"]):
            return "atlas:CreativeWork"
        # events
        if any(k in lower for k in ["event", "tournament", "competition", "meeting", "conference", "festival", "match"]):
            return "atlas:Event"
        # products
        if any(k in lower for k in ["product", "software", "device", "instrument", "hardware"]):
            return "atlas:Product"
        return None

    for q in qids:
        # Support both namespace styles used in tests and data sources.
        start_nodes = [
            rdflib.URIRef(f"http://wikidata.dbpedia.org/resource/{q}"),
            rdflib.URIRef(f"http://www.wikidata.org/entity/{q}"),
        ]
        # Walk the rdfs:subClassOf closure to find something that matches our
        # buckets.  An index cursor replaces list.pop(0), which is O(n) per
        # dequeue and made the walk accidentally quadratic.
        queue: list[rdflib.term.Node] = list(start_nodes)
        visited: set[rdflib.term.Node] = set()
        head = 0
        while head < len(queue):
            cur = queue[head]
            head += 1
            if cur in visited:
                continue
            visited.add(cur)
            for _s, _p, obj in subclass_graph.triples((cur, rdflib.RDFS.subClassOf, None)):
                obj_str = str(obj)
                logger.info("infer_type: qid=%s via=%s subClassOf=%s", q, str(cur), obj_str)
                bucket = _bucket_from_obj(obj_str)
                if bucket:
                    logger.info("infer_type: mapped to %s via %s", bucket, obj_str)
                    return bucket
                # Continue walking if the object is also a class resource (best-effort).
                if isinstance(obj, rdflib.URIRef) and obj not in visited:
                    queue.append(obj)
    logger.info("infer_type: no mapping matched in closure, default atlas:Other")
    return "atlas:Other"
- def _extract_description_and_type_specific_details(qid: str, wd_dump: dict[str, Any], atlas_type: str) -> dict[str, Any]:
- entities = wd_dump.get("entities") or {}
- if not entities:
- return {}
- entity = next(iter(entities.values()))
- # labels/descriptions
- labels = entity.get("labels") or {}
- descs = entity.get("descriptions") or {}
- # Prefer en, fallback to any.
- label_en = labels.get("en", {}).get("value")
- desc_en = descs.get("en", {}).get("value")
- out: dict[str, Any] = {}
- if desc_en:
- out["description"] = desc_en
- if label_en:
- out["label"] = label_en
- claims = entity.get("claims") or {}
- if atlas_type == "atlas:Person":
- # birth date P569, birth place P19
- birth_date = _extract_time_literal_from_claims(claims.get("P569", []) or [])
- birth_place_qid = _extract_entity_qid_from_claims(claims.get("P19", []) or [])
- if birth_date:
- out["birthDate"] = birth_date
- if birth_place_qid:
- out["birthPlaceQid"] = birth_place_qid
- if atlas_type == "atlas:Location":
- # coordinates P625
- coords = _extract_coordinates_from_claims(claims.get("P625", []) or [])
- if coords:
- out["latitude"] = coords[0]
- out["longitude"] = coords[1]
- return out
- def _extract_entity_qid_from_claims(claim_list: list[dict[str, Any]]) -> Optional[str]:
- for claim in claim_list:
- try:
- mainsnak = claim.get("mainsnak", {})
- datavalue = mainsnak.get("datavalue", {})
- value = datavalue.get("value") or {}
- if isinstance(value, dict) and "id" in value:
- return value["id"]
- except Exception:
- continue
- return None
- def _extract_time_literal_from_claims(claim_list: list[dict[str, Any]]) -> Optional[str]:
- # Return the raw time string if present.
- for claim in claim_list:
- try:
- mainsnak = claim.get("mainsnak", {})
- datavalue = mainsnak.get("datavalue", {})
- value = datavalue.get("value") or {}
- if isinstance(value, dict) and "time" in value:
- return value["time"]
- except Exception:
- continue
- return None
- def _extract_coordinates_from_claims(claim_list: list[dict[str, Any]]) -> Optional[tuple[float, float]]:
- for claim in claim_list:
- try:
- mainsnak = claim.get("mainsnak", {})
- datavalue = mainsnak.get("datavalue", {})
- value = datavalue.get("value") or {}
- if isinstance(value, dict) and "latitude" in value and "longitude" in value:
- return (float(value["latitude"]), float(value["longitude"]))
- except Exception:
- continue
- return None
async def _load_wikidata_entity_data(qid: str) -> dict[str, Any]:
    """Fetch the Special:EntityData JSON dump for *qid* via the WikidataSearch helper."""
    searcher = WikidataSearch({"search": "", "limit": 1})
    return await searcher.get_entity_data(qid)
- def _spatial_placeholder_update(entity_iri: str, new_fields: dict[str, Any]) -> str:
- # Scaffold only: we’ll wire the exact triple mappings in the next iteration.
- # For now we just return a comment string.
- return f"""# planned update for {entity_iri}: {json.dumps(new_fields, ensure_ascii=False)}"""
async def run(args: argparse.Namespace) -> None:
    """Maintenance pass over needsCuration=true Atlas entities.

    For each candidate entity that carries a wikidata-qid identifier, fetch its
    Wikidata EntityData dump, infer an Atlas type, and gather type-specific
    details.  No graph writes happen yet — planned updates are only logged,
    both in dry-run and non-dry-run mode (the apply path is scaffold-only).
    """
    _setup_logging(args.log_level)
    logger.info("maintenance start dry_run=%s batch_size=%s", args.dry_run, args.batch_size)

    # Wikidata -> DBpedia subclass-of mapping used for type inference.
    subclass_graph = rdflib.Graph()
    subclass_graph.parse(WIKIDATA_SUBCLASS_TTL, format="turtle")

    # 1) Fetch needsCuration=true entities.
    # We only need label + wikidata-qid identifier to proceed.
    select_query = f"""
    PREFIX atlas: <http://world.eu.org/atlas_ontology#>
    SELECT ?entity ?atlasId ?label ?qid
    WHERE {{
      GRAPH <{ATLAS_GRAPH_IRI}> {{
        ?entity a atlas:Entity ;
                atlas:atlasId ?atlasId ;
                atlas:canonicalLabel ?label ;
                atlas:needsCuration true .
        OPTIONAL {{
          ?entity atlas:hasIdentifier ?ident .
          ?ident atlas:scheme "wikidata-qid" ; atlas:value ?qid .
        }}
      }}
    }}
    LIMIT {args.batch_size}
    """.strip()
    rows = await _sparql_select(VIRTUOSO_MCP_SSE_URL, select_query)
    candidates = rows or []
    logger.info("maintenance candidates=%s", len(candidates))

    processed = 0
    planned_updates = 0
    skipped = 0
    for r in candidates:
        processed += 1
        # Rows look like SPARQL JSON bindings: each cell is a dict with a
        # "value" key — presumably what _sparql_select returns; confirm there.
        entity_iri = r.get("entity", {}).get("value")  # NOTE(review): currently unused
        atlas_id = r.get("atlasId", {}).get("value")
        label = r.get("label", {}).get("value")
        # qid comes from an OPTIONAL pattern, so the cell itself may be absent.
        qid = (r.get("qid", {}) or {}).get("value")
        logger.info("candidate atlas_id=%s label=%s qid=%s", atlas_id, label, qid)
        if not qid:
            # Per your current reality this should not happen; we still keep it safe.
            skipped += 1
            logger.info("skip: missing wikidata-qid (unexpected) atlas_id=%s", atlas_id)
            continue
        wd_dump = await _load_wikidata_entity_data(qid)
        qids = _extract_wikidata_qids_from_entity_dump(wd_dump)
        logger.info("type inference: atlas_id=%s qids=%s", atlas_id, qids)
        atlas_type = _infer_atlas_type_from_qids(qids, subclass_graph)
        details = _extract_description_and_type_specific_details(qid, wd_dump, atlas_type)
        # Always plan type update + type-specific details.
        new_fields = {
            "type": atlas_type,
            **details,
        }
        if args.dry_run:
            logger.info("DRY RUN would update: %s", new_fields)
        else:
            # Non-dry-run wiring intentionally left for next step.
            # We only scaffold the planned update for now.
            logger.info("APPLY scaffold only (writes not enabled yet): %s", new_fields)
        planned_updates += 1
    logger.info(
        "maintenance done processed=%s planned_updates=%s skipped=%s dry_run=%s",
        processed,
        planned_updates,
        skipped,
        args.dry_run,
    )
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for the maintenance script."""
    parser = argparse.ArgumentParser(description="atlas2-mcp maintenance (needsCuration=true) — dry-run first.")
    parser.add_argument("--dry-run", action="store_true", help="Do not write updates")
    parser.add_argument("--batch-size", type=int, default=10)
    parser.add_argument("--log-level", type=str, default="INFO")
    parser.add_argument("--limit", type=int, default=None, help="Optional max entities (overrides batch-size)")
    return parser


def main() -> None:
    """CLI entry point: parse arguments and run one maintenance pass."""
    args = _build_parser().parse_args()
    # --limit, when given, wins over --batch-size.
    if args.limit is not None:
        args.batch_size = args.limit
    asyncio.run(run(args))


if __name__ == "__main__":
    main()
|