| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
"""
atlas_store.py — SPARQL persistence for Atlas Entity objects.

Public functions:
    save_entity(entity, endpoint)             — insert all triples into Virtuoso
    load_entity(atlas_id, endpoint)           — reconstruct an Entity from the store
    delete_entity(atlas_id, endpoint)         — remove an entity and its sub-nodes
    save_entity_minimal(entity, endpoint)     — async insert into a named graph
    load_entity_by_subject(subject, endpoint) — async label/alias lookup

Tested against Virtuoso 7.x (SPARQL 1.1 Update endpoint).
The read side works against any SPARQL 1.1 endpoint.

Dependencies:
    pip install mcp
"""
- from __future__ import annotations
- import json
- import re
- import asyncio
- import os
- from typing import Any, Dict, List, Optional
- import logging
- from mcp import ClientSession
- from mcp.client.sse import sse_client
- from .atlas_model import Claim, CurateFlag, Entity, Identifier, Provenance
# ---------------------------------------------------------------------------
# Namespace constants — keep in sync with atlas.ttl
# ---------------------------------------------------------------------------
# Ontology namespace: classes and properties (atlas:Entity, atlas:hasClaim, ...).
ATLAS = "http://world.eu.org/atlas_ontology#"
# Data namespace: individual entity / identifier / claim / provenance node IRIs.
ATLAS_D = "http://world.eu.org/atlas_data#"
XSD = "http://www.w3.org/2001/XMLSchema#"
RDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
# Prefix header prepended to every query/update string built in this module.
PREFIXES = f"""\
PREFIX atlas: <{ATLAS}>
PREFIX atlas_data: <{ATLAS_D}>
PREFIX xsd: <{XSD}>
PREFIX rdf: <{RDF}>
"""
# Named graph used by the graph-aware helpers; defaults to the data namespace IRI.
DEFAULT_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", ATLAS_D)
# Verbose logging toggle; accepts 1/true/yes/on (case-insensitive).
DEBUG_LOGS = os.getenv("ATLAS_DEBUG_LOGS", "false").lower() in {"1", "true", "yes", "on"}
logger = logging.getLogger(__name__)
- # ---------------------------------------------------------------------------
- # Internal helpers
- # ---------------------------------------------------------------------------
def _full_iri(prefixed: str) -> str:
    """Expand a prefixed IRI (atlas: / atlas_data:) or an <...>-wrapped IRI
    to a full IRI string; anything else is returned unchanged."""
    value = prefixed.strip()
    for prefix, base in (("atlas_data:", ATLAS_D), ("atlas:", ATLAS)):
        if value.startswith(prefix):
            return base + value[len(prefix):]
    if value.startswith("<") and value.endswith(">"):
        return value[1:-1]
    return value
def _short_iri(full: str) -> str:
    """Compress a full IRI back to a prefixed form; unknown namespaces are
    wrapped in angle brackets instead."""
    for base, prefix in ((ATLAS_D, "atlas_data:"), (ATLAS, "atlas:")):
        if full.startswith(base):
            return prefix + full[len(base):]
    return f"<{full}>"
- def _local(full_iri: str) -> str:
- """Return the local name of a full IRI (after # or last /)."""
- for sep in ("#", "/"):
- idx = full_iri.rfind(sep)
- if idx != -1:
- return full_iri[idx + 1:]
- return full_iri
- def _escape(s: str) -> str:
- """Escape a string for embedding in a SPARQL triple-quoted literal."""
- return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
async def _sparql_update(endpoint: str, query: str) -> None:
    """Execute a SPARQL 1.1 UPDATE through the Virtuoso MCP/SSE tool endpoint.

    Raises RuntimeError when the endpoint is not an MCP/SSE URL or when the
    remote ``sparql_update`` tool reports an error.
    """
    if "/mcp/sse" not in endpoint:
        raise RuntimeError("atlas_store only supports Virtuoso MCP/SSE endpoints in this scaffold")
    async with sse_client(endpoint, timeout=10, sse_read_timeout=300) as (rx, tx):
        async with ClientSession(rx, tx) as session:
            await session.initialize()
            outcome = await session.call_tool("sparql_update", {"input": {"query": query}})
            if outcome.isError:
                raise RuntimeError(f"sparql_update failed: {outcome}")
