"""Serialize resolved Atlas entities to Turtle for inspection or write-path preparation.""" from __future__ import annotations from app.models import AtlasEntity, AtlasProvenance PREFIXES = """@prefix atlas: . @prefix atlas_data: . @prefix xsd: . @prefix rdfs: . """ def _safe_fragment(value: str) -> str: value = (value or "").strip().lower() out = [] for ch in value: if ch.isalnum() or ch in ["_", "-"]: out.append(ch) else: out.append("_") frag = "".join(out).strip("_") return frag or "entity" def _entity_node(entity: AtlasEntity) -> str: return f"atlas_data:entity_{_safe_fragment(entity.atlas_id)}" def _alias_node(alias_label: str) -> str: return f"atlas_data:alias_{_safe_fragment(alias_label)}" def _identifier_node(identifier_value: str) -> str: return f"atlas_data:ident_{_safe_fragment(identifier_value)}" def _provenance_node(source: str, retrieved_at: str | None, retrieval_method: str) -> str: parts = [source, retrieval_method, retrieved_at or ""] return f"atlas_data:prov_{_safe_fragment('_'.join(parts))}" def _type_assertion_node(entity: AtlasEntity, source: str) -> str: return f"atlas_data:typeassert_{_safe_fragment(entity.atlas_id)}_{_safe_fragment(source)}" def _literal(text: str) -> str: return text.replace("\\", "\\\\").replace('"', '\\"') def _identifier_type_resource(identifier_type: str) -> str: kind = _safe_fragment(identifier_type) if kind == "mid": return "atlas:Mid" if kind in {"qid", "wikidata_qid", "wikidataqid"}: return "atlas:WikidataQID" return f"atlas:{kind.capitalize()}" def _pick_provenance(entity: AtlasEntity, source_hint: str | None = None, method_hint: str | None = None) -> AtlasProvenance | None: if not entity.provenance: return None if method_hint: for p in entity.provenance: if p.retrieval_method == method_hint: return p if source_hint: for p in entity.provenance: if p.source == source_hint: return p return entity.provenance[0] def entity_to_turtle(entity: AtlasEntity) -> str: lines: list[str] = [PREFIXES] subject = _entity_node(entity) claim_nodes = [f"atlas_data:claim_ident_{_safe_fragment(i.value)}" for i in entity.identifiers] if entity.entity_type and entity.entity_type != "unknown": claim_nodes.append(f"atlas_data:claim_type_{_safe_fragment(entity.atlas_id)}") lines.append(f"{subject} a atlas:Entity ;") lines.append(f' atlas:canonicalLabel "{_literal(entity.canonical_label)}" ;') if entity.canonical_description: lines.append(f' atlas:canonicalDescription "{_literal(entity.canonical_description)}" ;') if entity.entity_type and entity.entity_type != "unknown": lines.append(f" atlas:hasCanonicalType atlas:{_safe_fragment(entity.entity_type).capitalize()} ;") for alias in entity.aliases: lines.append(f" atlas:hasAlias {_alias_node(alias.label)} ;") for ident in entity.identifiers: lines.append(f" atlas:hasIdentifier {_identifier_node(ident.value)} ;") for claim_node in claim_nodes: lines.append(f" atlas:hasClaim {claim_node} ;") lines.append(f" atlas:needsCuration {'true' if entity.needs_curation else 'false'} .") lines.append("") for alias in entity.aliases: alias_node = _alias_node(alias.label) lines.append(f"{alias_node} a atlas:Alias ;") lines.append(f' atlas:aliasLabel "{_literal(alias.label)}" ;') lines.append(f" atlas:resolvedTo {subject} .") lines.append("") for ident in entity.identifiers: ident_node = _identifier_node(ident.value) lines.append(f"{ident_node} a atlas:Identifier ;") lines.append(f' atlas:identifierValue "{_literal(ident.value)}" ;') lines.append(f' atlas:identifierSource "{_literal(ident.source)}" ;') lines.append(f" atlas:identifierType {_identifier_type_resource(ident.identifier_type)} ;") prov = _pick_provenance(entity, source_hint=ident.source) if prov: lines.append(f" atlas:hasIdentifierProvenance {_provenance_node(prov.source, prov.retrieved_at, prov.retrieval_method)} .") else: lines[-1] = lines[-1].rstrip(" ;") + " ." lines.append("") for prov in entity.provenance: prov_node = _provenance_node(prov.source, prov.retrieved_at, prov.retrieval_method) lines.append(f"{prov_node} a atlas:Provenance ;") lines.append(f' atlas:provenanceSource "{_literal(prov.source)}" ;') lines.append(f' atlas:retrievalMethod "{_literal(prov.retrieval_method)}" ;') lines.append(f' atlas:confidence "{prov.confidence}"^^xsd:decimal ;') if prov.retrieved_at: lines.append(f' atlas:retrievedAt "{_literal(prov.retrieved_at)}"^^xsd:dateTime .') else: lines[-1] = lines[-1].rstrip(" ;") + " ." lines.append("") wd = entity.raw_payload.get("wikidata") or {} if wd.get("status") == "ok": typeassert_node = _type_assertion_node(entity, "wikidata") lines.append(f"{typeassert_node} a atlas:TypeAssertion ;") lines.append(" atlas:assertedType atlas:WikidataType_Q5 ;") prov = _pick_provenance(entity, source_hint="wikidata") if prov: lines.append(f" atlas:hasAssertionProvenance {_provenance_node(prov.source, prov.retrieved_at, prov.retrieval_method)} ;") lines.append(' atlas:assertionReason "wikidata instance-of" .') lines.append("") if entity.entity_type and entity.entity_type != "unknown": typeassert_node = _type_assertion_node(entity, "canonical") lines.append(f"{typeassert_node} a atlas:TypeAssertion ;") lines.append(f" atlas:assertedType atlas:{_safe_fragment(entity.entity_type).capitalize()} ;") prov = _pick_provenance(entity, method_hint="type-classification") if prov: lines.append(f" atlas:hasAssertionProvenance {_provenance_node(prov.source, prov.retrieved_at, prov.retrieval_method)} ;") lines.append(' atlas:assertionReason "canonical type adjudication" .') lines.append("") # Claim nodes with explicit claim-object semantics for ident in entity.identifiers: claim_node = f"atlas_data:claim_ident_{_safe_fragment(ident.value)}" ident_node = _identifier_node(ident.value) prov = _pick_provenance(entity, source_hint=ident.source) lines.append(f"{claim_node} a atlas:Claim ;") lines.append(f" atlas:claimSubjectIri {subject} ;") lines.append(' atlas:claimPredicate "atlas:hasIdentifier" ;') lines.append(f" atlas:claimObjectIri {ident_node} ;") lines.append(' atlas:claimLayer "raw" ;') lines.append(' atlas:claimStatus "active" ;') if prov: lines.append(f" atlas:hasProvenance {_provenance_node(prov.source, prov.retrieved_at, prov.retrieval_method)} .") else: lines[-1] = lines[-1].rstrip(" ;") + " ." lines.append("") if entity.entity_type and entity.entity_type != "unknown": claim_node = f"atlas_data:claim_type_{_safe_fragment(entity.atlas_id)}" prov = _pick_provenance(entity, method_hint="type-classification") lines.append(f"{claim_node} a atlas:Claim ;") lines.append(f" atlas:claimSubjectIri {subject} ;") lines.append(' atlas:claimPredicate "atlas:hasCanonicalType" ;') lines.append(f" atlas:claimObjectIri atlas:{_safe_fragment(entity.entity_type).capitalize()} ;") lines.append(' atlas:claimLayer "derived" ;') lines.append(' atlas:claimStatus "active" ;') if prov: lines.append(f" atlas:hasProvenance {_provenance_node(prov.source, prov.retrieved_at, prov.retrieval_method)} .") else: lines[-1] = lines[-1].rstrip(" ;") + " ." lines.append("") return "\n".join(lines).strip() + "\n"