| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- from __future__ import annotations
- import hashlib
- import re
- from difflib import SequenceMatcher
- from typing import Any, Dict, List
- from urllib.parse import urlparse
- from news_mcp.config import NEWS_EMBEDDINGS_ENABLED, NEWS_EMBEDDING_SIMILARITY_THRESHOLD
- from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
- from news_mcp.sources.news_feeds import normalize_topic_from_title
- # ---------------------------------------------------------------------------
- # Text helpers
- # ---------------------------------------------------------------------------
- def _normalize_title(title: str) -> str:
- t = title.lower().strip()
- # Remove punctuation-ish characters for similarity scoring.
- t = re.sub(r"[^a-z0-9\s]", " ", t)
- t = re.sub(r"\s+", " ", t).strip()
- return t
def _title_similarity(a: str, b: str) -> float:
    """SequenceMatcher ratio in [0, 1] over the normalized forms of *a* and *b*."""
    left = _normalize_title(a)
    right = _normalize_title(b)
    return SequenceMatcher(None, left, right).ratio()
- def _article_key(article: Dict[str, Any]) -> str:
- url = str(article.get("url") or "").strip()
- if not url:
- return str(article.get("title") or "")
- try:
- parsed = urlparse(url)
- parts = [p for p in parsed.path.split("/") if p]
- if parts:
- return parts[-1]
- except Exception:
- pass
- return url
- def _cluster_text(a: Dict[str, Any]) -> str:
- parts = [a.get("title", ""), a.get("summary", "") or ""]
- return "\n".join(p for p in parts if p).strip()
# ---------------------------------------------------------------------------
# Token / Jaccard signal (used as a fallback alongside title similarity when
# embeddings are unavailable, and as a soft signal even when they are).
# ---------------------------------------------------------------------------

# Tiny stop-word set — we keep it small on purpose because the corpus is news
# headlines, where every additional removal risks losing genuine signal.
# NOTE: "us"/"uk" are filtered; country references then rely on longer tokens
# ("united", "states", ...) surviving the length>=3 cut in _tokens().
_STOPWORDS = frozenset(
    {
        "a", "an", "the", "of", "to", "in", "on", "at", "for", "by", "with",
        "and", "or", "but", "if", "is", "are", "was", "were", "be", "been",
        "being", "as", "from", "that", "this", "these", "those", "it", "its",
        "into", "over", "under", "than", "then", "so", "such", "no", "not",
        "do", "does", "did", "will", "would", "can", "could", "should", "may",
        "might", "has", "have", "had", "after", "before", "amid", "vs", "via",
        "us", "uk",
    }
)
def _tokens(text: str) -> set[str]:
    """Lowercase content tokens of length >= 3, with stop-words removed."""
    out: set[str] = set()
    for tok in re.findall(r"[a-z0-9][a-z0-9\-]+", text.lower()):
        if len(tok) >= 3 and tok not in _STOPWORDS:
            out.add(tok)
    return out
- def _jaccard(a: set, b: set) -> float:
- if not a or not b:
- return 0.0
- inter = len(a & b)
- if inter == 0:
- return 0.0
- return inter / len(a | b)
# ---------------------------------------------------------------------------
# Composite similarity
# ---------------------------------------------------------------------------

# Each signal has its own threshold. We accept a merge if ANY signal clears its
# threshold, which makes clustering robust when one signal happens to be weak
# (short headlines kill SequenceMatcher; single-word stories kill Jaccard;
# Ollama outages kill cosine similarity).
# The cosine threshold lives in config (NEWS_EMBEDDING_SIMILARITY_THRESHOLD).
DEFAULT_TITLE_THRESHOLD = 0.87
DEFAULT_JACCARD_THRESHOLD = 0.55
def _signals(article: Dict[str, Any], cluster: Dict[str, Any]) -> dict:
    """Per-pair similarity signals (title, jaccard, embedding cosine).

    Embedding cosine is only computed when both sides have a vector; we never
    block on a fresh Ollama request here — that's the caller's job, so this
    function stays pure and easy to test.
    """
    art_title = str(article.get("title") or "")
    clu_title = str(cluster.get("headline") or "")
    title_sim = 0.0
    if art_title and clu_title:
        title_sim = _title_similarity(art_title, clu_title)

    # Jaccard compares the article's text against the cluster's seed article
    # (first member); when the cluster has no articles, fall back to headline.
    art_text = _cluster_text(article)
    seed = (cluster.get("articles") or [{}])[0]
    clu_text = _cluster_text(seed) if seed else clu_title
    jaccard_sim = 0.0
    if art_text and clu_text:
        jaccard_sim = _jaccard(_tokens(art_text), _tokens(clu_text))

    art_vec = article.get("_embedding")
    clu_vec = cluster.get("embedding")
    cosine_sim = 0.0
    if art_vec and clu_vec:
        cosine_sim = cosine_similarity(art_vec, clu_vec)

    return {"title": title_sim, "jaccard": jaccard_sim, "cosine": cosine_sim}
