from __future__ import annotations import hashlib import os import logging from dataclasses import dataclass from typing import Any import time import uuid import datetime import math from .atlas_model import Entity, Identifier from .atlas_store import load_entity_by_subject, save_entity_minimal from .wikidata import WikidataSearch ATLAS = "http://world.eu.org/atlas_ontology#" DEFAULT_ENDPOINT = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse") DEFAULT_UPDATE_ENDPOINT = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", DEFAULT_ENDPOINT) DEBUG_LOGS = os.getenv("ATLAS_DEBUG_LOGS", "false").lower() in {"1", "true", "yes", "on"} logger = logging.getLogger(__name__) def _hash_id(subject: str) -> str: return hashlib.sha1(subject.strip().lower().encode("utf-8")).hexdigest()[:16] def _entity_iri(atlas_id: str) -> str: return f"atlas_data:entity_{atlas_id}" async def _wikidata_lookup(subject: str, language: str = "en", limit: int = 1) -> list[dict[str, Any]]: search = WikidataSearch({"search": subject, "limit": limit, "language": language}) result = await search.quick_resolve(subject, limit=limit) return result.get("results", []) or [] def _candidate_text(subject: str, wd: dict[str, Any], hints: dict[str, Any] | None = None) -> str: hints = hints or {} aliases = hints.get("aliases") or [] parts = [subject, wd.get("label") or "", wd.get("description") or "", " ".join(str(a) for a in aliases)] return " | ".join(part for part in parts if part) def _cosine_similarity(a: list[float] | None, b: list[float] | None) -> float: if not a or not b or len(a) != len(b): return 0.0 dot = sum(x * y for x, y in zip(a, b)) norm_a = math.sqrt(sum(x * x for x in a)) norm_b = math.sqrt(sum(y * y for y in b)) if not norm_a or not norm_b: return 0.0 return dot / (norm_a * norm_b) def _infer_atlas_type(label: str | None, description: str | None) -> str: text = f"{label or ''} {description or ''}".lower() if any(k in text for k in ["president", "person", "singer", "composer", "human", "actor", "writer"]): return "atlas:Person" if any(k in text for k in ["city", "town", "village", "country", "state", "location", "place"]): return "atlas:Location" if any(k in text for k in ["company", "organization", "organisation", "institution", "foundation", "band"]): return "atlas:Organization" return "atlas:Other" def _score_wikidata_candidate( subject: str, wd: dict[str, Any], *, context: dict[str, Any] | None = None, hints: dict[str, Any] | None = None, use_embeddings: bool = False, subject_embedding: list[float] | None = None, candidate_embedding: list[float] | None = None, ) -> tuple[float, dict[str, float]]: context = context or {} hints = hints or {} score = 0.0 breakdown: dict[str, float] = {} subject_norm = subject.strip().lower() label = (wd.get("label") or "").strip() description = (wd.get("description") or "").strip() label_norm = label.lower() description_norm = description.lower() if label_norm == subject_norm: score += 0.75 breakdown["exact_label"] = 0.75 elif subject_norm and subject_norm in label_norm: score += 0.45 breakdown["partial_label"] = 0.45 for alias in hints.get("aliases") or []: alias_norm = str(alias).strip().lower() if alias_norm and alias_norm == label_norm: score += 0.15 breakdown["alias_match"] = 0.15 break expected_type = (hints.get("expected_type") or "").strip().lower() inferred_type = _infer_atlas_type(label, description).lower() if expected_type and expected_type in inferred_type: score += 0.1 breakdown["expected_type"] = 0.1 realm = (context.get("realm") or "").strip().lower() if realm and realm in description_norm: score += 0.1 breakdown["realm"] = 0.1 if wd.get("id"): score += 0.05 breakdown["has_qid"] = 0.05 if use_embeddings: sim = _cosine_similarity(subject_embedding, candidate_embedding) if sim > 0: emb_score = max(0.0, min(0.25, sim * 0.25)) score += emb_score breakdown["embedding_similarity"] = emb_score score = min(score, 0.99) return score, breakdown def _entity_from_wikidata(subject: str, wd: dict[str, Any]) -> Entity: atlas_id = _hash_id(subject) label = wd.get("label") or subject description = wd.get("description") qid = wd.get("id") entity_type = _infer_atlas_type(label, description) ent = Entity( id=atlas_id, label=label, description=description, type=entity_type, aliases=[subject] if subject.lower() != label.lower() else [], identifiers=[Identifier(scheme="wikidata-qid", value=qid)] if qid else [], needs_curation=True, ) return ent def _flatten_exception_details(exc: BaseException) -> list[str]: parts = [f"{type(exc).__name__}: {exc}"] nested = getattr(exc, "exceptions", None) if nested: for sub in nested: parts.extend(_flatten_exception_details(sub)) return parts async def _persist_entity(entity: Entity) -> None: await save_entity_minimal(entity, DEFAULT_UPDATE_ENDPOINT) async def _load_entity(subject: str) -> dict[str, Any] | None: return await load_entity_by_subject(subject, DEFAULT_ENDPOINT) def _required_confidence(mode: str, constraints: dict[str, Any]) -> float: requested = constraints.get("min_confidence") if requested is not None: return float(requested) if mode == "quick": return 0.55 if mode in {"ranked", "hybrid", "llm_select"}: return 0.85 if mode == "interactive": return 0.0 return 0.5 def _is_ambiguous_subject(subject: str, wd_candidates: list[dict[str, Any]]) -> bool: if len(wd_candidates) < 2: return False subject_norm = subject.strip().lower() labels = [(cand.get("label") or "").strip().lower() for cand in wd_candidates] exact_matches = sum(1 for label in labels if label == subject_norm) return exact_matches >= 2 or (exact_matches == 1 and any(label == subject_norm for label in labels[1:])) def _cache_can_satisfy(stored: dict[str, Any], mode: str, constraints: dict[str, Any]) -> bool: stored_confidence = float(stored.get("confidence") or 0.0) return stored_confidence >= _required_confidence(mode, constraints) def _debug_decision( *, mode: str, top_confidence: float, auto_accept_threshold: float, interactive_below_threshold: bool, required_confidence: float, used_cache: bool, cache_confidence: float | None = None, ) -> dict[str, Any]: return { "mode": mode, "top_confidence": top_confidence, "auto_accept_threshold": auto_accept_threshold, "interactive_below_threshold": interactive_below_threshold, "required_confidence": required_confidence, "used_cache": used_cache, "cache_confidence": cache_confidence, "decision": ( "cache_hit" if used_cache else "resolved" if top_confidence >= auto_accept_threshold else "ambiguous_below_threshold" ), } @dataclass class ResolveService: load_entity_fn: Any = _load_entity wikidata_lookup_fn: Any = _wikidata_lookup persist_entity_fn: Any = _persist_entity async def resolve(self, *, subject: str, context: dict[str, Any] | None = None, constraints: dict[str, Any] | None = None, hints: dict[str, Any] | None = None, debug: dict[str, Any] | None = None, strategy: dict[str, Any] | None = None) -> dict[str, Any]: context = context or {} constraints = constraints or {} hints = hints or {} debug = debug or {} strategy = strategy or {} language = (context.get("language") or "en").strip() or "en" mode = (strategy.get("mode") or "quick").strip().lower() or "quick" use_embeddings = bool(strategy.get("use_embeddings", False)) max_candidates = int(constraints.get("max_candidates") or 5) auto_accept_threshold = float(strategy.get("auto_accept_threshold") or 0.85) interactive_below_threshold = bool(strategy.get("interactive_below_threshold", True)) required_confidence = _required_confidence(mode, constraints) try: request_id = str(uuid.uuid4()) ts = datetime.datetime.now(datetime.timezone.utc).isoformat() start = time.time() subject = (subject or "").strip() if not subject: return { "status": "not_found", "entity": None, "confidence": 0.0, "candidates": [], "ambiguity": None, "resolution_path": [], "meta": {"request_id": request_id, "timestamp": ts, "duration_ms": 0}, "error": None, } if DEBUG_LOGS: logger.info("resolve start subject=%s", subject) stored = await self.load_entity_fn(subject) if stored: if DEBUG_LOGS: logger.info("store hit subject=%s atlas_id=%s", subject, stored.get("atlas_id")) stored_confidence = float(stored.get("confidence") or (0.9 if not stored.get("needs_curation", False) else 0.6)) if _cache_can_satisfy(stored, mode, constraints): return { "status": "resolved", "entity": { "id": stored.get("atlas_id"), "label": stored.get("label"), "type": stored.get("type"), "description": stored.get("description"), "source": None, "uri": None, "attributes": {}, }, "confidence": stored_confidence, "candidates": [], "ambiguity": None, "resolution_path": [ {"phase": "cache", "action": "store_hit", "source": "triple_store"} ], "meta": { "request_id": request_id, "timestamp": ts, "duration_ms": int((time.time() - start) * 1000), **({"debug": _debug_decision(mode=mode, top_confidence=stored_confidence, auto_accept_threshold=auto_accept_threshold, interactive_below_threshold=interactive_below_threshold, required_confidence=required_confidence, used_cache=True, cache_confidence=stored_confidence)} if debug.get("include_explanations") else {}), }, "error": None, } if DEBUG_LOGS: logger.info("cache confidence too low subject=%s mode=%s confidence=%.3f required=%.3f", subject, mode, stored_confidence, required_confidence) wd_candidates = await self.wikidata_lookup_fn( subject, language, 1 if mode == "quick" else max(1, min(max_candidates, 10)), ) if not wd_candidates: if DEBUG_LOGS: logger.info("wikidata miss subject=%s mode=%s", subject, mode) return { "status": "not_found", "entity": { "id": None, "label": None, "type": None, "description": None, "source": None, "uri": None, "attributes": {}, }, "confidence": 0.0, "candidates": [], "ambiguity": None, "resolution_path": [ {"phase": "query", "action": "wikidata_quick_resolve", "source": "remote"} ], "meta": { "request_id": request_id, "timestamp": ts, "duration_ms": int((time.time() - start) * 1000), **({"debug": _debug_decision(mode=mode, top_confidence=0.0, auto_accept_threshold=auto_accept_threshold, interactive_below_threshold=interactive_below_threshold, required_confidence=required_confidence, used_cache=False)} if debug.get("include_explanations") else {}), }, "error": None, } ranked_candidates = [] subject_embedding = None embedder = None if use_embeddings: embedder = WikidataSearch() subject_embedding = await embedder.embed_text(_candidate_text(subject, {"label": subject, "description": "", "aliases": []}, hints)) for wd in wd_candidates: candidate_embedding = None if use_embeddings and embedder is not None: candidate_embedding = await embedder.embed_text(_candidate_text(subject, wd, hints)) confidence, breakdown = _score_wikidata_candidate( subject, wd, context=context, hints=hints, use_embeddings=use_embeddings, subject_embedding=subject_embedding, candidate_embedding=candidate_embedding, ) ranked_candidates.append({**wd, "confidence": confidence, "score_breakdown": breakdown}) ranked_candidates.sort(key=lambda item: ((item.get("confidence") or 0.0), item.get("label") or ""), reverse=True) wd = ranked_candidates[0] entity = _entity_from_wikidata(subject, wd) if mode == "quick": wd["confidence"] = min(wd.get("confidence", 0.0), 0.6) if DEBUG_LOGS: logger.info( "wikidata hit subject=%s qid=%s atlas_id=%s type=%s", subject, wd.get("id"), entity.id, entity.type, ) await self.persist_entity_fn(entity) resolution_path = [ {"phase": "query", "action": "wikidata_quick_resolve", "source": "remote"}, {"phase": "ranking", "action": f"mode_{mode}", "source": "resolver"}, ] if use_embeddings: resolution_path.append( { "phase": "ranking", "action": "embedding_similarity", "source": "ollama", "note": "embedding similarity used to score candidate order", } ) status = "ambiguous" ambiguity = {"reason": "pre-maintenance", "dimension": 0.5} if mode == "quick": status = "ambiguous" elif (wd.get("confidence") or 0.0) >= auto_accept_threshold: status = "resolved" ambiguity = None elif interactive_below_threshold: status = "ambiguous" return { "status": status, "entity": { "id": entity.id, "label": entity.label, "type": entity.type, "description": entity.description, "source": "wikidata", "uri": None, "attributes": { "wikidata_id": wd.get("id"), "alias": subject, }, }, "confidence": wd.get("confidence", 0.6), "candidates": [ { "id": cand.get("id"), "label": cand.get("label"), "type": cand.get("type"), "source": "wikidata", "confidence": cand.get("confidence", 0.0), "score_breakdown": cand.get("score_breakdown", {}) if debug.get("include_explanations") else {}, } for cand in ranked_candidates ] if mode in {"ranked", "interactive", "hybrid", "llm_select"} else [], "ambiguity": ambiguity, "resolution_path": resolution_path + [{"phase": "persistence", "action": "store_save_minimal", "source": "triple_store"}], "meta": { "request_id": request_id, "timestamp": ts, "duration_ms": int((time.time() - start) * 1000), **({"debug": _debug_decision(mode=mode, top_confidence=wd.get("confidence", 0.0), auto_accept_threshold=auto_accept_threshold, interactive_below_threshold=interactive_below_threshold, required_confidence=required_confidence, used_cache=False)} if debug.get("include_explanations") else {}), }, "error": None, } except Exception as exc: detail = " | ".join(_flatten_exception_details(exc)) return { "status": "error", "entity": None, "confidence": 0.0, "candidates": [], "ambiguity": None, "resolution_path": [], "meta": { "request_id": str(uuid.uuid4()), "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(), "duration_ms": 0, }, "error": {"code": "RESOLVE_FAILED", "message": detail}, }