# maintenance.py (11 KB)
from __future__ import annotations

import argparse
import asyncio
import json
import logging
import os
from collections import deque
from typing import Any, Dict, Iterable, List, Optional, Tuple

import rdflib

from .atlas_store import _sparql_select, _sparql_update
from .wikidata import WikidataSearch
  11. ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
  12. VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
  13. WIKIDATA_SUBCLASS_TTL = os.path.join(
  14. os.path.dirname(os.path.dirname(__file__)),
  15. "ontology",
  16. "wikidata_subclassof.ttl",
  17. )
  18. logger = logging.getLogger(__name__)
  19. def _setup_logging(log_level: str) -> None:
  20. logging.basicConfig(level=getattr(logging, log_level.upper(), logging.INFO), force=True)
  21. def _extract_wikidata_qids_from_entity_dump(wd_dump: dict[str, Any]) -> list[str]:
  22. """Extract candidate type/class QIDs from a Wikidata EntityData dump."""
  23. # dumps from Special:EntityData/{qid}.json look like:
  24. # { "entities": { "Q...": { "claims": {...} } } }
  25. entities = wd_dump.get("entities") or {}
  26. if not entities:
  27. return []
  28. entity = next(iter(entities.values()))
  29. claims = entity.get("claims") or {}
  30. # P31 = instance of, P279 = subclass of
  31. candidates: list[str] = []
  32. for prop, key in [("P31", "instance of"), ("P279", "subclass of")]:
  33. for claim_list in claims.get(prop, []) or []:
  34. try:
  35. mainsnak = claim_list.get("mainsnak", {})
  36. datavalue = mainsnak.get("datavalue", {})
  37. value = datavalue.get("value") or {}
  38. if isinstance(value, dict) and "id" in value:
  39. candidates.append(value["id"])
  40. except Exception:
  41. continue
  42. # de-dupe while preserving order
  43. seen = set()
  44. out: list[str] = []
  45. for q in candidates:
  46. if q and q not in seen:
  47. seen.add(q)
  48. out.append(q)
  49. return out
  50. def _infer_atlas_type_from_qids(qids: Iterable[str], subclass_graph: rdflib.Graph) -> str:
  51. """Map Wikidata classes to Atlas ontology types using wikidata_subclassof.ttl."""
  52. # The ttl maps Wikidata resources to dbpedia ontology classes.
  53. # We then bucket dbpedia classes by keyword into atlas types.
  54. def _bucket_from_obj(obj_str: str) -> Optional[str]:
  55. lower = obj_str.lower()
  56. if "person" in lower or "politician" in lower:
  57. return "atlas:Person"
  58. if "human" in lower or "bio" in lower or "biography" in lower:
  59. return "atlas:Person"
  60. if "organisation" in lower or "organization" in lower or "governmentagency" in lower or "agency" in lower:
  61. return "atlas:Organization"
  62. if any(k in lower for k in [
  63. "location",
  64. "place",
  65. "city",
  66. "town",
  67. "village",
  68. "populatedplace",
  69. "settlement",
  70. "country",
  71. "region",
  72. "river",
  73. ]):
  74. return "atlas:Location"
  75. # creative works: keep broad for now; refine later using debug logs.
  76. if any(k in lower for k in ["creativework", "creative work", "work", "album", "film", "book", "musicalwork", "song", "novel"]):
  77. return "atlas:CreativeWork"
  78. # events
  79. if any(k in lower for k in ["event", "tournament", "competition", "meeting", "conference", "festival", "match"]):
  80. return "atlas:Event"
  81. # products
  82. if any(k in lower for k in ["product", "software", "device", "instrument", "hardware"]):
  83. return "atlas:Product"
  84. return None
  85. for q in qids:
  86. # Support both namespace styles used in tests and data sources.
  87. start_nodes = [
  88. rdflib.URIRef(f"http://wikidata.dbpedia.org/resource/{q}"),
  89. rdflib.URIRef(f"http://www.wikidata.org/entity/{q}"),
  90. ]
  91. # Walk the rdfs:subClassOf closure to find something that matches our buckets.
  92. queue = list(start_nodes)
  93. visited: set[rdflib.term.Node] = set()
  94. while queue:
  95. cur = queue.pop(0)
  96. if cur in visited:
  97. continue
  98. visited.add(cur)
  99. for _s, _p, obj in subclass_graph.triples((cur, rdflib.RDFS.subClassOf, None)):
  100. obj_str = str(obj)
  101. logger.info("infer_type: qid=%s via=%s subClassOf=%s", q, str(cur), obj_str)
  102. bucket = _bucket_from_obj(obj_str)
  103. if bucket:
  104. logger.info("infer_type: mapped to %s via %s", bucket, obj_str)
  105. return bucket
  106. # Continue walking if the object is also a class resource (best-effort).
  107. if isinstance(obj, rdflib.URIRef) and obj not in visited:
  108. queue.append(obj)
  109. logger.info("infer_type: no mapping matched in closure, default atlas:Other")
  110. return "atlas:Other"
  111. def _extract_description_and_type_specific_details(qid: str, wd_dump: dict[str, Any], atlas_type: str) -> dict[str, Any]:
  112. entities = wd_dump.get("entities") or {}
  113. if not entities:
  114. return {}
  115. entity = next(iter(entities.values()))
  116. # labels/descriptions
  117. labels = entity.get("labels") or {}
  118. descs = entity.get("descriptions") or {}
  119. # Prefer en, fallback to any.
  120. label_en = labels.get("en", {}).get("value")
  121. desc_en = descs.get("en", {}).get("value")
  122. out: dict[str, Any] = {}
  123. if desc_en:
  124. out["description"] = desc_en
  125. if label_en:
  126. out["label"] = label_en
  127. claims = entity.get("claims") or {}
  128. if atlas_type == "atlas:Person":
  129. # birth date P569, birth place P19
  130. birth_date = _extract_time_literal_from_claims(claims.get("P569", []) or [])
  131. birth_place_qid = _extract_entity_qid_from_claims(claims.get("P19", []) or [])
  132. if birth_date:
  133. out["birthDate"] = birth_date
  134. if birth_place_qid:
  135. out["birthPlaceQid"] = birth_place_qid
  136. if atlas_type == "atlas:Location":
  137. # coordinates P625
  138. coords = _extract_coordinates_from_claims(claims.get("P625", []) or [])
  139. if coords:
  140. out["latitude"] = coords[0]
  141. out["longitude"] = coords[1]
  142. return out
  143. def _extract_entity_qid_from_claims(claim_list: list[dict[str, Any]]) -> Optional[str]:
  144. for claim in claim_list:
  145. try:
  146. mainsnak = claim.get("mainsnak", {})
  147. datavalue = mainsnak.get("datavalue", {})
  148. value = datavalue.get("value") or {}
  149. if isinstance(value, dict) and "id" in value:
  150. return value["id"]
  151. except Exception:
  152. continue
  153. return None
  154. def _extract_time_literal_from_claims(claim_list: list[dict[str, Any]]) -> Optional[str]:
  155. # Return the raw time string if present.
  156. for claim in claim_list:
  157. try:
  158. mainsnak = claim.get("mainsnak", {})
  159. datavalue = mainsnak.get("datavalue", {})
  160. value = datavalue.get("value") or {}
  161. if isinstance(value, dict) and "time" in value:
  162. return value["time"]
  163. except Exception:
  164. continue
  165. return None
  166. def _extract_coordinates_from_claims(claim_list: list[dict[str, Any]]) -> Optional[tuple[float, float]]:
  167. for claim in claim_list:
  168. try:
  169. mainsnak = claim.get("mainsnak", {})
  170. datavalue = mainsnak.get("datavalue", {})
  171. value = datavalue.get("value") or {}
  172. if isinstance(value, dict) and "latitude" in value and "longitude" in value:
  173. return (float(value["latitude"]), float(value["longitude"]))
  174. except Exception:
  175. continue
  176. return None
  177. async def _load_wikidata_entity_data(qid: str) -> dict[str, Any]:
  178. # Reuse the WikidataSearch helper.
  179. search = WikidataSearch({"search": "", "limit": 1})
  180. return await search.get_entity_data(qid)
  181. def _spatial_placeholder_update(entity_iri: str, new_fields: dict[str, Any]) -> str:
  182. # Scaffold only: we’ll wire the exact triple mappings in the next iteration.
  183. # For now we just return a comment string.
  184. return f"""# planned update for {entity_iri}: {json.dumps(new_fields, ensure_ascii=False)}"""
  185. async def run(args: argparse.Namespace) -> None:
  186. _setup_logging(args.log_level)
  187. logger.info("maintenance start dry_run=%s batch_size=%s", args.dry_run, args.batch_size)
  188. subclass_graph = rdflib.Graph()
  189. subclass_graph.parse(WIKIDATA_SUBCLASS_TTL, format="turtle")
  190. # 1) Fetch needsCuration=true entities.
  191. # We only need label + wikidata-qid identifier to proceed.
  192. select_query = f"""
  193. PREFIX atlas: <http://world.eu.org/atlas_ontology#>
  194. SELECT ?entity ?atlasId ?label ?qid
  195. WHERE {{
  196. GRAPH <{ATLAS_GRAPH_IRI}> {{
  197. ?entity a atlas:Entity ;
  198. atlas:atlasId ?atlasId ;
  199. atlas:canonicalLabel ?label ;
  200. atlas:needsCuration true .
  201. OPTIONAL {{
  202. ?entity atlas:hasIdentifier ?ident .
  203. ?ident atlas:scheme "wikidata-qid" ; atlas:value ?qid .
  204. }}
  205. }}
  206. }}
  207. LIMIT {args.batch_size}
  208. """.strip()
  209. rows = await _sparql_select(VIRTUOSO_MCP_SSE_URL, select_query)
  210. candidates = rows or []
  211. logger.info("maintenance candidates=%s", len(candidates))
  212. processed = 0
  213. planned_updates = 0
  214. skipped = 0
  215. for r in candidates:
  216. processed += 1
  217. entity_iri = r.get("entity", {}).get("value")
  218. atlas_id = r.get("atlasId", {}).get("value")
  219. label = r.get("label", {}).get("value")
  220. qid = (r.get("qid", {}) or {}).get("value")
  221. logger.info("candidate atlas_id=%s label=%s qid=%s", atlas_id, label, qid)
  222. if not qid:
  223. # Per your current reality this should not happen; we still keep it safe.
  224. skipped += 1
  225. logger.info("skip: missing wikidata-qid (unexpected) atlas_id=%s", atlas_id)
  226. continue
  227. wd_dump = await _load_wikidata_entity_data(qid)
  228. qids = _extract_wikidata_qids_from_entity_dump(wd_dump)
  229. logger.info("type inference: atlas_id=%s qids=%s", atlas_id, qids)
  230. atlas_type = _infer_atlas_type_from_qids(qids, subclass_graph)
  231. details = _extract_description_and_type_specific_details(qid, wd_dump, atlas_type)
  232. # Always plan type update + type-specific details.
  233. new_fields = {
  234. "type": atlas_type,
  235. **details,
  236. }
  237. if args.dry_run:
  238. logger.info("DRY RUN would update: %s", new_fields)
  239. else:
  240. # Non-dry-run wiring intentionally left for next step.
  241. # We only scaffold the planned update for now.
  242. logger.info("APPLY scaffold only (writes not enabled yet): %s", new_fields)
  243. planned_updates += 1
  244. logger.info(
  245. "maintenance done processed=%s planned_updates=%s skipped=%s dry_run=%s",
  246. processed,
  247. planned_updates,
  248. skipped,
  249. args.dry_run,
  250. )
  251. def main() -> None:
  252. parser = argparse.ArgumentParser(description="atlas2-mcp maintenance (needsCuration=true) — dry-run first.")
  253. parser.add_argument("--dry-run", action="store_true", help="Do not write updates")
  254. parser.add_argument("--batch-size", type=int, default=10)
  255. parser.add_argument("--log-level", type=str, default="INFO")
  256. parser.add_argument("--limit", type=int, default=None, help="Optional max entities (overrides batch-size)")
  257. args = parser.parse_args()
  258. if args.limit is not None:
  259. args.batch_size = args.limit
  260. asyncio.run(run(args))
  261. if __name__ == "__main__":
  262. main()