from __future__ import annotations

import hashlib
import re
from difflib import SequenceMatcher
from typing import Any, Dict, List
from urllib.parse import urlparse

from news_mcp.config import NEWS_EMBEDDINGS_ENABLED, NEWS_EMBEDDING_SIMILARITY_THRESHOLD
from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
from news_mcp.sources.news_feeds import normalize_topic_from_title

# ---------------------------------------------------------------------------
# Text helpers
# ---------------------------------------------------------------------------


def _normalize_title(title: str) -> str:
    t = title.lower().strip()
    # Remove punctuation-ish characters for similarity scoring.
    t = re.sub(r"[^a-z0-9\s]", " ", t)
    t = re.sub(r"\s+", " ", t).strip()
    return t


def _title_similarity(a: str, b: str) -> float:
    return SequenceMatcher(None, _normalize_title(a), _normalize_title(b)).ratio()
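

# Illustration (hand-checked, headlines are hypothetical): punctuation and
# casing wash out before scoring, so _title_similarity("Fed Hikes Rates!",
# "fed hikes rates") compares "fed hikes rates" against itself and returns
# 1.0, while genuinely different headlines land well below the 0.87 merge
# threshold defined further down.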


def _article_key(article: Dict[str, Any]) -> str:
    url = str(article.get("url") or "").strip()
    if not url:
        return str(article.get("title") or "")
    try:
        parsed = urlparse(url)
        parts = [p for p in parsed.path.split("/") if p]
        if parts:
            return parts[-1]
    except Exception:
        pass
    return url
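

# Illustration (hypothetical URL): for
# "https://example.com/news/2024/fed-rate-hike" the key is the trailing path
# segment "fed-rate-hike", so syndicated copies of one story that keep the
# same slug across different hosts collapse onto a single dedup key.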


def _cluster_text(a: Dict[str, Any]) -> str:
    parts = [a.get("title", ""), a.get("summary", "") or ""]
    return "\n".join(p for p in parts if p).strip()


# ---------------------------------------------------------------------------
# Token / Jaccard signal (used as a fallback alongside title similarity when
# embeddings are unavailable, and as a soft signal even when they are).
# ---------------------------------------------------------------------------
# Tiny stop-word set: we keep it small on purpose because the corpus is news
# headlines, where every additional removal risks losing genuine signal.
_STOPWORDS = frozenset(
    {
        "a", "an", "the", "of", "to", "in", "on", "at", "for", "by", "with",
        "and", "or", "but", "if", "is", "are", "was", "were", "be", "been",
        "being", "as", "from", "that", "this", "these", "those", "it", "its",
        "into", "over", "under", "than", "then", "so", "such", "no", "not",
        "do", "does", "did", "will", "would", "can", "could", "should", "may",
        "might", "has", "have", "had", "after", "before", "amid", "vs", "via",
        "us", "uk",
    }
)


def _tokens(text: str) -> set[str]:
    """Lowercase content tokens, stop-words removed, length >= 3."""
    tokens = re.findall(r"[a-z0-9][a-z0-9\-]+", text.lower())
    return {t for t in tokens if len(t) >= 3 and t not in _STOPWORDS}


def _jaccard(a: set, b: set) -> float:
    if not a or not b:
        return 0.0
    inter = len(a & b)
    if inter == 0:
        return 0.0
    return inter / len(a | b)
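

# Illustration (hand-computed, headlines are hypothetical):
# _tokens("Fed hikes rates after inflation report") gives
# {"fed", "hikes", "rates", "inflation", "report"} ("after" is a stop-word).
# Against _tokens("Federal Reserve raises rates amid inflation") the overlap
# is {"rates", "inflation"}, so Jaccard = 2 / 8 = 0.25, well under the 0.55
# default defined below; this is exactly why Jaccard is only one signal
# among several.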


# ---------------------------------------------------------------------------
# Composite similarity
# ---------------------------------------------------------------------------
# Each signal has its own threshold. We accept a merge if ANY signal clears
# its threshold, which makes clustering robust when one signal happens to be
# weak (short headlines kill SequenceMatcher; single-word stories kill
# Jaccard; Ollama outages kill cosine similarity).
DEFAULT_TITLE_THRESHOLD = 0.87
DEFAULT_JACCARD_THRESHOLD = 0.55


def _signals(article: Dict[str, Any], cluster: Dict[str, Any]) -> dict:
    """Per-pair similarity signals (title, jaccard, embedding cosine).

    Embedding cosine is only computed when both sides already have a vector;
    we never block on a fresh Ollama request here. That is the caller's job,
    which keeps this function pure and easy to test.
    """
    a_title = str(article.get("title") or "")
    c_title = str(cluster.get("headline") or "")
    title_sim = _title_similarity(a_title, c_title) if a_title and c_title else 0.0
    a_text = _cluster_text(article)
    c_text_seed = (cluster.get("articles") or [{}])[0]
    c_text = _cluster_text(c_text_seed) if c_text_seed else c_title
    jaccard = _jaccard(_tokens(a_text), _tokens(c_text)) if a_text and c_text else 0.0
    a_emb = article.get("_embedding")
    c_emb = cluster.get("embedding")
    cosine = cosine_similarity(a_emb, c_emb) if a_emb and c_emb else 0.0
    return {"title": title_sim, "jaccard": jaccard, "cosine": cosine}
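

# Example return shape (numbers are hypothetical): a heavily reworded headline
# might score {"title": 0.62, "jaccard": 0.31, "cosine": 0.84}. Note that
# cosine is reported as 0.0 whenever either side lacks an embedding vector.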


def _is_match(signals: dict, *, embeddings_enabled: bool) -> tuple[bool, str, float]:
    """Decide whether two items should merge based on the strongest signal.

    Returns (matched, signal_name, signal_value). The signal_name lets callers
    log *why* something merged, which is invaluable when debugging clustering
    quality.
    """
    cosine_threshold = NEWS_EMBEDDING_SIMILARITY_THRESHOLD
    if embeddings_enabled and signals["cosine"] >= cosine_threshold:
        return True, "cosine", signals["cosine"]
    if signals["title"] >= DEFAULT_TITLE_THRESHOLD:
        return True, "title", signals["title"]
    if signals["jaccard"] >= DEFAULT_JACCARD_THRESHOLD:
        return True, "jaccard", signals["jaccard"]
    return False, "none", 0.0
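

# Usage sketch (hypothetical values): with embeddings disabled,
#   _is_match({"title": 0.91, "jaccard": 0.20, "cosine": 0.0},
#             embeddings_enabled=False)
# returns (True, "title", 0.91); the cosine branch is skipped entirely and
# the title ratio alone clears DEFAULT_TITLE_THRESHOLD.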


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def dedup_and_cluster_articles(
    articles: List[Dict[str, Any]],
    similarity_threshold: float | None = None,
) -> Dict[str, List[Dict[str, Any]]]:
    """Deduplicate raw articles into clusters keyed by topic.

    v1.1 strategy: composite similarity.

    * title fuzzy ratio
    * token Jaccard over headline+summary (cheap, and surprisingly resilient
      when titles are reworded heavily across outlets)
    * Ollama embedding cosine when available

    A pair merges if ANY signal clears its threshold. Falling back through
    multiple signals means a transient Ollama outage doesn't collapse the
    server back into title-only clustering, and a heavily reworded headline
    can still merge via Jaccard or embeddings.

    The ``similarity_threshold`` argument is kept for backward compatibility
    with the test suite. When provided, it overrides the title threshold.
    """
    title_threshold = (
        similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD
    )
    by_topic: Dict[str, List[Dict[str, Any]]] = {}
    embedding_cache: Dict[str, list[float] | None] = {}

    def _embedding_for_text(text: str) -> list[float] | None:
        if not NEWS_EMBEDDINGS_ENABLED or not text:
            return None
        if text in embedding_cache:
            return embedding_cache[text]
        emb = ollama_embed(text)
        # Cache None too so a single failure doesn't trigger repeated retries
        # within one ingestion cycle. The next refresh call clears this map.
        embedding_cache[text] = emb
        return emb

    for a in articles:
        title = a.get("title") or ""
        if not title:
            continue
        topic = normalize_topic_from_title(title)
        article_text = _cluster_text(a)
        article_embedding = _embedding_for_text(article_text)
        # Attach the embedding to a copy of the article dict so _signals()
        # can read it without re-computing.
        a_with_emb = dict(a)
        if article_embedding is not None:
            a_with_emb["_embedding"] = article_embedding
        by_topic.setdefault(topic, [])
        clusters = by_topic[topic]
        best_idx: int | None = None
        best_signal_name = "none"
        best_signal_value = 0.0
        for idx, c in enumerate(clusters):
            sigs = _signals(a_with_emb, c)
            # Use the title threshold the caller explicitly passed (test
            # override), but otherwise rely on the module defaults.
            local_match = False
            if NEWS_EMBEDDINGS_ENABLED and sigs["cosine"] >= NEWS_EMBEDDING_SIMILARITY_THRESHOLD:
                local_match = True
                signal_name, signal_value = "cosine", sigs["cosine"]
            elif sigs["title"] >= title_threshold:
                local_match = True
                signal_name, signal_value = "title", sigs["title"]
            elif sigs["jaccard"] >= DEFAULT_JACCARD_THRESHOLD:
                local_match = True
                signal_name, signal_value = "jaccard", sigs["jaccard"]
            # Consensus rule: when no single signal clears its strict
            # threshold but two of them are simultaneously "strong-ish",
            # treat that as a match. This catches reworded headlines whose
            # embedding is just below the strict cosine cutoff. Numbers are
            # intentionally conservative: both signals must be clearly above
            # noise.
            elif (
                NEWS_EMBEDDINGS_ENABLED
                and sigs["cosine"] >= 0.80
                and (sigs["jaccard"] >= 0.30 or sigs["title"] >= 0.55)
            ):
                local_match = True
                signal_name = "consensus"
                signal_value = (sigs["cosine"] + max(sigs["jaccard"], sigs["title"])) / 2.0
            else:
                signal_name, signal_value = "none", max(sigs["title"], sigs["jaccard"], sigs["cosine"])
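            # Worked example of the consensus arm (numbers are hypothetical,
            # and we assume the strict cosine cutoff sits above 0.82, e.g.
            # 0.90): cosine 0.82, title 0.40, jaccard 0.35 clears no single
            # threshold, but 0.82 >= 0.80 and 0.35 >= 0.30, so the pair
            # merges with signal_value = (0.82 + max(0.35, 0.40)) / 2 = 0.61.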
            if local_match and signal_value > best_signal_value:
                best_idx = idx
                best_signal_name = signal_name
                best_signal_value = signal_value
        if best_idx is not None:
            c = clusters[best_idx]
            existing_keys = {_article_key(x) for x in c.get("articles", []) or []}
            if _article_key(a) not in existing_keys:
                c["articles"].append(a)
            if a.get("source") and a["source"] not in c["sources"]:
                c["sources"].append(a["source"])
            c["last_updated"] = max(str(c.get("last_updated", "")), str(a.get("timestamp", "")))
            # Keep a tiny per-cluster audit trail of which signal grew it
            # last. Not surfaced through tools; it lives in the payload only
            # for debugging.
            c.setdefault("_merge_signals", []).append(
                {"signal": best_signal_name, "value": round(best_signal_value, 3)}
            )
        else:
            # Stable cluster id: based on topic + normalized canonical title.
            key = f"{topic}|{_normalize_title(title)}"
            cid = hashlib.sha1(key.encode("utf-8")).hexdigest()
            cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
            clusters.append(
                {
                    "cluster_id": cid,
                    "headline": title,
                    "summary": a.get("summary", ""),
                    "topic": topic,
                    "entities": [],
                    "sentiment": "neutral",
                    "importance": 0.0,
                    "sources": [a["source"]] if a.get("source") else [],
                    "timestamp": a.get("timestamp"),
                    "articles": [a],
                    "first_seen": a.get("timestamp"),
                    "last_updated": a.get("timestamp"),
                    "embedding": cluster_embedding,
                    "embedding_model": "ollama:nomic-embed-text" if cluster_embedding else None,
                }
            )

    # Strip the internal merge audit trail before returning so it does not
    # accidentally bloat the SQLite payload; the storage layer doesn't filter
    # it out.
    for topic_clusters in by_topic.values():
        for c in topic_clusters:
            c.pop("_merge_signals", None)
    return {topic: topic_clusters for topic, topic_clusters in by_topic.items()}
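

# Usage sketch (call site and field values are hypothetical; the dict shape
# matches what the loop above reads):
#
#   clusters_by_topic = dedup_and_cluster_articles([
#       {"title": "Fed hikes rates by 25bps", "summary": "...",
#        "url": "https://a.example/fed-rate-hike", "source": "Outlet A",
#        "timestamp": "2024-05-01T12:00:00Z"},
#       {"title": "Fed raises rates a quarter point", "summary": "...",
#        "url": "https://b.example/fed-raises-rates", "source": "Outlet B",
#        "timestamp": "2024-05-01T12:30:00Z"},
#   ])
#   for topic, topic_clusters in clusters_by_topic.items():
#       for cluster in topic_clusters:
#           print(topic, cluster["headline"], cluster["sources"])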