async def _sparql_select(endpoint: str, query: str) -> List[Dict[str, Any]]:
    """Run a SPARQL SELECT through the MCP/SSE tool endpoint and return the
    SPARQL-JSON ``results.bindings`` list (empty on no/unsupported payload).

    Raises RuntimeError when the endpoint is not an MCP/SSE URL or when the
    remote ``sparql_query`` tool reports an error.
    """
    if "/mcp/sse" not in endpoint:
        raise RuntimeError("atlas_store only supports Virtuoso MCP/SSE endpoints in this scaffold")
    async with sse_client(endpoint, timeout=10, sse_read_timeout=300) as (rx, tx):
        async with ClientSession(rx, tx) as session:
            await session.initialize()
            outcome = await session.call_tool("sparql_query", {"input": {"query": query}})
            if outcome.isError:
                raise RuntimeError(f"sparql_query failed: {outcome}")
            payload = outcome.content if outcome.structuredContent is None else outcome.structuredContent
            if DEBUG_LOGS:
                if isinstance(payload, dict):
                    logger.info("sparql_select raw keys=%s", list(payload.keys()))
                else:
                    logger.info("sparql_select raw type=%s", type(payload).__name__)
            # Some MCP servers return content as a list of TextContent items;
            # try to decode the first item's text as the SPARQL-JSON document.
            if isinstance(payload, list) and payload:
                text = getattr(payload[0], "text", None)
                if text:
                    try:
                        payload = json.loads(text)
                        if DEBUG_LOGS and isinstance(payload, dict):
                            logger.info("sparql_select decoded list->dict keys=%s", list(payload.keys()))
                    except Exception:
                        if DEBUG_LOGS:
                            logger.info("sparql_select could not decode list text as JSON")
            if isinstance(payload, dict):
                bindings = payload.get("results", {}).get("bindings", []) or []
                if DEBUG_LOGS:
                    logger.info("sparql_select extracted bindings=%s", len(bindings) if bindings is not None else 0)
                return bindings
            return []
async def load_entity_by_subject(subject: str, endpoint: str, graph_iri: str = DEFAULT_GRAPH_IRI) -> Dict[str, Any] | None:
    """Find an entity whose canonical label — or, failing that, an alias —
    matches *subject* case-insensitively inside *graph_iri*.

    Returns a summary dict with keys atlas_id, label, type, wikidata_id and
    alias, or None when the subject is blank or nothing matches.
    """
    needle = _escape((subject or "").strip())
    if not needle:
        return None
    label_query = f"""
{PREFIXES}
SELECT ?atlasId ?label ?type ?qid WHERE {{
  VALUES ?needle {{ "{needle}" }}
  GRAPH <{graph_iri}> {{
    ?entity a atlas:Entity ;
        atlas:atlasId ?atlasId ;
        atlas:canonicalLabel ?label .
    OPTIONAL {{ ?entity atlas:hasCanonicalType ?type . }}
    OPTIONAL {{ ?entity atlas:hasIdentifier ?ident . ?ident atlas:scheme "wikidata-qid" ; atlas:value ?qid . }}
    FILTER(LCASE(STR(?label)) = LCASE(STR(?needle)))
  }}
}}
LIMIT 1
""".strip()
    alias_query = f"""
{PREFIXES}
SELECT ?atlasId ?label ?type ?qid ?alias WHERE {{
  VALUES ?needle {{ "{needle}" }}
  GRAPH <{graph_iri}> {{
    ?entity a atlas:Entity ;
        atlas:atlasId ?atlasId ;
        atlas:aliasLabel ?alias .
    OPTIONAL {{ ?entity atlas:canonicalLabel ?label . }}
    OPTIONAL {{ ?entity atlas:hasCanonicalType ?type . }}
    OPTIONAL {{ ?entity atlas:hasIdentifier ?ident . ?ident atlas:scheme "wikidata-qid" ; atlas:value ?qid . }}
    FILTER(LCASE(STR(?alias)) = LCASE(STR(?needle)))
  }}
}}
LIMIT 1
""".strip()
    if DEBUG_LOGS:
        logger.info("store lookup by subject: needle=%s graph=%s", (subject or "").strip(), graph_iri)
        logger.info("store label query=%s", label_query)
    # Exact-label match first; fall back to alias match only when that is empty.
    rows = await _sparql_select(endpoint, label_query)
    if not rows:
        if DEBUG_LOGS:
            logger.info("store alias query=%s", alias_query)
        rows = await _sparql_select(endpoint, alias_query)
    if DEBUG_LOGS:
        logger.info("store lookup rows=%s", len(rows) if rows else 0)
    if not rows:
        return None
    hit = rows[0]

    def _value(key: str) -> Optional[str]:
        # SPARQL-JSON binding shape: {"var": {"type": ..., "value": ...}, ...}
        return hit.get(key, {}).get("value")

    entity_type = _value("type")
    if entity_type and entity_type.startswith(ATLAS):
        entity_type = f"atlas:{entity_type.split('#', 1)[-1]}"
    return {
        "atlas_id": _value("atlasId"),
        "label": _value("label"),
        "type": entity_type,
        "wikidata_id": _value("qid"),
        "alias": _value("alias"),
    }