def _is_match(signals: dict, *, embeddings_enabled: bool) -> tuple[bool, str, float]:
    """Decide whether two items should merge based on the strongest signal.

    Returns (matched, signal_name, signal_value). The signal_name lets callers
    log *why* something merged, which is huge for debugging clustering quality.
    Signals are checked in fixed precedence: cosine (when enabled), title,
    then jaccard; the first one to clear its threshold wins.
    """
    if embeddings_enabled:
        cosine = signals["cosine"]
        if cosine >= NEWS_EMBEDDING_SIMILARITY_THRESHOLD:
            return True, "cosine", cosine
    for name, threshold in (
        ("title", DEFAULT_TITLE_THRESHOLD),
        ("jaccard", DEFAULT_JACCARD_THRESHOLD),
    ):
        value = signals[name]
        if value >= threshold:
            return True, name, value
    return False, "none", 0.0
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def dedup_and_cluster_articles(
    articles: List[Dict[str, Any]],
    similarity_threshold: float | None = None,
) -> Dict[str, List[Dict[str, Any]]]:
    """Deduplicate raw articles into clusters keyed by topic.

    v1.1 strategy: composite similarity.
      * title fuzzy ratio
      * token Jaccard over headline+summary (cheap, surprisingly resilient
        when titles are reworded heavily across outlets)
      * Ollama embedding cosine when available

    A pair merges if ANY signal clears its threshold. Falling back through
    multiple signals means a transient Ollama outage doesn't collapse the
    server back into title-only clustering, and a heavily-reworded headline
    can still merge via Jaccard or embeddings.

    The ``similarity_threshold`` argument is kept for backward compatibility
    with the test suite. When provided, it overrides the title threshold.

    NOTE(review): the matching loop below intentionally re-implements the
    logic of _is_match() (adding the title override and the consensus rule)
    instead of calling it — keep the two in sync when tuning thresholds.
    """
    title_threshold = similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD
    by_topic: Dict[str, List[Dict[str, Any]]] = {}
    # Per-call memo keyed by the exact text we embed; cleared implicitly when
    # this function returns.
    embedding_cache: Dict[str, list[float] | None] = {}

    def _embedding_for_text(text: str) -> list[float] | None:
        # Returns None when embeddings are disabled, the text is empty, or
        # the Ollama call yields nothing (presumably ollama_embed returns
        # None on failure — TODO confirm against embedding_support).
        if not NEWS_EMBEDDINGS_ENABLED or not text:
            return None
        if text in embedding_cache:
            return embedding_cache[text]
        emb = ollama_embed(text)
        # Cache None too so a single failure doesn't trigger repeated retries
        # within one ingestion cycle. The next refresh call clears this map.
        embedding_cache[text] = emb
        return emb

    for a in articles:
        title = a.get("title") or ""
        if not title:
            # Untitled articles cannot be keyed or compared — skip them.
            continue
        topic = normalize_topic_from_title(title)
        article_text = _cluster_text(a)
        article_embedding = _embedding_for_text(article_text)
        # Attach embedding on the article dict so _signals() can read it
        # without re-computing. We copy first so the caller's dict is never
        # mutated with the private "_embedding" key.
        a_with_emb = dict(a)
        if article_embedding is not None:
            a_with_emb["_embedding"] = article_embedding
        by_topic.setdefault(topic, [])
        clusters = by_topic[topic]
        # Scan every existing cluster in this topic and remember the single
        # best-scoring match (greedy best-match, not first-match).
        best_idx: int | None = None
        best_signal_name = "none"
        best_signal_value = 0.0
        for idx, c in enumerate(clusters):
            sigs = _signals(a_with_emb, c)
            # Use the title threshold the caller explicitly passed (test override)
            # but otherwise rely on the module defaults.
            local_match = False
            if NEWS_EMBEDDINGS_ENABLED and sigs["cosine"] >= NEWS_EMBEDDING_SIMILARITY_THRESHOLD:
                local_match = True
                signal_name, signal_value = "cosine", sigs["cosine"]
            elif sigs["title"] >= title_threshold:
                local_match = True
                signal_name, signal_value = "title", sigs["title"]
            elif sigs["jaccard"] >= DEFAULT_JACCARD_THRESHOLD:
                local_match = True
                signal_name, signal_value = "jaccard", sigs["jaccard"]
            # Consensus rule: when no single signal clears its strict threshold
            # but two of them are simultaneously "strong-ish", treat that as a
            # match. This catches reworded headlines whose embedding is just
            # below the strict cosine cutoff. Numbers are intentionally
            # conservative — both signals must be clearly above noise.
            elif (
                NEWS_EMBEDDINGS_ENABLED
                and sigs["cosine"] >= 0.80
                and (sigs["jaccard"] >= 0.30 or sigs["title"] >= 0.55)
            ):
                local_match = True
                signal_name = "consensus"
                signal_value = (sigs["cosine"] + max(sigs["jaccard"], sigs["title"])) / 2.0
            else:
                signal_name, signal_value = "none", max(sigs["title"], sigs["jaccard"], sigs["cosine"])
            if local_match and signal_value > best_signal_value:
                best_idx = idx
                best_signal_name = signal_name
                best_signal_value = signal_value
        if best_idx is not None:
            # Merge into the best-matching cluster.
            c = clusters[best_idx]
            # Dedup by article key so re-ingesting the same URL is a no-op.
            existing_keys = {_article_key(x) for x in c.get("articles", []) or []}
            if _article_key(a) not in existing_keys:
                c["articles"].append(a)
            if a.get("source") and a["source"] not in c["sources"]:
                c["sources"].append(a["source"])
            # NOTE(review): lexicographic max over stringified timestamps — a
            # None timestamp becomes "None", which sorts above ISO-8601 dates
            # starting with "2". Confirm upstream always supplies ISO strings.
            c["last_updated"] = max(str(c.get("last_updated", "")), str(a.get("timestamp", "")))
            # Keep a tiny audit trail per cluster on which signal grew it last.
            # Not surfaced through tools — lives in the payload only for debug.
            c.setdefault("_merge_signals", []).append(
                {"signal": best_signal_name, "value": round(best_signal_value, 3)}
            )
        else:
            # No cluster matched — open a new one seeded with this article.
            # Stable cluster id: based on topic + normalized canonical title.
            key = f"{topic}|{_normalize_title(title)}"
            cid = hashlib.sha1(key.encode("utf-8")).hexdigest()
            # The first article's embedding becomes the cluster's reference
            # vector; it is never updated as later articles merge in.
            cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
            clusters.append(
                {
                    "cluster_id": cid,
                    "headline": title,
                    "summary": a.get("summary", ""),
                    "topic": topic,
                    "entities": [],
                    "sentiment": "neutral",
                    "importance": 0.0,
                    "sources": [a["source"]] if a.get("source") else [],
                    "timestamp": a.get("timestamp"),
                    "articles": [a],
                    "first_seen": a.get("timestamp"),
                    "last_updated": a.get("timestamp"),
                    "embedding": cluster_embedding,
                    "embedding_model": "ollama:nomic-embed-text" if cluster_embedding else None,
                }
            )
    # Strip the internal merge audit trail before returning so it does not
    # accidentally bloat the SQLite payload. Storage layer doesn't filter it.
    for clusters in by_topic.values():
        for c in clusters:
            c.pop("_merge_signals", None)
    return {topic: clusters for topic, clusters in by_topic.items()}
|