"""Serialize resolved Atlas entities to Turtle for inspection or write-path preparation.""" from __future__ import annotations import json from app.models import AtlasClaim, AtlasEntity PREFIXES = """@prefix atlas: . @prefix atlas_data: . @prefix xsd: . @prefix rdfs: . """ def _safe_fragment(value: str) -> str: value = (value or "").strip().lower() out = [] for ch in value: if ch.isalnum() or ch in ["_", "-"]: out.append(ch) else: out.append("_") frag = "".join(out).strip("_") return frag or "entity" def _entity_node(entity: AtlasEntity) -> str: return f"atlas_data:entity_{_safe_fragment(entity.atlas_id)}" def _alias_node(alias_label: str) -> str: return f"atlas_data:alias_{_safe_fragment(alias_label)}" def _claim_node(claim: AtlasClaim) -> str: hash_part = claim.claim_id.split("_", maxsplit=2)[-1] return f"atlas_data:Claim_{_safe_fragment(hash_part)}" def _provenance_node(claim: AtlasClaim) -> str: prov = claim.provenance if prov is None: return "" parts = [claim.claim_id, prov.source, prov.retrieval_method, prov.retrieved_at or ""] return f"atlas_data:prov_{_safe_fragment('_'.join(parts))}" def _literal(text: str) -> str: return text.replace("\\", "\\\\").replace('"', '\\"') def _claim_object_iri(claim: AtlasClaim) -> str | None: if claim.object.kind == "type": return claim.object.value if claim.object.kind == "identifier" and claim.object.id_type: return f"atlas_data:ident_{_safe_fragment(claim.object.id_type + '_' + claim.object.value)}" return None def entity_to_turtle(entity: AtlasEntity) -> str: lines: list[str] = [PREFIXES] subject = _entity_node(entity) lines.append(f"{subject} a atlas:Entity ;") lines.append(f' atlas:canonicalLabel "{_literal(entity.canonical_label)}" ;') if entity.canonical_description: lines.append(f' atlas:canonicalDescription "{_literal(entity.canonical_description)}" ;') if entity.entity_type and entity.entity_type != "unknown": lines.append(f" atlas:hasCanonicalType atlas:{_safe_fragment(entity.entity_type).capitalize()} ;") wd = entity.raw_payload.get("wikidata") if isinstance(entity.raw_payload, dict) else None if isinstance(wd, dict) and wd.get("status") == "ok": lines.append(f' atlas:rawWikidataJson "{_literal(json.dumps(wd, ensure_ascii=False))}"^^xsd:string ;') if isinstance(entity.raw_payload, dict): trends_payload = {k: v for k, v in entity.raw_payload.items() if k != "wikidata"} if trends_payload: lines.append(f' atlas:rawTrendsJson "{_literal(json.dumps(trends_payload, ensure_ascii=False))}"^^xsd:string ;') for alias in entity.aliases: lines.append(f" atlas:hasAlias {_alias_node(alias.label)} ;") for claim in entity.claims: lines.append(f" atlas:hasClaim {_claim_node(claim)} ;") lines.append(f" atlas:needsCuration {'true' if entity.needs_curation else 'false'} .") lines.append("") for alias in entity.aliases: alias_node = _alias_node(alias.label) lines.append(f"{alias_node} a atlas:Alias ;") lines.append(f' atlas:aliasLabel "{_literal(alias.label)}" ;') lines.append(f" atlas:resolvedTo {subject} .") lines.append("") # Materialize identifier resources from identifier claims. for claim in entity.claims: if claim.predicate != "atlas:hasIdentifier" or claim.object.kind != "identifier": continue ident_node = _claim_object_iri(claim) if not ident_node: continue id_type = claim.object.id_type or "unknown" id_type_iri = "atlas:Mid" if id_type == "mid" else ("atlas:WikidataQID" if id_type == "qid" else f"atlas:{_safe_fragment(id_type).capitalize()}") lines.append(f"{ident_node} a atlas:Identifier ;") lines.append(f' atlas:identifierValue "{_literal(claim.object.value)}" ;') lines.append(f' atlas:identifierType {id_type_iri} .') lines.append("") for claim in entity.claims: claim_node = _claim_node(claim) lines.append(f"{claim_node} a atlas:Claim ;") lines.append(f" atlas:claimSubjectIri {subject} ;") lines.append(f' atlas:claimPredicate "{_literal(claim.predicate)}" ;') obj_iri = _claim_object_iri(claim) if obj_iri: lines.append(f" atlas:claimObjectIri {obj_iri} ;") else: lines.append(f' atlas:claimObjectLiteral "{_literal(claim.object.value)}" ;') lines.append(f' atlas:claimLayer "{_literal(claim.layer)}" ;') lines.append(f' atlas:claimStatus "{_literal(claim.status)}" ;') prov_node = _provenance_node(claim) if prov_node: lines.append(f" atlas:hasProvenance {prov_node} .") else: lines[-1] = lines[-1].rstrip(" ;") + " ." lines.append("") if claim.provenance: prov = claim.provenance lines.append(f"{prov_node} a atlas:Provenance ;") lines.append(f' atlas:provenanceSource "{_literal(prov.source)}" ;') lines.append(f' atlas:retrievalMethod "{_literal(prov.retrieval_method)}" ;') lines.append(f' atlas:confidence "{prov.confidence}"^^xsd:decimal ;') if prov.retrieved_at: lines.append(f' atlas:retrievedAt "{_literal(prov.retrieved_at)}"^^xsd:dateTime .') else: lines[-1] = lines[-1].rstrip(" ;") + " ." lines.append("") return "\n".join(lines).strip() + "\n"