""" atlas_store.py — SPARQL persistence for Atlas Entity objects. Two public functions: save_entity(entity, endpoint) — insert all triples into Virtuoso load_entity(atlas_id, endpoint) — reconstruct an Entity from the store Tested against Virtuoso 7.x (SPARQL 1.1 Update endpoint). The read side works against any SPARQL 1.1 endpoint. Dependencies: pip install SPARQLWrapper """ from __future__ import annotations import json import re import asyncio from typing import Any, Dict, List, Optional from SPARQLWrapper import JSON, POST, SPARQLWrapper from mcp import ClientSession from mcp.client.sse import sse_client from atlas_model import Claim, CurateFlag, Entity, Identifier, Provenance # --------------------------------------------------------------------------- # Namespace constants — keep in sync with atlas.ttl # --------------------------------------------------------------------------- ATLAS = "http://world.eu.org/atlas_ontology#" ATLAS_D = "http://world.eu.org/atlas_data#" XSD = "http://www.w3.org/2001/XMLSchema#" RDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" PREFIXES = f"""\ PREFIX atlas: <{ATLAS}> PREFIX atlas_data: <{ATLAS_D}> PREFIX xsd: <{XSD}> PREFIX rdf: <{RDF}> """ # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _full_iri(prefixed: str) -> str: """Expand a prefixed IRI to a full IRI string.""" prefixed = prefixed.strip() if prefixed.startswith("atlas_data:"): return ATLAS_D + prefixed[len("atlas_data:"):] if prefixed.startswith("atlas:"): return ATLAS + prefixed[len("atlas:"):] if prefixed.startswith("<") and prefixed.endswith(">"): return prefixed[1:-1] return prefixed def _short_iri(full: str) -> str: """Compress a full IRI back to a prefixed form.""" if full.startswith(ATLAS_D): return "atlas_data:" + full[len(ATLAS_D):] if full.startswith(ATLAS): return "atlas:" + full[len(ATLAS):] return f"<{full}>" def _local(full_iri: str) -> str: """Return the local name of a full IRI (after # or last /).""" for sep in ("#", "/"): idx = full_iri.rfind(sep) if idx != -1: return full_iri[idx + 1:] return full_iri def _escape(s: str) -> str: """Escape a string for embedding in a SPARQL triple-quoted literal.""" return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n") def _sparql_update(endpoint: str, query: str) -> None: # If endpoint looks like an MCP SSE URL, call the remote MCP tool. if "/mcp/sse" in endpoint: async def _run() -> None: async with sse_client(endpoint, timeout=10, sse_read_timeout=300) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() result = await session.call_tool("sparql_update", {"input": {"query": query}}) if result.isError: raise RuntimeError(f"sparql_update failed: {result.error}") asyncio.run(_run()) return # Fallback: plain SPARQL endpoint URL. sparql = SPARQLWrapper(endpoint) sparql.setMethod(POST) sparql.setQuery(query) sparql.setReturnFormat(JSON) sparql.query() def _sparql_select(endpoint: str, query: str) -> List[Dict[str, Any]]: # If endpoint looks like an MCP SSE URL, call the remote MCP tool. if "/mcp/sse" in endpoint: async def _run() -> List[Dict[str, Any]]: async with sse_client(endpoint, timeout=10, sse_read_timeout=300) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() result = await session.call_tool("sparql_query", {"input": {"query": query}}) if result.isError: raise RuntimeError(f"sparql_query failed: {result.error}") data = result.structuredContent if result.structuredContent is not None else result.content if isinstance(data, dict): return data.get("results", {}).get("bindings", []) or [] return [] return asyncio.run(_run()) # Fallback: plain SPARQL endpoint URL. sparql = SPARQLWrapper(endpoint) sparql.setMethod('GET') sparql.setQuery(query) sparql.setReturnFormat(JSON) res = sparql.query().convert() return res.get("results", {}).get("bindings", []) # --------------------------------------------------------------------------- # Save # --------------------------------------------------------------------------- def save_entity(entity: Entity, endpoint: str) -> None: """ Insert all triples for an Entity into the SPARQL store. Uses SPARQL 1.1 INSERT DATA. Call delete_entity() first if you need to replace an existing entity (full replace pattern). Args: entity: The Entity to persist. endpoint: SPARQL Update endpoint URL, e.g. "http://localhost:8890/sparql-auth" """ ttl_body = _build_insert_body(entity) query = f"{PREFIXES}\nINSERT DATA {{\n{ttl_body}\n}}" _sparql_update(endpoint, query) def delete_entity(atlas_id: str, endpoint: str) -> None: """ Remove all triples where the entity or any of its blank/named nodes are the subject. Run before save_entity() for a clean replace. Args: atlas_id: e.g. "atlas:1b0e7222c7730540" endpoint: SPARQL Update endpoint URL. """ entity_iri = f"<{ATLAS_D}entity_{atlas_id.replace('atlas:', '')}>" # Delete triples where entity is subject, plus all linked sub-nodes # (identifiers, claims, provenance, curate flag) via a SPARQL DELETE WHERE. query = f"""{PREFIXES} DELETE {{ ?s ?p ?o . }} WHERE {{ {{ BIND({entity_iri} AS ?s) ?s ?p ?o . }} UNION {{ {entity_iri} atlas:hasIdentifier ?s . ?s ?p ?o . }} UNION {{ {entity_iri} atlas:hasClaim ?s . ?s ?p ?o . }} UNION {{ {entity_iri} atlas:hasClaim ?claim . ?claim atlas:hasProvenance ?s . ?s ?p ?o . }} UNION {{ {entity_iri} atlas:hasCurateFlag ?s . ?s ?p ?o . }} }}""" _sparql_update(endpoint, query) def _build_insert_body(entity: Entity) -> str: """Build the triple block (no INSERT DATA wrapper) for an Entity.""" lines: List[str] = [] e = f"<{_full_iri(entity._entity_iri())}>" # --- Entity core --- lines += [ f" {e} a atlas:Entity ;", f' atlas:atlasId "{_escape(entity.id)}" ;', f' atlas:canonicalLabel "{_escape(entity.label)}"@en ;', ] if entity.description: lines.append(f' atlas:canonicalDescription "{_escape(entity.description)}"@en ;') if entity.type: lines.append(f' atlas:hasCanonicalType <{_full_iri(entity.type)}> ;') for alias in entity.aliases: lines.append(f' atlas:aliasLabel "{_escape(alias)}"@en ;') for ident in entity.identifiers: iiri = f"<{_full_iri(entity._identifier_iri(ident))}>" lines.append(f' atlas:hasIdentifier {iiri} ;') for key, val in entity.attributes.items(): if isinstance(val, bool): lines.append(f' atlas:{key} "{str(val).lower()}"^^xsd:boolean ;') elif isinstance(val, float): lines.append(f' atlas:{key} "{val}"^^xsd:decimal ;') elif isinstance(val, int): lines.append(f' atlas:{key} "{val}"^^xsd:integer ;') else: lines.append(f' atlas:{key} "{_escape(str(val))}" ;') for blob in entity.raw_json: lines.append(f' atlas:rawJson "{_escape(blob)}"^^xsd:string ;') for claim in entity.claims: obj = claim.object_iri or claim.object_literal or "" cid = f"<{_full_iri(entity._claim_id(claim.predicate, obj))}>" lines.append(f' atlas:hasClaim {cid} ;') lines.append(f' atlas:needsCuration "{str(entity.needs_curation).lower()}"^^xsd:boolean') if entity.curate_flag: lines[-1] += " ;" curate_iri = f"<{_full_iri(entity._entity_iri())}_curate>" lines.append(f' atlas:hasCurateFlag {curate_iri}') lines[-1] += " ." # --- Identifier nodes --- for ident in entity.identifiers: iiri = f"<{_full_iri(entity._identifier_iri(ident))}>" lines += [ f" {iiri} a atlas:Identifier ;", f' atlas:scheme "{_escape(ident.scheme)}" ;', f' atlas:value "{_escape(ident.value)}" .', ] # --- Claim + Provenance nodes --- for claim in entity.claims: obj = claim.object_iri or claim.object_literal or "" cid_str = entity._claim_id(claim.predicate, obj) cid = f"<{_full_iri(cid_str)}>" pid = f"<{_full_iri(entity._prov_id(cid_str))}>" pred_iri = f"<{_full_iri(claim.predicate)}>" lines += [ f" {cid} a atlas:Claim ;", f' atlas:claimSubjectIri {e} ;', f' atlas:claimPredicate {pred_iri} ;', ] if claim.object_iri: lines.append(f' atlas:claimObjectIri <{_full_iri(claim.object_iri)}> ;') elif claim.object_literal: lines.append(f' atlas:claimObjectLiteral "{_escape(claim.object_literal)}" ;') lines += [ f' atlas:claimLayer "{claim.layer}" ;', f' atlas:claimStatus "{claim.status}"', ] if claim.provenance: lines[-1] += " ;" lines.append(f' atlas:hasProvenance {pid}') lines[-1] += " ." if claim.provenance: p = claim.provenance lines += [ f" {pid} a atlas:Provenance ;", f' atlas:provenanceSource "{_escape(p.source)}" ;', f' atlas:retrievalMethod "{_escape(p.method)}" ;', f' atlas:confidence "{p.confidence}"^^xsd:decimal ;', f' atlas:retrievedAt "{p.retrieved_at}"^^xsd:dateTime .', ] # --- CurateFlag node --- if entity.curate_flag: curate_iri = f"<{_full_iri(entity._entity_iri())}_curate>" lines += [ f" {curate_iri} a atlas:CurateFlag ;", f' atlas:curationReason "{_escape(entity.curate_flag.reason)}"@en .', ] return "\n".join(lines) # --------------------------------------------------------------------------- # Load # --------------------------------------------------------------------------- def load_entity(atlas_id: str, endpoint: str) -> Optional[Entity]: """ Reconstruct an Entity object from the SPARQL store. Returns None if the entity does not exist. Args: atlas_id: e.g. "atlas:1b0e7222c7730540" endpoint: SPARQL Query endpoint URL, e.g. "http://localhost:8890/sparql" """ entity_iri = f"<{ATLAS_D}entity_{atlas_id.replace('atlas:', '')}>" # --- 1. Core entity fields --- core = _sparql_select(endpoint, f"""{PREFIXES} SELECT ?label ?description ?type ?needsCuration WHERE {{ {entity_iri} atlas:canonicalLabel ?label . OPTIONAL {{ {entity_iri} atlas:canonicalDescription ?description . }} OPTIONAL {{ {entity_iri} atlas:hasCanonicalType ?type . }} OPTIONAL {{ {entity_iri} atlas:needsCuration ?needsCuration . }} }}""") if not core: return None row = core[0] label = row["label"]["value"] description = row.get("description", {}).get("value") type_full = row.get("type", {}).get("value") entity_type = _short_iri(type_full) if type_full else None needs_curation = row.get("needsCuration", {}).get("value", "false").lower() == "true" # --- 2. Aliases --- alias_rows = _sparql_select(endpoint, f"""{PREFIXES} SELECT ?alias WHERE {{ {entity_iri} atlas:aliasLabel ?alias . }}""") aliases = [r["alias"]["value"] for r in alias_rows] # --- 3. Identifiers --- ident_rows = _sparql_select(endpoint, f"""{PREFIXES} SELECT ?scheme ?value WHERE {{ {entity_iri} atlas:hasIdentifier ?ident . ?ident atlas:scheme ?scheme ; atlas:value ?value . }}""") identifiers = [ Identifier(scheme=r["scheme"]["value"], value=r["value"]["value"]) for r in ident_rows ] # --- 4. Attributes (any atlas: datatype property not covered above) --- KNOWN = { "atlasId", "canonicalLabel", "canonicalDescription", "aliasLabel", "needsCuration", "rawJson", } attr_rows = _sparql_select(endpoint, f"""{PREFIXES} SELECT ?pred ?val WHERE {{ {entity_iri} ?pred ?val . FILTER(STRSTARTS(STR(?pred), "{ATLAS}")) FILTER(isLiteral(?val)) }}""") attributes: Dict[str, Any] = {} raw_json: List[str] = [] for r in attr_rows: pred_local = _local(r["pred"]["value"]) if pred_local in KNOWN: continue val = r["val"]["value"] dt = r["val"].get("datatype", "") if pred_local == "rawJson": raw_json.append(val) elif dt.endswith("boolean"): attributes[pred_local] = val.lower() == "true" elif dt.endswith("decimal") or dt.endswith("float") or dt.endswith("double"): attributes[pred_local] = float(val) elif dt.endswith("integer") or dt.endswith("int"): attributes[pred_local] = int(val) else: attributes[pred_local] = val # --- 5. Claims + Provenance --- claim_rows = _sparql_select(endpoint, f"""{PREFIXES} SELECT ?pred ?objIri ?objLit ?layer ?status ?provSource ?provMethod ?provConf ?provAt WHERE {{ {entity_iri} atlas:hasClaim ?claim . ?claim atlas:claimPredicate ?pred ; atlas:claimLayer ?layer ; atlas:claimStatus ?status . OPTIONAL {{ ?claim atlas:claimObjectIri ?objIri . }} OPTIONAL {{ ?claim atlas:claimObjectLiteral ?objLit . }} OPTIONAL {{ ?claim atlas:hasProvenance ?prov . ?prov atlas:provenanceSource ?provSource ; atlas:retrievalMethod ?provMethod ; atlas:confidence ?provConf ; atlas:retrievedAt ?provAt . }} }}""") claims: List[Claim] = [] for r in claim_rows: prov = None if "provSource" in r: prov = Provenance( source=r["provSource"]["value"], method=r["provMethod"]["value"], confidence=float(r["provConf"]["value"]), retrieved_at=r["provAt"]["value"], ) claims.append(Claim( predicate=_short_iri(r["pred"]["value"]), object_iri=_short_iri(r["objIri"]["value"]) if "objIri" in r else None, object_literal=r["objLit"]["value"] if "objLit" in r else None, layer=r["layer"]["value"], status=r["status"]["value"], provenance=prov, )) # --- 6. CurateFlag --- curate_flag = None curate_rows = _sparql_select(endpoint, f"""{PREFIXES} SELECT ?reason WHERE {{ {entity_iri} atlas:hasCurateFlag ?flag . ?flag atlas:curationReason ?reason . }}""") if curate_rows: curate_flag = CurateFlag(reason=curate_rows[0]["reason"]["value"]) return Entity( id=atlas_id, label=label, description=description, type=entity_type, aliases=aliases, identifiers=identifiers, attributes=attributes, raw_json=raw_json, claims=claims, needs_curation=needs_curation, curate_flag=curate_flag, )