from __future__ import annotations

import datetime
import hashlib
import logging
import math
import os
import time
import uuid
from dataclasses import dataclass
from typing import Any

from .atlas_model import Entity, Identifier
from .atlas_store import load_entity_by_subject, save_entity_minimal
from .wikidata import WikidataSearch

ATLAS = "http://world.eu.org/atlas_ontology#"

DEFAULT_ENDPOINT = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
# NOTE: this reads the same environment variable as DEFAULT_ENDPOINT, so the
# update endpoint currently always mirrors the read endpoint.
DEFAULT_UPDATE_ENDPOINT = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", DEFAULT_ENDPOINT)
DEBUG_LOGS = os.getenv("ATLAS_DEBUG_LOGS", "false").lower() in {"1", "true", "yes", "on"}

logger = logging.getLogger(__name__)


def _hash_id(subject: str) -> str:
    """Derive a stable 16-character id from the normalized subject string."""
    return hashlib.sha1(subject.strip().lower().encode("utf-8")).hexdigest()[:16]


def _entity_iri(atlas_id: str) -> str:
    return f"atlas_data:entity_{atlas_id}"


async def _wikidata_lookup(subject: str, language: str = "en", limit: int = 1) -> list[dict[str, Any]]:
    search = WikidataSearch({"search": subject, "limit": limit, "language": language})
    result = await search.quick_resolve(subject, limit=limit)
    return result.get("results", []) or []


def _candidate_text(subject: str, wd: dict[str, Any], hints: dict[str, Any] | None = None) -> str:
    hints = hints or {}
    aliases = hints.get("aliases") or []
    parts = [subject, wd.get("label") or "", wd.get("description") or "", " ".join(str(a) for a in aliases)]
    return " | ".join(part for part in parts if part)


def _cosine_similarity(a: list[float] | None, b: list[float] | None) -> float:
    if not a or not b or len(a) != len(b):
        return 0.0
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if not norm_a or not norm_b:
        return 0.0
    return dot / (norm_a * norm_b)
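
# Quick sanity check (hypothetical vectors, not produced by any real embedder);
# the guard clauses return 0.0 rather than raising on malformed input:
#     _cosine_similarity([1.0, 0.0], [1.0, 0.0])  -> 1.0
#     _cosine_similarity([1.0, 0.0], [0.0, 1.0])  -> 0.0
#     _cosine_similarity([1.0], [1.0, 0.0])       -> 0.0  (length mismatch)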


def _infer_atlas_type(label: str | None, description: str | None) -> str:
    # Crude keyword heuristic over label + description; falls back to atlas:Other.
    text = f"{label or ''} {description or ''}".lower()
    if any(k in text for k in ["president", "person", "singer", "composer", "human", "actor", "writer"]):
        return "atlas:Person"
    if any(k in text for k in ["city", "town", "village", "country", "state", "location", "place"]):
        return "atlas:Location"
    if any(k in text for k in ["company", "organization", "organisation", "institution", "foundation", "band"]):
        return "atlas:Organization"
    return "atlas:Other"


def _score_wikidata_candidate(
    subject: str,
    wd: dict[str, Any],
    *,
    context: dict[str, Any] | None = None,
    hints: dict[str, Any] | None = None,
    use_embeddings: bool = False,
    subject_embedding: list[float] | None = None,
    candidate_embedding: list[float] | None = None,
) -> tuple[float, dict[str, float]]:
    context = context or {}
    hints = hints or {}
    score = 0.0
    breakdown: dict[str, float] = {}
    subject_norm = subject.strip().lower()
    label = (wd.get("label") or "").strip()
    description = (wd.get("description") or "").strip()
    label_norm = label.lower()
    description_norm = description.lower()
    if label_norm == subject_norm:
        score += 0.75
        breakdown["exact_label"] = 0.75
    elif subject_norm and subject_norm in label_norm:
        score += 0.45
        breakdown["partial_label"] = 0.45
    for alias in hints.get("aliases") or []:
        alias_norm = str(alias).strip().lower()
        if alias_norm and alias_norm == label_norm:
            score += 0.15
            breakdown["alias_match"] = 0.15
            break
    expected_type = (hints.get("expected_type") or "").strip().lower()
    inferred_type = _infer_atlas_type(label, description).lower()
    if expected_type and expected_type in inferred_type:
        score += 0.1
        breakdown["expected_type"] = 0.1
    realm = (context.get("realm") or "").strip().lower()
    if realm and realm in description_norm:
        score += 0.1
        breakdown["realm"] = 0.1
    if wd.get("id"):
        score += 0.05
        breakdown["has_qid"] = 0.05
    if use_embeddings:
        sim = _cosine_similarity(subject_embedding, candidate_embedding)
        if sim > 0:
            emb_score = max(0.0, min(0.25, sim * 0.25))
            score += emb_score
            breakdown["embedding_similarity"] = emb_score
    score = min(score, 0.99)
    return score, breakdown
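
# Worked scoring example. The candidate dict is hypothetical (Q90 happens to be
# the real Wikidata id for Paris, but nothing here performs a lookup):
#     wd = {"id": "Q90", "label": "Paris", "description": "capital city of France"}
#     _score_wikidata_candidate("paris", wd, context={"realm": "france"})
#     -> (0.9, {"exact_label": 0.75, "realm": 0.1, "has_qid": 0.05})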


def _entity_from_wikidata(subject: str, wd: dict[str, Any]) -> Entity:
    atlas_id = _hash_id(subject)
    label = wd.get("label") or subject
    description = wd.get("description")
    qid = wd.get("id")
    entity_type = _infer_atlas_type(label, description)
    return Entity(
        id=atlas_id,
        label=label,
        description=description,
        type=entity_type,
        aliases=[subject] if subject.lower() != label.lower() else [],
        identifiers=[Identifier(scheme="wikidata-qid", value=qid)] if qid else [],
        needs_curation=True,
    )


def _flatten_exception_details(exc: BaseException) -> list[str]:
    # Recursively unwraps ExceptionGroup-style containers (any exception that
    # exposes an `exceptions` attribute) into a flat list of readable parts.
    parts = [f"{type(exc).__name__}: {exc}"]
    nested = getattr(exc, "exceptions", None)
    if nested:
        for sub in nested:
            parts.extend(_flatten_exception_details(sub))
    return parts
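
# On Python 3.11+ this flattens ExceptionGroup trees, e.g. (hypothetical):
#     _flatten_exception_details(ExceptionGroup("eg", [ValueError("x")]))
#     -> ["ExceptionGroup: eg (1 sub-exception)", "ValueError: x"]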


async def _persist_entity(entity: Entity) -> None:
    await save_entity_minimal(entity, DEFAULT_UPDATE_ENDPOINT)


async def _load_entity(subject: str) -> dict[str, Any] | None:
    return await load_entity_by_subject(subject, DEFAULT_ENDPOINT)


def _required_confidence(mode: str, constraints: dict[str, Any]) -> float:
    requested = constraints.get("min_confidence")
    if requested is not None:
        return float(requested)
    if mode == "quick":
        return 0.55
    if mode in {"ranked", "hybrid", "llm_select"}:
        return 0.85
    if mode == "interactive":
        return 0.0
    return 0.5
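
# Default confidence floors when constraints["min_confidence"] is absent:
#     quick                        -> 0.55
#     ranked / hybrid / llm_select -> 0.85
#     interactive                  -> 0.0  (any cached entity satisfies the request)
#     unknown modes                -> 0.5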


def _is_ambiguous_subject(subject: str, wd_candidates: list[dict[str, Any]]) -> bool:
    if len(wd_candidates) < 2:
        return False
    subject_norm = subject.strip().lower()
    labels = [(cand.get("label") or "").strip().lower() for cand in wd_candidates]
    exact_matches = sum(1 for label in labels if label == subject_norm)
    if exact_matches >= 2:
        return True
    # A single exact match that is not the top-ranked candidate also signals ambiguity.
    return exact_matches == 1 and subject_norm in labels[1:]


def _cache_can_satisfy(stored: dict[str, Any], mode: str, constraints: dict[str, Any]) -> bool:
    stored_confidence = float(stored.get("confidence") or 0.0)
    return stored_confidence >= _required_confidence(mode, constraints)


def _debug_decision(
    *,
    mode: str,
    top_confidence: float,
    auto_accept_threshold: float,
    interactive_below_threshold: bool,
    required_confidence: float,
    used_cache: bool,
    cache_confidence: float | None = None,
) -> dict[str, Any]:
    if used_cache:
        decision = "cache_hit"
    elif top_confidence >= auto_accept_threshold:
        decision = "resolved"
    else:
        decision = "ambiguous_below_threshold"
    return {
        "mode": mode,
        "top_confidence": top_confidence,
        "auto_accept_threshold": auto_accept_threshold,
        "interactive_below_threshold": interactive_below_threshold,
        "required_confidence": required_confidence,
        "used_cache": used_cache,
        "cache_confidence": cache_confidence,
        "decision": decision,
    }


@dataclass
class ResolveService:
    load_entity_fn: Any = _load_entity
    wikidata_lookup_fn: Any = _wikidata_lookup
    persist_entity_fn: Any = _persist_entity

    async def resolve(
        self,
        *,
        subject: str,
        context: dict[str, Any] | None = None,
        constraints: dict[str, Any] | None = None,
        hints: dict[str, Any] | None = None,
        debug: dict[str, Any] | None = None,
        strategy: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        context = context or {}
        constraints = constraints or {}
        hints = hints or {}
        debug = debug or {}
        strategy = strategy or {}
        language = (context.get("language") or "en").strip() or "en"
        mode = (strategy.get("mode") or "quick").strip().lower() or "quick"
        use_embeddings = bool(strategy.get("use_embeddings", False))
        max_candidates = int(constraints.get("max_candidates") or 5)
        auto_accept_threshold = float(strategy.get("auto_accept_threshold") or 0.85)
        interactive_below_threshold = bool(strategy.get("interactive_below_threshold", True))
        required_confidence = _required_confidence(mode, constraints)
        try:
            request_id = str(uuid.uuid4())
            ts = datetime.datetime.now(datetime.timezone.utc).isoformat()
            start = time.time()
            subject = (subject or "").strip()
            if not subject:
                return {
                    "status": "not_found",
                    "entity": None,
                    "confidence": 0.0,
                    "candidates": [],
                    "ambiguity": None,
                    "resolution_path": [],
                    "meta": {"request_id": request_id, "timestamp": ts, "duration_ms": 0},
                    "error": None,
                }
            if DEBUG_LOGS:
                logger.info("resolve start subject=%s", subject)
            stored = await self.load_entity_fn(subject)
            if stored:
                if DEBUG_LOGS:
                    logger.info("store hit subject=%s atlas_id=%s", subject, stored.get("atlas_id"))
                stored_confidence = float(
                    stored.get("confidence") or (0.9 if not stored.get("needs_curation", False) else 0.6)
                )
                if _cache_can_satisfy(stored, mode, constraints):
                    meta: dict[str, Any] = {
                        "request_id": request_id,
                        "timestamp": ts,
                        "duration_ms": int((time.time() - start) * 1000),
                    }
                    if debug.get("include_explanations"):
                        meta["debug"] = _debug_decision(
                            mode=mode,
                            top_confidence=stored_confidence,
                            auto_accept_threshold=auto_accept_threshold,
                            interactive_below_threshold=interactive_below_threshold,
                            required_confidence=required_confidence,
                            used_cache=True,
                            cache_confidence=stored_confidence,
                        )
                    return {
                        "status": "resolved",
                        "entity": {
                            "id": stored.get("atlas_id"),
                            "label": stored.get("label"),
                            "type": stored.get("type"),
                            "description": stored.get("description"),
                            "source": None,
                            "uri": None,
                            "attributes": {},
                        },
                        "confidence": stored_confidence,
                        "candidates": [],
                        "ambiguity": None,
                        "resolution_path": [
                            {"phase": "cache", "action": "store_hit", "source": "triple_store"}
                        ],
                        "meta": meta,
                        "error": None,
                    }
                if DEBUG_LOGS:
                    logger.info(
                        "cache confidence too low subject=%s mode=%s confidence=%.3f required=%.3f",
                        subject,
                        mode,
                        stored_confidence,
                        required_confidence,
                    )
            wd_candidates = await self.wikidata_lookup_fn(
                subject,
                language,
                1 if mode == "quick" else max(1, min(max_candidates, 10)),
            )
            if not wd_candidates:
                if DEBUG_LOGS:
                    logger.info("wikidata miss subject=%s mode=%s", subject, mode)
                meta = {
                    "request_id": request_id,
                    "timestamp": ts,
                    "duration_ms": int((time.time() - start) * 1000),
                }
                if debug.get("include_explanations"):
                    meta["debug"] = _debug_decision(
                        mode=mode,
                        top_confidence=0.0,
                        auto_accept_threshold=auto_accept_threshold,
                        interactive_below_threshold=interactive_below_threshold,
                        required_confidence=required_confidence,
                        used_cache=False,
                    )
                return {
                    "status": "not_found",
                    "entity": {
                        "id": None,
                        "label": None,
                        "type": None,
                        "description": None,
                        "source": None,
                        "uri": None,
                        "attributes": {},
                    },
                    "confidence": 0.0,
                    "candidates": [],
                    "ambiguity": None,
                    "resolution_path": [
                        {"phase": "query", "action": "wikidata_quick_resolve", "source": "remote"}
                    ],
                    "meta": meta,
                    "error": None,
                }
            ranked_candidates = []
            subject_embedding = None
            embedder = None
            if use_embeddings:
                embedder = WikidataSearch()
                subject_embedding = await embedder.embed_text(
                    _candidate_text(subject, {"label": subject, "description": "", "aliases": []}, hints)
                )
            for wd in wd_candidates:
                candidate_embedding = None
                if use_embeddings and embedder is not None:
                    candidate_embedding = await embedder.embed_text(_candidate_text(subject, wd, hints))
                confidence, breakdown = _score_wikidata_candidate(
                    subject,
                    wd,
                    context=context,
                    hints=hints,
                    use_embeddings=use_embeddings,
                    subject_embedding=subject_embedding,
                    candidate_embedding=candidate_embedding,
                )
                ranked_candidates.append({**wd, "confidence": confidence, "score_breakdown": breakdown})
            # Sort by confidence descending with an alphabetical label tiebreak
            # (reverse-sorting the whole tuple would order tied labels Z->A).
            ranked_candidates.sort(key=lambda item: (-(item.get("confidence") or 0.0), item.get("label") or ""))
            wd = ranked_candidates[0]
            entity = _entity_from_wikidata(subject, wd)
            if mode == "quick":
                # Quick mode never auto-accepts: cap confidence below the
                # default accept threshold.
                wd["confidence"] = min(wd.get("confidence", 0.0), 0.6)
            if DEBUG_LOGS:
                logger.info(
                    "wikidata hit subject=%s qid=%s atlas_id=%s type=%s",
                    subject,
                    wd.get("id"),
                    entity.id,
                    entity.type,
                )
            await self.persist_entity_fn(entity)
            resolution_path = [
                {"phase": "query", "action": "wikidata_quick_resolve", "source": "remote"},
                {"phase": "ranking", "action": f"mode_{mode}", "source": "resolver"},
            ]
            if use_embeddings:
                resolution_path.append(
                    {
                        "phase": "ranking",
                        "action": "embedding_similarity",
                        "source": "ollama",
                        "note": "embedding similarity used to score candidate order",
                    }
                )
            status = "ambiguous"
            ambiguity = {"reason": "pre-maintenance", "dimension": 0.5}
            if mode == "quick":
                # Quick mode always reports ambiguity so callers re-check later.
                status = "ambiguous"
            elif (wd.get("confidence") or 0.0) >= auto_accept_threshold:
                status = "resolved"
                ambiguity = None
            elif interactive_below_threshold:
                status = "ambiguous"
            meta = {
                "request_id": request_id,
                "timestamp": ts,
                "duration_ms": int((time.time() - start) * 1000),
            }
            if debug.get("include_explanations"):
                meta["debug"] = _debug_decision(
                    mode=mode,
                    top_confidence=wd.get("confidence", 0.0),
                    auto_accept_threshold=auto_accept_threshold,
                    interactive_below_threshold=interactive_below_threshold,
                    required_confidence=required_confidence,
                    used_cache=False,
                )
            include_candidates = mode in {"ranked", "interactive", "hybrid", "llm_select"}
            return {
                "status": status,
                "entity": {
                    "id": entity.id,
                    "label": entity.label,
                    "type": entity.type,
                    "description": entity.description,
                    "source": "wikidata",
                    "uri": None,
                    "attributes": {
                        "wikidata_id": wd.get("id"),
                        "alias": subject,
                    },
                },
                "confidence": wd.get("confidence", 0.6),
                "candidates": [
                    {
                        "id": cand.get("id"),
                        "label": cand.get("label"),
                        "type": cand.get("type"),
                        "source": "wikidata",
                        "confidence": cand.get("confidence", 0.0),
                        "score_breakdown": cand.get("score_breakdown", {}) if debug.get("include_explanations") else {},
                    }
                    for cand in ranked_candidates
                ] if include_candidates else [],
                "ambiguity": ambiguity,
                "resolution_path": resolution_path
                + [{"phase": "persistence", "action": "store_save_minimal", "source": "triple_store"}],
                "meta": meta,
                "error": None,
            }
        except Exception as exc:
            detail = " | ".join(_flatten_exception_details(exc))
            return {
                "status": "error",
                "entity": None,
                "confidence": 0.0,
                "candidates": [],
                "ambiguity": None,
                "resolution_path": [],
                "meta": {
                    "request_id": str(uuid.uuid4()),
                    "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                    "duration_ms": 0,
                },
                "error": {"code": "RESOLVE_FAILED", "message": detail},
            }
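

# Minimal usage sketch, assuming the module is run directly. The stub
# coroutines below are hypothetical stand-ins for the real triple-store and
# Wikidata dependencies; they only illustrate how the dataclass fields allow
# injection for tests or offline runs.
if __name__ == "__main__":
    import asyncio

    async def _fake_load(subject: str) -> dict[str, Any] | None:
        return None  # simulate a cache miss

    async def _fake_lookup(subject: str, language: str, limit: int) -> list[dict[str, Any]]:
        # Q90 is the real Wikidata id for Paris; the payload shape mirrors
        # what _score_wikidata_candidate expects.
        return [{"id": "Q90", "label": "Paris", "description": "capital city of France"}]

    async def _fake_persist(entity: Entity) -> None:
        print(f"would persist atlas_id={entity.id} type={entity.type}")

    async def _demo() -> None:
        service = ResolveService(
            load_entity_fn=_fake_load,
            wikidata_lookup_fn=_fake_lookup,
            persist_entity_fn=_fake_persist,
        )
        result = await service.resolve(
            subject="Paris",
            context={"realm": "France"},
            strategy={"mode": "ranked", "auto_accept_threshold": 0.85},
            debug={"include_explanations": True},
        )
        print(result["status"], result["confidence"])

    asyncio.run(_demo())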