cluster.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. from __future__ import annotations
  2. import asyncio
  3. import hashlib
  4. import re
  5. from difflib import SequenceMatcher
  6. from typing import Any, Dict, List
  7. from urllib.parse import urlparse
  8. from news_mcp.config import NEWS_EMBEDDINGS_ENABLED, NEWS_EMBEDDING_SIMILARITY_THRESHOLD
  9. from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
  10. from news_mcp.sources.news_feeds import normalize_topic_from_title
  11. # ---------------------------------------------------------------------------
  12. # Text helpers
  13. # ---------------------------------------------------------------------------
  14. def _normalize_title(title: str) -> str:
  15. t = title.lower().strip()
  16. t = re.sub(r"[^a-z0-9\s]", " ", t)
  17. t = re.sub(r"\s+", " ", t).strip()
  18. return t
  19. def _title_similarity(a: str, b: str) -> float:
  20. return SequenceMatcher(None, _normalize_title(a), _normalize_title(b)).ratio()
  21. def _article_key(article: Dict[str, Any]) -> str:
  22. url = str(article.get("url") or "").strip()
  23. if not url:
  24. return str(article.get("title") or "")
  25. try:
  26. parsed = urlparse(url)
  27. parts = [p for p in parsed.path.split("/") if p]
  28. if parts:
  29. return parts[-1]
  30. except Exception:
  31. pass
  32. return url
  33. def _cluster_text(a: Dict[str, Any]) -> str:
  34. parts = [a.get("title", ""), a.get("summary", "") or ""]
  35. return "\n".join(p for p in parts if p).strip()
  36. # ---------------------------------------------------------------------------
  37. # Token / Jaccard signal
  38. # ---------------------------------------------------------------------------
  39. _STOPWORDS = frozenset(
  40. {
  41. "a", "an", "the", "of", "to", "in", "on", "at", "for", "by", "with",
  42. "and", "or", "but", "if", "is", "are", "was", "were", "be", "been",
  43. "being", "as", "from", "that", "this", "these", "those", "it", "its",
  44. "into", "over", "under", "than", "then", "so", "such", "no", "not",
  45. "do", "does", "did", "will", "would", "can", "could", "should", "may",
  46. "might", "has", "have", "had", "after", "before", "amid", "vs", "via",
  47. "us", "uk",
  48. }
  49. )
  50. def _tokens(text: str) -> set[str]:
  51. tokens = re.findall(r"[a-z0-9][a-z0-9\-]+", text.lower())
  52. return {t for t in tokens if len(t) >= 3 and t not in _STOPWORDS}
  53. def _jaccard(a: set, b: set) -> float:
  54. if not a or not b:
  55. return 0.0
  56. inter = len(a & b)
  57. if inter == 0:
  58. return 0.0
  59. return inter / len(a | b)
  60. # ---------------------------------------------------------------------------
  61. # Composite similarity
  62. # ---------------------------------------------------------------------------
