from __future__ import annotations import asyncio import hashlib import re from difflib import SequenceMatcher from typing import Any, Dict, List from urllib.parse import urlparse from news_mcp.config import NEWS_EMBEDDINGS_ENABLED, NEWS_EMBEDDING_SIMILARITY_THRESHOLD from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed from news_mcp.sources.news_feeds import normalize_topic_from_title # --------------------------------------------------------------------------- # Text helpers # --------------------------------------------------------------------------- def _normalize_title(title: str) -> str: t = title.lower().strip() t = re.sub(r"[^a-z0-9\s]", " ", t) t = re.sub(r"\s+", " ", t).strip() return t def _title_similarity(a: str, b: str) -> float: return SequenceMatcher(None, _normalize_title(a), _normalize_title(b)).ratio() def _article_key(article: Dict[str, Any]) -> str: url = str(article.get("url") or "").strip() if not url: return str(article.get("title") or "") try: parsed = urlparse(url) parts = [p for p in parsed.path.split("/") if p] if parts: return parts[-1] except Exception: pass return url def _cluster_text(a: Dict[str, Any]) -> str: parts = [a.get("title", ""), a.get("summary", "") or ""] return "\n".join(p for p in parts if p).strip() # --------------------------------------------------------------------------- # Token / Jaccard signal # --------------------------------------------------------------------------- _STOPWORDS = frozenset( { "a", "an", "the", "of", "to", "in", "on", "at", "for", "by", "with", "and", "or", "but", "if", "is", "are", "was", "were", "be", "been", "being", "as", "from", "that", "this", "these", "those", "it", "its", "into", "over", "under", "than", "then", "so", "such", "no", "not", "do", "does", "did", "will", "would", "can", "could", "should", "may", "might", "has", "have", "had", "after", "before", "amid", "vs", "via", "us", "uk", } ) def _tokens(text: str) -> set[str]: tokens = re.findall(r"[a-z0-9][a-z0-9\-]+", text.lower()) return {t for t in tokens if len(t) >= 3 and t not in _STOPWORDS} def _jaccard(a: set, b: set) -> float: if not a or not b: return 0.0 inter = len(a & b) if inter == 0: return 0.0 return inter / len(a | b) # --------------------------------------------------------------------------- # Composite similarity # --------------------------------------------------------------------------- DEFAULT_TITLE_THRESHOLD = 0.87 DEFAULT_JACCARD_THRESHOLD = 0.55 def _signals(article: Dict[str, Any], cluster: Dict[str, Any]) -> dict: """Per-pair similarity signals (title, jaccard, embedding cosine).""" a_title = str(article.get("title") or "") c_title = str(cluster.get("headline") or "") title_sim = _title_similarity(a_title, c_title) if a_title and c_title else 0.0 a_text = _cluster_text(article) c_text_seed = (cluster.get("articles") or [{}])[0] c_text = _cluster_text(c_text_seed) if c_text_seed else c_title jaccard = _jaccard(_tokens(a_text), _tokens(c_text)) if a_text and c_text else 0.0 a_emb = article.get("_embedding") c_emb = cluster.get("embedding") cosine = cosine_similarity(a_emb, c_emb) if a_emb and c_emb else 0.0 return {"title": title_sim, "jaccard": jaccard, "cosine": cosine} def _is_match(signals: dict, *, embeddings_enabled: bool) -> tuple[bool, str, float]: """Decide whether two items should merge based on the strongest signal.""" cosine_threshold = NEWS_EMBEDDING_SIMILARITY_THRESHOLD if embeddings_enabled and signals["cosine"] >= cosine_threshold: return True, "cosine", signals["cosine"] if signals["title"] >= DEFAULT_TITLE_THRESHOLD: return True, "title", signals["title"] if signals["jaccard"] >= DEFAULT_JACCARD_THRESHOLD: return True, "jaccard", signals["jaccard"] return False, "none", 0.0 # --------------------------------------------------------------------------- # Embedding pre-computation (async internally) # --------------------------------------------------------------------------- async def _compute_embeddings_concurrently( articles: List[Dict[str, Any]], ) -> Dict[str, list[float] | None]: """Compute embeddings for unique article texts concurrently. Returns a cache dict: text -> embedding vector or None. """ unique_texts: list[str] = [] seen: set[str] = set() for a in articles: text = _cluster_text(a) if text and text not in seen: seen.add(text) unique_texts.append(text) emb_tasks = [ollama_embed(text) for text in unique_texts] emb_results = await asyncio.gather(*emb_tasks, return_exceptions=True) cache: Dict[str, list[float] | None] = {} for text, result in zip(unique_texts, emb_results): if isinstance(result, list): cache[text] = result else: cache[text] = None return cache def _compute_embeddings_sync( articles: List[Dict[str, Any]], ) -> Dict[str, list[float] | None]: """Synchronous wrapper that runs the async embedding computation. Handles three cases: 1. Already inside an async event loop (called from poller) -> schedule as a task and run it to completion on the running loop. 2. No event loop at all (plain sync caller) -> use asyncio.run(). """ try: loop = asyncio.get_running_loop() except RuntimeError: # No running loop — safe to use asyncio.run() return asyncio.run(_compute_embeddings_concurrently(articles)) # We're inside a running event loop (e.g. the poller). Create a new loop # in a thread to avoid blocking. import concurrent.futures with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: future = pool.submit( asyncio.run, _compute_embeddings_concurrently(articles) ) return future.result() # --------------------------------------------------------------------------- # Public API (sync — backward compatible with tests) # --------------------------------------------------------------------------- def dedup_and_cluster_articles( articles: List[Dict[str, Any]], similarity_threshold: float | None = None, ) -> Dict[str, List[Dict[str, Any]]]: """Deduplicate raw articles into clusters keyed by topic. v1.2: embedding pre-computation is async/concurrent under the hood, but this public function remains synchronous for backward compatibility. A pair merges if ANY signal clears its threshold: * title fuzzy ratio * token Jaccard over headline+summary * Ollama embedding cosine when available """ title_threshold = similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD # Pre-compute embeddings concurrently (sync boundary handles async internally) embedding_cache: Dict[str, list[float] | None] = {} if NEWS_EMBEDDINGS_ENABLED: embedding_cache = _compute_embeddings_sync(articles) by_topic: Dict[str, List[Dict[str, Any]]] = {} for a in articles: title = a.get("title") or "" if not title: continue topic = normalize_topic_from_title(title) article_text = _cluster_text(a) article_embedding = embedding_cache.get(article_text) if NEWS_EMBEDDINGS_ENABLED else None a_with_emb = dict(a) if article_embedding is not None: a_with_emb["_embedding"] = article_embedding by_topic.setdefault(topic, []) clusters = by_topic[topic] best_idx: int | None = None best_signal_name = "none" best_signal_value = 0.0 for idx, c in enumerate(clusters): sigs = _signals(a_with_emb, c) local_match = False if NEWS_EMBEDDINGS_ENABLED and sigs["cosine"] >= NEWS_EMBEDDING_SIMILARITY_THRESHOLD: local_match = True signal_name, signal_value = "cosine", sigs["cosine"] elif sigs["title"] >= title_threshold: local_match = True signal_name, signal_value = "title", sigs["title"] elif sigs["jaccard"] >= DEFAULT_JACCARD_THRESHOLD: local_match = True signal_name, signal_value = "jaccard", sigs["jaccard"] elif ( NEWS_EMBEDDINGS_ENABLED and sigs["cosine"] >= 0.80 and (sigs["jaccard"] >= 0.30 or sigs["title"] >= 0.55) ): local_match = True signal_name = "consensus" signal_value = (sigs["cosine"] + max(sigs["jaccard"], sigs["title"])) / 2.0 else: signal_name, signal_value = "none", max(sigs["title"], sigs["jaccard"], sigs["cosine"]) if local_match and signal_value > best_signal_value: best_idx = idx best_signal_name = signal_name best_signal_value = signal_value if best_idx is not None: c = clusters[best_idx] existing_keys = {_article_key(x) for x in c.get("articles", []) or []} if _article_key(a) not in existing_keys: c["articles"].append(a) if a.get("source") and a["source"] not in c["sources"]: c["sources"].append(a["source"]) c["last_updated"] = max(str(c.get("last_updated", "")), str(a.get("timestamp", ""))) c.setdefault("_merge_signals", []).append( {"signal": best_signal_name, "value": round(best_signal_value, 3)} ) else: key = f"{topic}|{_normalize_title(title)}" cid = hashlib.sha1(key.encode("utf-8")).hexdigest() cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None clusters.append( { "cluster_id": cid, "headline": title, "summary": a.get("summary", ""), "topic": topic, "entities": [], "sentiment": "neutral", "importance": 0.0, "sources": [a["source"]] if a.get("source") else [], "timestamp": a.get("timestamp"), "articles": [a], "first_seen": a.get("timestamp"), "last_updated": a.get("timestamp"), "embedding": cluster_embedding, "embedding_model": "ollama:nomic-embed-text" if cluster_embedding else None, } ) # Strip the internal merge audit trail before returning for clusters in by_topic.values(): for c in clusters: c.pop("_merge_signals", None) return {topic: clusters for topic, clusters in by_topic.items()}