maintenance.py

from __future__ import annotations

import argparse
import asyncio
import json
import logging
import os
from typing import Any, Iterable, Optional

import rdflib

from .atlas_store import _sparql_select, _sparql_update  # _sparql_update reserved for the (not yet enabled) write path
from .wikidata import WikidataSearch

ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
WIKIDATA_SUBCLASS_TTL = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "ontology",
    "wikidata_subclassof.ttl",
)

logger = logging.getLogger(__name__)


def _setup_logging(log_level: str) -> None:
    logging.basicConfig(level=getattr(logging, log_level.upper(), logging.INFO), force=True)


def _extract_wikidata_qids_from_entity_dump(wd_dump: dict[str, Any]) -> list[str]:
    """Extract candidate type/class QIDs from a Wikidata EntityData dump."""
    # Dumps from Special:EntityData/{qid}.json look like:
    # { "entities": { "Q...": { "claims": {...} } } }
    entities = wd_dump.get("entities") or {}
    if not entities:
        return []
    entity = next(iter(entities.values()))
    claims = entity.get("claims") or {}
    # P31 = instance of, P279 = subclass of
    candidates: list[str] = []
    for prop in ("P31", "P279"):
        for claim in claims.get(prop, []) or []:
            try:
                mainsnak = claim.get("mainsnak", {})
                datavalue = mainsnak.get("datavalue", {})
                value = datavalue.get("value") or {}
                if isinstance(value, dict) and "id" in value:
                    candidates.append(value["id"])
            except Exception:
                continue
    # De-dupe while preserving order.
    seen: set[str] = set()
    out: list[str] = []
    for q in candidates:
        if q and q not in seen:
            seen.add(q)
            out.append(q)
    return out
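

# Illustrative call (dump shape per Special:EntityData/{qid}.json; a minimal,
# hand-written dump, not real API output):
#   _extract_wikidata_qids_from_entity_dump(
#       {"entities": {"Q42": {"claims": {"P31": [
#           {"mainsnak": {"datavalue": {"value": {"id": "Q5"}}}}]}}}}
#   )
#   -> ["Q5"]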


def _infer_atlas_type_from_qids(qids: Iterable[str], subclass_graph: rdflib.Graph) -> str:
    """Map Wikidata classes to Atlas ontology types using wikidata_subclassof.ttl."""
    # The TTL maps Wikidata resources to DBpedia ontology classes;
    # we then bucket DBpedia classes by keyword into Atlas types.

    def _bucket_from_obj(obj_str: str) -> Optional[str]:
        lower = obj_str.lower()
        if "person" in lower or "politician" in lower:
            return "atlas:Person"
        if "organisation" in lower or "organization" in lower or "governmentagency" in lower or "agency" in lower:
            return "atlas:Organization"
        if any(k in lower for k in [
            "location",
            "place",
            "city",
            "town",
            "village",
            "populatedplace",
            "settlement",
            "country",
            "region",
            "river",
        ]):
            return "atlas:Location"
        # Creative works: keep broad for now; refine later using the debug logs below.
        if any(k in lower for k in ["creativework", "creative work", "work", "album", "film", "book", "musicalwork", "song", "novel"]):
            return "atlas:CreativeWork"
        # Events.
        if any(k in lower for k in ["event", "tournament", "competition", "meeting", "conference", "festival", "match"]):
            return "atlas:Event"
        # Products.
        if any(k in lower for k in ["product", "software", "device", "instrument", "hardware"]):
            return "atlas:Product"
        return None

    for q in qids:
        start = rdflib.URIRef(f"http://wikidata.dbpedia.org/resource/{q}")
        # Walk the rdfs:subClassOf closure breadth-first until a bucket matches.
        queue: list[rdflib.term.Node] = [start]
        visited: set[rdflib.term.Node] = set()
        while queue:
            cur = queue.pop(0)
            if cur in visited:
                continue
            visited.add(cur)
            for _s, _p, obj in subclass_graph.triples((cur, rdflib.RDFS.subClassOf, None)):
                obj_str = str(obj)
                # Per-step tracing at DEBUG so INFO output stays readable.
                logger.debug("infer_type: qid=%s via=%s subClassOf=%s", q, str(cur), obj_str)
                bucket = _bucket_from_obj(obj_str)
                if bucket:
                    logger.info("infer_type: mapped to %s via %s", bucket, obj_str)
                    return bucket
                # Continue walking if the object is also a class resource (best-effort).
                if isinstance(obj, rdflib.URIRef) and obj not in visited:
                    queue.append(obj)
    logger.info("infer_type: no mapping matched in closure, default atlas:Other")
    return "atlas:Other"
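

# Sketch of the intended inference, assuming wikidata_subclassof.ttl contains a
# chain such as (IRIs illustrative, not verified against the shipped file):
#   <http://wikidata.dbpedia.org/resource/Q82955> rdfs:subClassOf dbo:Politician .
# Then _infer_atlas_type_from_qids(["Q82955"], graph) visits Q82955, sees
# dbo:Politician, the "politician" keyword matches, and "atlas:Person" is returned.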


def _extract_description_and_type_specific_details(wd_dump: dict[str, Any], atlas_type: str) -> dict[str, Any]:
    entities = wd_dump.get("entities") or {}
    if not entities:
        return {}
    entity = next(iter(entities.values()))
    labels = entity.get("labels") or {}
    descs = entity.get("descriptions") or {}
    # Prefer the English label/description, falling back to any available language.
    label = (labels.get("en") or next(iter(labels.values()), {})).get("value")
    desc = (descs.get("en") or next(iter(descs.values()), {})).get("value")
    out: dict[str, Any] = {}
    if desc:
        out["description"] = desc
    if label:
        out["label"] = label
    claims = entity.get("claims") or {}
    if atlas_type == "atlas:Person":
        # Birth date (P569) and birth place (P19).
        birth_date = _extract_time_literal_from_claims(claims.get("P569", []) or [])
        birth_place_qid = _extract_entity_qid_from_claims(claims.get("P19", []) or [])
        if birth_date:
            out["birthDate"] = birth_date
        if birth_place_qid:
            out["birthPlaceQid"] = birth_place_qid
    if atlas_type == "atlas:Location":
        # Coordinates (P625).
        coords = _extract_coordinates_from_claims(claims.get("P625", []) or [])
        if coords:
            out["latitude"] = coords[0]
            out["longitude"] = coords[1]
    return out
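

# Example result shape for a Person (values illustrative):
#   {"label": "Douglas Adams", "description": "English author ...",
#    "birthDate": "+1952-03-11T00:00:00Z", "birthPlaceQid": "Q350"}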


def _extract_entity_qid_from_claims(claim_list: list[dict[str, Any]]) -> Optional[str]:
    for claim in claim_list:
        try:
            mainsnak = claim.get("mainsnak", {})
            datavalue = mainsnak.get("datavalue", {})
            value = datavalue.get("value") or {}
            if isinstance(value, dict) and "id" in value:
                return value["id"]
        except Exception:
            continue
    return None


def _extract_time_literal_from_claims(claim_list: list[dict[str, Any]]) -> Optional[str]:
    # Return the raw time string if present.
    for claim in claim_list:
        try:
            mainsnak = claim.get("mainsnak", {})
            datavalue = mainsnak.get("datavalue", {})
            value = datavalue.get("value") or {}
            if isinstance(value, dict) and "time" in value:
                return value["time"]
        except Exception:
            continue
    return None
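

# Note: Wikidata time values are returned verbatim, e.g. "+1952-03-11T00:00:00Z",
# with a separate "precision" field on the value; any caller that writes xsd:date
# literals will need to strip the leading sign and truncate accordingly.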


def _extract_coordinates_from_claims(claim_list: list[dict[str, Any]]) -> Optional[tuple[float, float]]:
    for claim in claim_list:
        try:
            mainsnak = claim.get("mainsnak", {})
            datavalue = mainsnak.get("datavalue", {})
            value = datavalue.get("value") or {}
            if isinstance(value, dict) and "latitude" in value and "longitude" in value:
                return (float(value["latitude"]), float(value["longitude"]))
        except Exception:
            continue
    return None
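

# Globecoordinate values also carry "precision" and a "globe" IRI; only
# latitude/longitude are extracted here.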


async def _load_wikidata_entity_data(qid: str) -> dict[str, Any]:
    # Reuse the WikidataSearch helper.
    search = WikidataSearch({"search": "", "limit": 1})
    return await search.get_entity_data(qid)


def _spatial_placeholder_update(entity_iri: str, new_fields: dict[str, Any]) -> str:
    # Scaffold only: the exact triple mappings will be wired in the next iteration.
    # For now we just return a comment string.
    return f"""# planned update for {entity_iri}: {json.dumps(new_fields, ensure_ascii=False)}"""
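

# A possible shape for the eventual write, once the triple mappings are wired
# (sketch only; the predicates below are assumptions, not the final ontology):
#   PREFIX atlas: <http://world.eu.org/atlas_ontology#>
#   WITH <http://world.eu.org/atlas_data#>
#   DELETE { ?e a ?oldType }
#   INSERT { ?e a atlas:Person ; atlas:description "..." }
#   WHERE  { BIND(<entity_iri> AS ?e) OPTIONAL { ?e a ?oldType } }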


async def run(args: argparse.Namespace) -> None:
    _setup_logging(args.log_level)
    logger.info("maintenance start dry_run=%s batch_size=%s", args.dry_run, args.batch_size)
    subclass_graph = rdflib.Graph()
    subclass_graph.parse(WIKIDATA_SUBCLASS_TTL, format="turtle")
    # 1) Fetch needsCuration=true entities.
    # We only need the label plus the wikidata-qid identifier to proceed.
    select_query = f"""
PREFIX atlas: <http://world.eu.org/atlas_ontology#>
SELECT ?entity ?atlasId ?label ?qid
WHERE {{
    GRAPH <{ATLAS_GRAPH_IRI}> {{
        ?entity a atlas:Entity ;
            atlas:atlasId ?atlasId ;
            atlas:canonicalLabel ?label ;
            atlas:needsCuration true .
        OPTIONAL {{
            ?entity atlas:hasIdentifier ?ident .
            ?ident atlas:scheme "wikidata-qid" ; atlas:value ?qid .
        }}
    }}
}}
LIMIT {args.batch_size}
""".strip()
    rows = await _sparql_select(VIRTUOSO_MCP_SSE_URL, select_query)
    candidates = rows or []
    logger.info("maintenance candidates=%s", len(candidates))
    processed = 0
    planned_updates = 0
    skipped = 0
    for r in candidates:
        processed += 1
        entity_iri = r.get("entity", {}).get("value")
        atlas_id = r.get("atlasId", {}).get("value")
        label = r.get("label", {}).get("value")
        qid = (r.get("qid", {}) or {}).get("value")
        logger.info("candidate atlas_id=%s label=%s qid=%s", atlas_id, label, qid)
        if not qid:
            # Curation candidates are expected to carry a wikidata-qid; skip
            # defensively if one is missing.
            skipped += 1
            logger.info("skip: missing wikidata-qid (unexpected) atlas_id=%s", atlas_id)
            continue
        wd_dump = await _load_wikidata_entity_data(qid)
        qids = _extract_wikidata_qids_from_entity_dump(wd_dump)
        logger.info("type inference: atlas_id=%s qids=%s", atlas_id, qids)
        atlas_type = _infer_atlas_type_from_qids(qids, subclass_graph)
        details = _extract_description_and_type_specific_details(wd_dump, atlas_type)
        # Always plan the type update plus type-specific details.
        new_fields = {
            "type": atlas_type,
            **details,
        }
        if args.dry_run:
            logger.info("DRY RUN would update: %s", new_fields)
        else:
            # Non-dry-run wiring is intentionally left for the next step; for now
            # we only log the planned update scaffold.
            logger.info("APPLY scaffold only (writes not enabled yet): %s",
                        _spatial_placeholder_update(entity_iri, new_fields))
        planned_updates += 1
    logger.info(
        "maintenance done processed=%s planned_updates=%s skipped=%s dry_run=%s",
        processed,
        planned_updates,
        skipped,
        args.dry_run,
    )


def main() -> None:
    parser = argparse.ArgumentParser(description="atlas2-mcp maintenance (needsCuration=true); dry-run first.")
    parser.add_argument("--dry-run", action="store_true", help="Do not write updates")
    parser.add_argument("--batch-size", type=int, default=10)
    parser.add_argument("--log-level", type=str, default="INFO")
    parser.add_argument("--limit", type=int, default=None, help="Optional max entities (overrides --batch-size)")
    args = parser.parse_args()
    if args.limit is not None:
        args.batch_size = args.limit
    asyncio.run(run(args))


if __name__ == "__main__":
    main()
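

# Typical invocations (module path assumed from the package layout, e.g. if this
# file lives in a package named atlas2_mcp):
#   python -m atlas2_mcp.maintenance --dry-run --batch-size 5
#   python -m atlas2_mcp.maintenance --limit 1 --log-level DEBUG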