async def save_entity_minimal(entity: Entity, endpoint: str, graph_iri: str = DEFAULT_GRAPH_IRI) -> None:
    """Insert the entity's triples into the named graph *graph_iri*.

    No delete is performed first — call delete_entity() beforehand for a
    clean replace.
    """
    triples = _build_insert_body(entity)
    update = f"""
{PREFIXES}
INSERT DATA {{
  GRAPH <{graph_iri}> {{
{triples}
  }}
}}
""".strip()
    await _sparql_update(endpoint, update)
- # ---------------------------------------------------------------------------
- # Save
- # ---------------------------------------------------------------------------
def save_entity(entity: Entity, endpoint: str) -> None:
    """
    Insert all triples for an Entity into the SPARQL store.

    Uses SPARQL 1.1 INSERT DATA. Call delete_entity() first if you need
    to replace an existing entity (full replace pattern).

    Args:
        entity: The Entity to persist.
        endpoint: SPARQL Update endpoint URL,
            e.g. "http://localhost:8890/sparql-auth"

    Note: this is a synchronous wrapper; do not call it from inside a
    running event loop — await _sparql_update() directly there instead.
    """
    ttl_body = _build_insert_body(entity)
    query = f"{PREFIXES}\nINSERT DATA {{\n{ttl_body}\n}}"
    # BUG FIX: _sparql_update is a coroutine function. The original code
    # called it without awaiting, which built a coroutine object and
    # silently discarded it — nothing was ever written to the store.
    asyncio.run(_sparql_update(endpoint, query))
def delete_entity(atlas_id: str, endpoint: str) -> None:
    """
    Remove all triples where the entity or any of its linked sub-nodes
    (identifiers, claims, provenance, curate flag) are the subject.
    Run before save_entity() for a clean replace.

    Args:
        atlas_id: e.g. "atlas:1b0e7222c7730540"
        endpoint: SPARQL Update endpoint URL.

    Note: synchronous wrapper — do not call from inside a running event
    loop; await _sparql_update() directly there instead.
    """
    entity_iri = f"<{ATLAS_D}entity_{atlas_id.replace('atlas:', '')}>"
    # Delete triples where the entity is subject, plus all linked sub-nodes,
    # via one DELETE/WHERE with a UNION branch per sub-node relationship.
    query = f"""{PREFIXES}
DELETE {{
  ?s ?p ?o .
}}
WHERE {{
  {{
    BIND({entity_iri} AS ?s)
    ?s ?p ?o .
  }}
  UNION
  {{
    {entity_iri} atlas:hasIdentifier ?s .
    ?s ?p ?o .
  }}
  UNION
  {{
    {entity_iri} atlas:hasClaim ?s .
    ?s ?p ?o .
  }}
  UNION
  {{
    {entity_iri} atlas:hasClaim ?claim .
    ?claim atlas:hasProvenance ?s .
    ?s ?p ?o .
  }}
  UNION
  {{
    {entity_iri} atlas:hasCurateFlag ?s .
    ?s ?p ?o .
  }}
}}"""
    # BUG FIX: _sparql_update is async; the original call discarded the
    # coroutine without awaiting it, so the delete never executed.
    asyncio.run(_sparql_update(endpoint, query))