# Minimum fuzzy title ratio (see _title_similarity) for a title-based merge.
DEFAULT_TITLE_THRESHOLD = 0.87
# Minimum token-set Jaccard overlap (see _jaccard) for a Jaccard-based merge.
DEFAULT_JACCARD_THRESHOLD = 0.55
  65. def _signals(article: Dict[str, Any], cluster: Dict[str, Any]) -> dict:
  66. """Per-pair similarity signals (title, jaccard, embedding cosine)."""
  67. a_title = str(article.get("title") or "")
  68. c_title = str(cluster.get("headline") or "")
  69. title_sim = _title_similarity(a_title, c_title) if a_title and c_title else 0.0
  70. a_text = _cluster_text(article)
  71. c_text_seed = (cluster.get("articles") or [{}])[0]
  72. c_text = _cluster_text(c_text_seed) if c_text_seed else c_title
  73. jaccard = _jaccard(_tokens(a_text), _tokens(c_text)) if a_text and c_text else 0.0
  74. a_emb = article.get("_embedding")
  75. c_emb = cluster.get("embedding")
  76. cosine = cosine_similarity(a_emb, c_emb) if a_emb and c_emb else 0.0
  77. return {"title": title_sim, "jaccard": jaccard, "cosine": cosine}
  78. def _is_match(signals: dict, *, embeddings_enabled: bool) -> tuple[bool, str, float]:
  79. """Decide whether two items should merge based on the strongest signal."""
  80. cosine_threshold = NEWS_EMBEDDING_SIMILARITY_THRESHOLD
  81. if embeddings_enabled and signals["cosine"] >= cosine_threshold:
  82. return True, "cosine", signals["cosine"]
  83. if signals["title"] >= DEFAULT_TITLE_THRESHOLD:
  84. return True, "title", signals["title"]
  85. if signals["jaccard"] >= DEFAULT_JACCARD_THRESHOLD:
  86. return True, "jaccard", signals["jaccard"]
  87. return False, "none", 0.0
  88. # ---------------------------------------------------------------------------
  89. # Embedding pre-computation (async internally)
  90. # ---------------------------------------------------------------------------
  91. async def _compute_embeddings_concurrently(
  92. articles: List[Dict[str, Any]],
  93. ) -> Dict[str, list[float] | None]:
  94. """Compute embeddings for unique article texts concurrently.
  95. Returns a cache dict: text -> embedding vector or None.
  96. """
  97. unique_texts: list[str] = []
  98. seen: set[str] = set()
  99. for a in articles:
  100. text = _cluster_text(a)
  101. if text and text not in seen:
  102. seen.add(text)
  103. unique_texts.append(text)
  104. emb_tasks = [ollama_embed(text) for text in unique_texts]
  105. emb_results = await asyncio.gather(*emb_tasks, return_exceptions=True)
  106. cache: Dict[str, list[float] | None] = {}
  107. for text, result in zip(unique_texts, emb_results):
  108. if isinstance(result, list):
  109. cache[text] = result
  110. else:
  111. cache[text] = None
  112. return cache
  113. def _compute_embeddings_sync(
  114. articles: List[Dict[str, Any]],
  115. ) -> Dict[str, list[float] | None]:
  116. """Synchronous wrapper that runs the async embedding computation.
  117. Handles three cases:
  118. 1. Already inside an async event loop (called from poller) -> schedule
  119. as a task and run it to completion on the running loop.
  120. 2. No event loop at all (plain sync caller) -> use asyncio.run().
  121. """
  122. try:
  123. loop = asyncio.get_running_loop()
  124. except RuntimeError:
  125. # No running loop — safe to use asyncio.run()
  126. return asyncio.run(_compute_embeddings_concurrently(articles))
  127. # We're inside a running event loop (e.g. the poller). Create a new loop
  128. # in a thread to avoid blocking.
  129. import concurrent.futures
  130. with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
  131. future = pool.submit(
  132. asyncio.run, _compute_embeddings_concurrently(articles)
  133. )
  134. return future.result()
  135. # ---------------------------------------------------------------------------
  136. # Public API (sync — backward compatible with tests)
  137. # ---------------------------------------------------------------------------
  138. def dedup_and_cluster_articles(
  139. articles: List[Dict[str, Any]],
  140. similarity_threshold: float | None = None,
  141. ) -> Dict[str, List[Dict[str, Any]]]:
  142. """Deduplicate raw articles into clusters keyed by topic.
  143. v1.2: embedding pre-computation is async/concurrent under the hood, but
  144. this public function remains synchronous for backward compatibility.
  145. A pair merges if ANY signal clears its threshold:
  146. * title fuzzy ratio
  147. * token Jaccard over headline+summary
  148. * Ollama embedding cosine when available
  149. """
  150. title_threshold = similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD
  151. # Pre-compute embeddings concurrently (sync boundary handles async internally)
  152. embedding_cache: Dict[str, list[float] | None] = {}
  153. if NEWS_EMBEDDINGS_ENABLED:
  154. embedding_cache = _compute_embeddings_sync(articles)
  155. by_topic: Dict[str, List[Dict[str, Any]]] = {}
  156. for a in articles:
  157. title = a.get("title") or ""
  158. if not title:
  159. continue
  160. topic = normalize_topic_from_title(title)
  161. article_text = _cluster_text(a)
  162. article_embedding = embedding_cache.get(article_text) if NEWS_EMBEDDINGS_ENABLED else None
  163. a_with_emb = dict(a)
  164. if article_embedding is not None:
  165. a_with_emb["_embedding"] = article_embedding
  166. by_topic.setdefault(topic, [])
  167. clusters = by_topic[topic]
  168. best_idx: int | None = None
  169. best_signal_name = "none"
  170. best_signal_value = 0.0
  171. for idx, c in enumerate(clusters):
  172. sigs = _signals(a_with_emb, c)
  173. local_match = False
  174. if NEWS_EMBEDDINGS_ENABLED and sigs["cosine"] >= NEWS_EMBEDDING_SIMILARITY_THRESHOLD:
  175. local_match = True
  176. signal_name, signal_value = "cosine", sigs["cosine"]
  177. elif sigs["title"] >= title_threshold:
  178. local_match = True
  179. signal_name, signal_value = "title", sigs["title"]
  180. elif sigs["jaccard"] >= DEFAULT_JACCARD_THRESHOLD:
  181. local_match = True
  182. signal_name, signal_value = "jaccard", sigs["jaccard"]
  183. elif (
  184. NEWS_EMBEDDINGS_ENABLED
  185. and sigs["cosine"] >= 0.80
  186. and (sigs["jaccard"] >= 0.30 or sigs["title"] >= 0.55)
  187. ):
  188. local_match = True
  189. signal_name = "consensus"
  190. signal_value = (sigs["cosine"] + max(sigs["jaccard"], sigs["title"])) / 2.0
  191. else:
  192. signal_name, signal_value = "none", max(sigs["title"], sigs["jaccard"], sigs["cosine"])
  193. if local_match and signal_value > best_signal_value:
  194. best_idx = idx
  195. best_signal_name = signal_name
  196. best_signal_value = signal_value
  197. if best_idx is not None:
  198. c = clusters[best_idx]
  199. existing_keys = {_article_key(x) for x in c.get("articles", []) or []}
  200. if _article_key(a) not in existing_keys:
  201. c["articles"].append(a)
  202. if a.get("source") and a["source"] not in c["sources"]:
  203. c["sources"].append(a["source"])
  204. c["last_updated"] = max(str(c.get("last_updated", "")), str(a.get("timestamp", "")))
  205. c.setdefault("_merge_signals", []).append(
  206. {"signal": best_signal_name, "value": round(best_signal_value, 3)}
  207. )
  208. else:
  209. key = f"{topic}|{_normalize_title(title)}"
  210. cid = hashlib.sha1(key.encode("utf-8")).hexdigest()
  211. cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
  212. clusters.append(
  213. {
  214. "cluster_id": cid,
  215. "headline": title,
  216. "summary": a.get("summary", ""),
  217. "topic": topic,
  218. "entities": [],
  219. "sentiment": "neutral",
  220. "importance": 0.0,
  221. "sources": [a["source"]] if a.get("source") else [],
  222. "timestamp": a.get("timestamp"),
  223. "articles": [a],
  224. "first_seen": a.get("timestamp"),
  225. "last_updated": a.get("timestamp"),
  226. "embedding": cluster_embedding,
  227. "embedding_model": "ollama:nomic-embed-text" if cluster_embedding else None,
  228. }
  229. )
  230. # Strip the internal merge audit trail before returning
  231. for clusters in by_topic.values():
  232. for c in clusters:
  233. c.pop("_merge_signals", None)
  234. return {topic: clusters for topic, clusters in by_topic.items()}