from __future__ import annotations

import hashlib
import re
from difflib import SequenceMatcher
from typing import Any, Dict, List
from urllib.parse import urlparse

from news_mcp.config import NEWS_EMBEDDINGS_ENABLED, NEWS_EMBEDDING_SIMILARITY_THRESHOLD
from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
from news_mcp.sources.news_feeds import normalize_topic_from_title

# ---------------------------------------------------------------------------
# Text helpers
# ---------------------------------------------------------------------------


def _normalize_title(title: str) -> str:
    t = title.lower().strip()
    # Remove punctuation-ish characters for similarity scoring.
    t = re.sub(r"[^a-z0-9\s]", " ", t)
    t = re.sub(r"\s+", " ", t).strip()
    return t


def _title_similarity(a: str, b: str) -> float:
    return SequenceMatcher(None, _normalize_title(a), _normalize_title(b)).ratio()


def _article_key(article: Dict[str, Any]) -> str:
    url = str(article.get("url") or "").strip()
    if not url:
        return str(article.get("title") or "")
    try:
        parsed = urlparse(url)
        parts = [p for p in parsed.path.split("/") if p]
        if parts:
            return parts[-1]
    except Exception:
        pass
    return url


def _cluster_text(a: Dict[str, Any]) -> str:
    parts = [a.get("title", ""), a.get("summary", "") or ""]
    return "\n".join(p for p in parts if p).strip()


# ---------------------------------------------------------------------------
# Token / Jaccard signal (used as a fallback alongside title similarity when
# embeddings are unavailable, and as a soft signal even when they are).
# ---------------------------------------------------------------------------

# Tiny stop-word set — we keep it small on purpose because the corpus is news
# headlines, where every additional removal risks losing genuine signal.
_STOPWORDS = frozenset(
    {
        "a", "an", "the", "of", "to", "in", "on", "at", "for", "by", "with",
        "and", "or", "but", "if", "is", "are", "was", "were", "be", "been",
        "being", "as", "from", "that", "this", "these", "those", "it", "its",
        "into", "over", "under", "than", "then", "so", "such", "no", "not",
        "do", "does", "did", "will", "would", "can", "could", "should", "may",
        "might", "has", "have", "had", "after", "before", "amid", "vs", "via",
        "us", "uk",
    }
)


def _tokens(text: str) -> set[str]:
    """Lowercase content tokens, stop-words removed, length >= 3."""
    tokens = re.findall(r"[a-z0-9][a-z0-9\-]+", text.lower())
    return {t for t in tokens if len(t) >= 3 and t not in _STOPWORDS}


def _jaccard(a: set[str], b: set[str]) -> float:
    if not a or not b:
        return 0.0
    inter = len(a & b)
    if inter == 0:
        return 0.0
    return inter / len(a | b)


# ---------------------------------------------------------------------------
# Composite similarity
# ---------------------------------------------------------------------------

# Each signal has its own threshold. We accept a merge if ANY signal clears its
# threshold, which makes clustering robust when one signal happens to be weak
# (short headlines kill SequenceMatcher; single-word stories kill Jaccard;
# Ollama outages kill cosine similarity).

DEFAULT_TITLE_THRESHOLD = 0.87
DEFAULT_JACCARD_THRESHOLD = 0.55


def _signals(article: Dict[str, Any], cluster: Dict[str, Any]) -> Dict[str, float]:
    """Per-pair similarity signals (title, jaccard, embedding cosine).

    Embedding cosine is only computed when both sides have a vector; we never
    block on a fresh Ollama request here — that's the caller's job, so this
    function stays pure and easy to test.
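
    An illustrative return shape (the numbers here are made up, not from a
    real run):

        {"title": 0.91, "jaccard": 0.42, "cosine": 0.87}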
""" a_title = str(article.get("title") or "") c_title = str(cluster.get("headline") or "") title_sim = _title_similarity(a_title, c_title) if a_title and c_title else 0.0 a_text = _cluster_text(article) c_text_seed = (cluster.get("articles") or [{}])[0] c_text = _cluster_text(c_text_seed) if c_text_seed else c_title jaccard = _jaccard(_tokens(a_text), _tokens(c_text)) if a_text and c_text else 0.0 a_emb = article.get("_embedding") c_emb = cluster.get("embedding") cosine = cosine_similarity(a_emb, c_emb) if a_emb and c_emb else 0.0 return {"title": title_sim, "jaccard": jaccard, "cosine": cosine} def _is_match(signals: dict, *, embeddings_enabled: bool) -> tuple[bool, str, float]: """Decide whether two items should merge based on the strongest signal. Returns (matched, signal_name, signal_value). The signal_name lets callers log *why* something merged, which is huge for debugging clustering quality. """ cosine_threshold = NEWS_EMBEDDING_SIMILARITY_THRESHOLD if embeddings_enabled and signals["cosine"] >= cosine_threshold: return True, "cosine", signals["cosine"] if signals["title"] >= DEFAULT_TITLE_THRESHOLD: return True, "title", signals["title"] if signals["jaccard"] >= DEFAULT_JACCARD_THRESHOLD: return True, "jaccard", signals["jaccard"] return False, "none", 0.0 # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def dedup_and_cluster_articles( articles: List[Dict[str, Any]], similarity_threshold: float | None = None, ) -> Dict[str, List[Dict[str, Any]]]: """Deduplicate raw articles into clusters keyed by topic. v1.1 strategy: composite similarity. * title fuzzy ratio * token Jaccard over headline+summary (cheap, surprisingly resilient when titles are reworded heavily across outlets) * Ollama embedding cosine when available A pair merges if ANY signal clears its threshold. Falling back through multiple signals means a transient Ollama outage doesn't collapse the server back into title-only clustering, and a heavily-reworded headline can still merge via Jaccard or embeddings. The ``similarity_threshold`` argument is kept for backward compatibility with the test suite. When provided, it overrides the title threshold. """ title_threshold = similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD by_topic: Dict[str, List[Dict[str, Any]]] = {} embedding_cache: Dict[str, list[float] | None] = {} def _embedding_for_text(text: str) -> list[float] | None: if not NEWS_EMBEDDINGS_ENABLED or not text: return None if text in embedding_cache: return embedding_cache[text] emb = ollama_embed(text) # Cache None too so a single failure doesn't trigger repeated retries # within one ingestion cycle. The next refresh call clears this map. embedding_cache[text] = emb return emb for a in articles: title = a.get("title") or "" if not title: continue topic = normalize_topic_from_title(title) article_text = _cluster_text(a) article_embedding = _embedding_for_text(article_text) # Attach embedding on the article dict so _signals() can read it # without re-computing. 
        a_with_emb = dict(a)
        if article_embedding is not None:
            a_with_emb["_embedding"] = article_embedding

        by_topic.setdefault(topic, [])
        clusters = by_topic[topic]

        best_idx: int | None = None
        best_signal_name = "none"
        best_signal_value = 0.0

        for idx, c in enumerate(clusters):
            sigs = _signals(a_with_emb, c)
            # Use the title threshold the caller explicitly passed (test
            # override) but otherwise rely on the module defaults.
            local_match = False
            if NEWS_EMBEDDINGS_ENABLED and sigs["cosine"] >= NEWS_EMBEDDING_SIMILARITY_THRESHOLD:
                local_match = True
                signal_name, signal_value = "cosine", sigs["cosine"]
            elif sigs["title"] >= title_threshold:
                local_match = True
                signal_name, signal_value = "title", sigs["title"]
            elif sigs["jaccard"] >= DEFAULT_JACCARD_THRESHOLD:
                local_match = True
                signal_name, signal_value = "jaccard", sigs["jaccard"]
            # Consensus rule: when no single signal clears its strict threshold
            # but two of them are simultaneously "strong-ish", treat that as a
            # match. This catches reworded headlines whose embedding is just
            # below the strict cosine cutoff. Numbers are intentionally
            # conservative — both signals must be clearly above noise.
            elif (
                NEWS_EMBEDDINGS_ENABLED
                and sigs["cosine"] >= 0.80
                and (sigs["jaccard"] >= 0.30 or sigs["title"] >= 0.55)
            ):
                local_match = True
                signal_name = "consensus"
                signal_value = (sigs["cosine"] + max(sigs["jaccard"], sigs["title"])) / 2.0
            else:
                signal_name, signal_value = "none", max(
                    sigs["title"], sigs["jaccard"], sigs["cosine"]
                )

            if local_match and signal_value > best_signal_value:
                best_idx = idx
                best_signal_name = signal_name
                best_signal_value = signal_value

        if best_idx is not None:
            c = clusters[best_idx]
            existing_keys = {_article_key(x) for x in c.get("articles", []) or []}
            if _article_key(a) not in existing_keys:
                c["articles"].append(a)
                if a.get("source") and a["source"] not in c["sources"]:
                    c["sources"].append(a["source"])
                c["last_updated"] = max(
                    str(c.get("last_updated", "")), str(a.get("timestamp", ""))
                )
                # Keep a tiny audit trail per cluster on which signal grew it
                # last. Not surfaced through tools — lives in the payload only
                # for debug.
                c.setdefault("_merge_signals", []).append(
                    {"signal": best_signal_name, "value": round(best_signal_value, 3)}
                )
        else:
            # Stable cluster id: based on topic + normalized canonical title.
            key = f"{topic}|{_normalize_title(title)}"
            cid = hashlib.sha1(key.encode("utf-8")).hexdigest()
            cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
            clusters.append(
                {
                    "cluster_id": cid,
                    "headline": title,
                    "summary": a.get("summary", ""),
                    "topic": topic,
                    "entities": [],
                    "sentiment": "neutral",
                    "importance": 0.0,
                    "sources": [a["source"]] if a.get("source") else [],
                    "timestamp": a.get("timestamp"),
                    "articles": [a],
                    "first_seen": a.get("timestamp"),
                    "last_updated": a.get("timestamp"),
                    "embedding": cluster_embedding,
                    "embedding_model": "ollama:nomic-embed-text" if cluster_embedding else None,
                }
            )

    # Strip the internal merge audit trail before returning so it does not
    # accidentally bloat the SQLite payload. Storage layer doesn't filter it.
    for topic_clusters in by_topic.values():
        for c in topic_clusters:
            c.pop("_merge_signals", None)

    return {topic: clusters for topic, clusters in by_topic.items()}
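

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only: the article dicts below are made up, and a
# real run pulls input from the feed ingestion layer). Whether the first two
# stories actually merge depends on normalize_topic_from_title and on the
# configured thresholds; with embeddings disabled, clustering falls back to
# the title and Jaccard signals alone.
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Two rewordings of the same story plus one unrelated headline.
    _demo_articles = [
        {
            "title": "Fed raises interest rates by a quarter point",
            "summary": "The central bank lifted its benchmark rate on Wednesday.",
            "url": "https://example.com/fed-raises-rates",
            "source": "Example Wire",
            "timestamp": "2024-05-01T18:00:00Z",
        },
        {
            "title": "Federal Reserve hikes rates by 25 basis points",
            "summary": "The central bank raised its benchmark rate on Wednesday.",
            "url": "https://example.org/fed-hikes-25bps",
            "source": "Example Times",
            "timestamp": "2024-05-01T18:30:00Z",
        },
        {
            "title": "New species of deep-sea coral discovered",
            "summary": "Researchers described the find in a new paper.",
            "url": "https://example.net/deep-sea-coral",
            "source": "Example Journal",
            "timestamp": "2024-05-01T19:00:00Z",
        },
    ]
    for _topic, _clusters in dedup_and_cluster_articles(_demo_articles).items():
        for _c in _clusters:
            print(
                _topic, "|", _c["headline"],
                "| articles:", len(_c["articles"]),
                "| sources:", _c["sources"],
            )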