def _build_insert_body(entity: Entity) -> str:
    """Build the triple block (no INSERT DATA wrapper) for an Entity.

    Emits Turtle-style text in four sections: the entity core (one subject
    with ';'-separated predicates), then Identifier nodes, then Claim and
    Provenance nodes, then the optional CurateFlag node. The trailing
    ';' / '.' punctuation is managed by appending to lines[-1], so the
    statement order below is significant.

    NOTE(review): node IRIs come from entity._entity_iri(),
    _identifier_iri(), _claim_id() and _prov_id() — presumably stable,
    deterministic IRI builders in atlas_model; confirm there.
    """
    lines: List[str] = []
    e = f"<{_full_iri(entity._entity_iri())}>"
    # --- Entity core ---
    lines += [
        f" {e} a atlas:Entity ;",
        f' atlas:atlasId "{_escape(entity.id)}" ;',
        f' atlas:canonicalLabel "{_escape(entity.label)}"@en ;',
    ]
    if entity.description:
        lines.append(f' atlas:canonicalDescription "{_escape(entity.description)}"@en ;')
    if entity.type:
        lines.append(f' atlas:hasCanonicalType <{_full_iri(entity.type)}> ;')
    for alias in entity.aliases:
        lines.append(f' atlas:aliasLabel "{_escape(alias)}"@en ;')
    for ident in entity.identifiers:
        iiri = f"<{_full_iri(entity._identifier_iri(ident))}>"
        lines.append(f' atlas:hasIdentifier {iiri} ;')
    for key, val in entity.attributes.items():
        # bool must be tested before int: bool is a subclass of int in Python.
        if isinstance(val, bool):
            lines.append(f' atlas:{key} "{str(val).lower()}"^^xsd:boolean ;')
        elif isinstance(val, float):
            lines.append(f' atlas:{key} "{val}"^^xsd:decimal ;')
        elif isinstance(val, int):
            lines.append(f' atlas:{key} "{val}"^^xsd:integer ;')
        else:
            lines.append(f' atlas:{key} "{_escape(str(val))}" ;')
    for blob in entity.raw_json:
        lines.append(f' atlas:rawJson "{_escape(blob)}"^^xsd:string ;')
    for claim in entity.claims:
        # Claim id is derived from predicate + object (IRI preferred over literal).
        obj = claim.object_iri or claim.object_literal or ""
        cid = f"<{_full_iri(entity._claim_id(claim.predicate, obj))}>"
        lines.append(f' atlas:hasClaim {cid} ;')
    # needsCuration is always the last core predicate; it carries the
    # closing '.' unless a curate-flag link follows it.
    lines.append(f' atlas:needsCuration "{str(entity.needs_curation).lower()}"^^xsd:boolean')
    if entity.curate_flag:
        lines[-1] += " ;"
        curate_iri = f"<{_full_iri(entity._entity_iri())}_curate>"
        lines.append(f' atlas:hasCurateFlag {curate_iri}')
    lines[-1] += " ."
    # --- Identifier nodes ---
    for ident in entity.identifiers:
        iiri = f"<{_full_iri(entity._identifier_iri(ident))}>"
        lines += [
            f" {iiri} a atlas:Identifier ;",
            f' atlas:scheme "{_escape(ident.scheme)}" ;',
            f' atlas:value "{_escape(ident.value)}" .',
        ]
    # --- Claim + Provenance nodes ---
    for claim in entity.claims:
        obj = claim.object_iri or claim.object_literal or ""
        cid_str = entity._claim_id(claim.predicate, obj)
        cid = f"<{_full_iri(cid_str)}>"
        pid = f"<{_full_iri(entity._prov_id(cid_str))}>"
        pred_iri = f"<{_full_iri(claim.predicate)}>"
        lines += [
            f" {cid} a atlas:Claim ;",
            f' atlas:claimSubjectIri {e} ;',
            f' atlas:claimPredicate {pred_iri} ;',
        ]
        # Object is either an IRI or a literal, never both emitted.
        if claim.object_iri:
            lines.append(f' atlas:claimObjectIri <{_full_iri(claim.object_iri)}> ;')
        elif claim.object_literal:
            lines.append(f' atlas:claimObjectLiteral "{_escape(claim.object_literal)}" ;')
        lines += [
            f' atlas:claimLayer "{claim.layer}" ;',
            f' atlas:claimStatus "{claim.status}"',
        ]
        # claimStatus carries the closing '.' unless provenance follows.
        if claim.provenance:
            lines[-1] += " ;"
            lines.append(f' atlas:hasProvenance {pid}')
        lines[-1] += " ."
        if claim.provenance:
            p = claim.provenance
            # p.retrieved_at is interpolated as-is — presumably already an
            # xsd:dateTime-formatted string; confirm in atlas_model.
            lines += [
                f" {pid} a atlas:Provenance ;",
                f' atlas:provenanceSource "{_escape(p.source)}" ;',
                f' atlas:retrievalMethod "{_escape(p.method)}" ;',
                f' atlas:confidence "{p.confidence}"^^xsd:decimal ;',
                f' atlas:retrievedAt "{p.retrieved_at}"^^xsd:dateTime .',
            ]
    # --- CurateFlag node ---
    if entity.curate_flag:
        curate_iri = f"<{_full_iri(entity._entity_iri())}_curate>"
        lines += [
            f" {curate_iri} a atlas:CurateFlag ;",
            f' atlas:curationReason "{_escape(entity.curate_flag.reason)}"@en .',
        ]
    return "\n".join(lines)
- # ---------------------------------------------------------------------------
- # Load
- # ---------------------------------------------------------------------------
def load_entity(atlas_id: str, endpoint: str) -> Optional[Entity]:
    """
    Reconstruct an Entity object from the SPARQL store.
    Returns None if the entity does not exist.

    Args:
        atlas_id: e.g. "atlas:1b0e7222c7730540"
        endpoint: SPARQL Query endpoint URL,
            e.g. "http://localhost:8890/sparql"

    Note: synchronous wrapper — do not call from inside a running event
    loop; await _sparql_select() directly there instead.
    """
    entity_iri = f"<{ATLAS_D}entity_{atlas_id.replace('atlas:', '')}>"

    def _select(query: str) -> List[Dict[str, Any]]:
        # BUG FIX: _sparql_select is a coroutine function. The original code
        # called it without awaiting, so every result variable below held a
        # coroutine object instead of a bindings list (the queries never ran,
        # and core[0] would raise TypeError). asyncio.run keeps this
        # function's synchronous interface intact.
        return asyncio.run(_sparql_select(endpoint, query))

    # --- 1. Core entity fields ---
    core = _select(f"""{PREFIXES}
SELECT ?label ?description ?type ?needsCuration WHERE {{
  {entity_iri} atlas:canonicalLabel ?label .
  OPTIONAL {{ {entity_iri} atlas:canonicalDescription ?description . }}
  OPTIONAL {{ {entity_iri} atlas:hasCanonicalType ?type . }}
  OPTIONAL {{ {entity_iri} atlas:needsCuration ?needsCuration . }}
}}""")
    if not core:
        return None
    row = core[0]
    label = row["label"]["value"]
    description = row.get("description", {}).get("value")
    type_full = row.get("type", {}).get("value")
    entity_type = _short_iri(type_full) if type_full else None
    needs_curation = row.get("needsCuration", {}).get("value", "false").lower() == "true"

    # --- 2. Aliases ---
    alias_rows = _select(f"""{PREFIXES}
SELECT ?alias WHERE {{
  {entity_iri} atlas:aliasLabel ?alias .
}}""")
    aliases = [r["alias"]["value"] for r in alias_rows]

    # --- 3. Identifiers ---
    ident_rows = _select(f"""{PREFIXES}
SELECT ?scheme ?value WHERE {{
  {entity_iri} atlas:hasIdentifier ?ident .
  ?ident atlas:scheme ?scheme ;
         atlas:value ?value .
}}""")
    identifiers = [
        Identifier(scheme=r["scheme"]["value"], value=r["value"]["value"])
        for r in ident_rows
    ]

    # --- 4. Attributes (any atlas: datatype property not covered above) ---
    KNOWN = {
        "atlasId", "canonicalLabel", "canonicalDescription",
        "aliasLabel", "needsCuration", "rawJson",
    }
    attr_rows = _select(f"""{PREFIXES}
SELECT ?pred ?val WHERE {{
  {entity_iri} ?pred ?val .
  FILTER(STRSTARTS(STR(?pred), "{ATLAS}"))
  FILTER(isLiteral(?val))
}}""")
    attributes: Dict[str, Any] = {}
    raw_json: List[str] = []
    for r in attr_rows:
        pred_local = _local(r["pred"]["value"])
        if pred_local in KNOWN:
            continue
        val = r["val"]["value"]
        dt = r["val"].get("datatype", "")
        # Coerce literals back to Python types based on the xsd datatype IRI.
        if pred_local == "rawJson":
            raw_json.append(val)
        elif dt.endswith("boolean"):
            attributes[pred_local] = val.lower() == "true"
        elif dt.endswith("decimal") or dt.endswith("float") or dt.endswith("double"):
            attributes[pred_local] = float(val)
        elif dt.endswith("integer") or dt.endswith("int"):
            attributes[pred_local] = int(val)
        else:
            attributes[pred_local] = val

    # --- 5. Claims + Provenance ---
    claim_rows = _select(f"""{PREFIXES}
SELECT ?pred ?objIri ?objLit ?layer ?status
       ?provSource ?provMethod ?provConf ?provAt
WHERE {{
  {entity_iri} atlas:hasClaim ?claim .
  ?claim atlas:claimPredicate ?pred ;
         atlas:claimLayer ?layer ;
         atlas:claimStatus ?status .
  OPTIONAL {{ ?claim atlas:claimObjectIri ?objIri . }}
  OPTIONAL {{ ?claim atlas:claimObjectLiteral ?objLit . }}
  OPTIONAL {{
    ?claim atlas:hasProvenance ?prov .
    ?prov atlas:provenanceSource ?provSource ;
          atlas:retrievalMethod ?provMethod ;
          atlas:confidence ?provConf ;
          atlas:retrievedAt ?provAt .
  }}
}}""")
    claims: List[Claim] = []
    for r in claim_rows:
        prov = None
        # Provenance variables are either all bound (OPTIONAL block matched)
        # or all absent, so probing one key is sufficient.
        if "provSource" in r:
            prov = Provenance(
                source=r["provSource"]["value"],
                method=r["provMethod"]["value"],
                confidence=float(r["provConf"]["value"]),
                retrieved_at=r["provAt"]["value"],
            )
        claims.append(Claim(
            predicate=_short_iri(r["pred"]["value"]),
            object_iri=_short_iri(r["objIri"]["value"]) if "objIri" in r else None,
            object_literal=r["objLit"]["value"] if "objLit" in r else None,
            layer=r["layer"]["value"],
            status=r["status"]["value"],
            provenance=prov,
        ))

    # --- 6. CurateFlag ---
    curate_flag = None
    curate_rows = _select(f"""{PREFIXES}
SELECT ?reason WHERE {{
  {entity_iri} atlas:hasCurateFlag ?flag .
  ?flag atlas:curationReason ?reason .
}}""")
    if curate_rows:
        curate_flag = CurateFlag(reason=curate_rows[0]["reason"]["value"])

    return Entity(
        id=atlas_id,
        label=label,
        description=description,
        type=entity_type,
        aliases=aliases,
        identifiers=identifiers,
        attributes=attributes,
        raw_json=raw_json,
        claims=claims,
        needs_curation=needs_curation,
        curate_flag=curate_flag,
    )
|