# cluster.py - article de-duplication and clustering helpers.
  1. from __future__ import annotations
  2. import asyncio
  3. import hashlib
  4. import re
  5. from datetime import datetime, timezone, timedelta
  6. from difflib import SequenceMatcher
  7. from typing import Any, Dict, List
  8. from urllib.parse import urlparse
  9. from news_mcp.config import (
  10. NEWS_EMBEDDINGS_ENABLED,
  11. NEWS_EMBEDDING_SIMILARITY_THRESHOLD,
  12. NEWS_CLUSTER_MAX_AGE_HOURS,
  13. )
  14. from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
  15. from news_mcp.sources.news_feeds import normalize_topic_from_title
  16. # ---------------------------------------------------------------------------
  17. # Text helpers
  18. # ---------------------------------------------------------------------------
  19. def _normalize_title(title: str) -> str:
  20. t = title.lower().strip()
  21. t = re.sub(r"[^a-z0-9\s]", " ", t)
  22. t = re.sub(r"\s+", " ", t).strip()
  23. return t
  24. def _title_similarity(a: str, b: str) -> float:
  25. return SequenceMatcher(None, _normalize_title(a), _normalize_title(b)).ratio()
  26. def _article_key(article: Dict[str, Any]) -> str:
  27. url = str(article.get("url") or "").strip()
  28. if not url:
  29. return str(article.get("title") or "")
  30. try:
  31. parsed = urlparse(url)
  32. parts = [p for p in parsed.path.split("/") if p]
  33. if parts:
  34. return parts[-1]
  35. except Exception:
  36. pass
  37. return url
  38. def _cluster_text(a: Dict[str, Any]) -> str:
  39. parts = [a.get("title", ""), a.get("summary", "") or ""]
  40. return "\n".join(p for p in parts if p).strip()
  41. # ---------------------------------------------------------------------------
  42. # Token / Jaccard signal
  43. # ---------------------------------------------------------------------------
  44. _STOPWORDS = frozenset(
  45. {
  46. "a", "an", "the", "of", "to", "in", "on", "at", "for", "by", "with",
  47. "and", "or", "but", "if", "is", "are", "was", "were", "be", "been",
  48. "being", "as", "from", "that", "this", "these", "those", "it", "its",
  49. "into", "over", "under", "than", "then", "so", "such", "no", "not",
  50. "do", "does", "did", "will", "would", "can", "could", "should", "may",
  51. "might", "has", "have", "had", "after", "before", "amid", "vs", "via",
  52. "us", "uk",
  53. }
  54. )
  55. def _tokens(text: str) -> set[str]:
  56. tokens = re.findall(r"[a-z0-9][a-z0-9\-]+", text.lower())
  57. return {t for t in tokens if len(t) >= 3 and t not in _STOPWORDS}
  58. def _jaccard(a: set, b: set) -> float:
  59. if not a or not b:
  60. return 0.0
  61. inter = len(a & b)
  62. if inter == 0:
  63. return 0.0
  64. return inter / len(a | b)
  65. # ---------------------------------------------------------------------------
  66. # Composite similarity
  67. # ---------------------------------------------------------------------------
  68. DEFAULT_TITLE_THRESHOLD = 0.87
  69. DEFAULT_JACCARD_THRESHOLD = 0.55
  70. def _signals(article: Dict[str, Any], cluster: Dict[str, Any]) -> dict:
  71. """Per-pair similarity signals (title, jaccard, embedding cosine).
  72. Compares the article against ALL articles in the cluster and returns the
  73. best (max) signal across all comparisons. The cosine signal uses the
  74. cluster-level embedding; title and jaccard are computed per-article and
  75. the maximum is returned so that a match against any cluster member counts.
  76. """
  77. a_title = str(article.get("title") or "")
  78. c_title = str(cluster.get("headline") or "")
  79. a_emb = article.get("_embedding")
  80. c_emb = cluster.get("embedding")
  81. cosine = cosine_similarity(a_emb, c_emb) if a_emb and c_emb else 0.0
  82. best_title = 0.0
  83. best_jaccard = 0.0
  84. a_text = _cluster_text(article)
  85. a_toks = _tokens(a_text) if a_text else set()
  86. # Compare against every article in the cluster, take the best scores.
  87. cluster_articles = cluster.get("articles") or ([{"title": c_title}] if c_title else [])
  88. for ca in cluster_articles:
  89. if not isinstance(ca, dict):
  90. continue
  91. # title signal
  92. ca_title = str(ca.get("title") or "")
  93. if a_title and ca_title:
  94. t = _title_similarity(a_title, ca_title)
  95. if t > best_title:
  96. best_title = t
  97. # jaccard signal
  98. ca_text = _cluster_text(ca)
  99. if a_text and ca_text:
  100. j = _jaccard(a_toks, _tokens(ca_text))
  101. if j > best_jaccard:
  102. best_jaccard = j
  103. # early exit: if both title and jaccard are already very high
  104. if best_title >= 0.95 and best_jaccard >= 0.80:
  105. break
  106. return {"title": best_title, "jaccard": best_jaccard, "cosine": cosine}
  107. def _is_match(
  108. signals: dict,
  109. *,
  110. embeddings_enabled: bool,
  111. title_threshold: float = DEFAULT_TITLE_THRESHOLD,
  112. jaccard_threshold: float = DEFAULT_JACCARD_THRESHOLD,
  113. ) -> tuple[bool, str, float]:
  114. """Decide whether two items should merge based on the strongest signal.
  115. Cascade: cosine (if embeddings enabled) → title → jaccard → consensus.
  116. Returns (matched, signal_name, signal_value).
  117. """
  118. cosine_threshold = NEWS_EMBEDDING_SIMILARITY_THRESHOLD
  119. if embeddings_enabled and signals["cosine"] >= cosine_threshold:
  120. return True, "cosine", signals["cosine"]
  121. if signals["title"] >= title_threshold:
  122. return True, "title", signals["title"]
  123. if signals["jaccard"] >= jaccard_threshold:
  124. return True, "jaccard", signals["jaccard"]
  125. if (
  126. embeddings_enabled
  127. and signals["cosine"] >= 0.80
  128. and (signals["jaccard"] >= 0.30 or signals["title"] >= 0.55)
  129. ):
  130. val = (signals["cosine"] + max(signals["jaccard"], signals["title"])) / 2.0
  131. return True, "consensus", val
  132. return False, "none", 0.0
  133. # ---------------------------------------------------------------------------
  134. # Stable cluster ID
  135. # ---------------------------------------------------------------------------
  136. def _stable_cluster_id(topic: str, articles: List[Dict[str, Any]]) -> str:
  137. """Deterministic cluster ID derived from the topic and the sorted set of
  138. article keys. Using the minimum key (lexicographic) as the seed ensures
  139. that no matter which article arrives first, the same set of articles always
  140. maps to the same cluster_id."""
  141. keys = sorted(_article_key(a) for a in articles if _article_key(a))
  142. if not keys:
  143. # Degenerate fallback — single article with empty url and title
  144. return hashlib.sha1(topic.encode("utf-8")).hexdigest()
  145. seed = keys[0]
  146. return hashlib.sha1(f"{topic}|{seed}".encode("utf-8")).hexdigest()
  147. # ---------------------------------------------------------------------------
  148. # Temporal gating
  149. # ---------------------------------------------------------------------------
  150. def _parse_ts(ts_str: str) -> datetime | None:
  151. if not ts_str:
  152. return None
  153. try:
  154. s = str(ts_str).replace("Z", "+00:00")
  155. dt = datetime.fromisoformat(s)
  156. if dt.tzinfo is None:
  157. dt = dt.replace(tzinfo=timezone.utc)
  158. return dt.astimezone(timezone.utc)
  159. except Exception:
  160. pass
  161. try:
  162. from email.utils import parsedate_to_datetime
  163. dt = parsedate_to_datetime(str(ts_str))
  164. if dt.tzinfo is None:
  165. dt = dt.replace(tzinfo=timezone.utc)
  166. return dt.astimezone(timezone.utc)
  167. except Exception:
  168. return None
  169. def _cluster_is_within_age_window(cluster: Dict[str, Any], *, max_age_hours: float) -> bool:
  170. """Return True if the cluster's last_updated is within the merge window."""
  171. if max_age_hours <= 0:
  172. return True # 0 = no limit
  173. ts_str = cluster.get("last_updated") or cluster.get("timestamp") or ""
  174. dt = _parse_ts(ts_str)
  175. if dt is None:
  176. return True # be lenient with unparseable timestamps
  177. cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
  178. return dt >= cutoff
  179. # ---------------------------------------------------------------------------
  180. # Embedding pre-computation (async internally)
  181. # ---------------------------------------------------------------------------
  182. async def _compute_embeddings_concurrently(
  183. articles: List[Dict[str, Any]],
  184. ) -> Dict[str, list[float] | None]:
  185. """Compute embeddings for unique article texts concurrently.
  186. Returns a cache dict: text -> embedding vector or None.
  187. """
  188. unique_texts: list[str] = []
  189. seen: set[str] = set()
  190. for a in articles:
  191. text = _cluster_text(a)
  192. if text and text not in seen:
  193. seen.add(text)
  194. unique_texts.append(text)
  195. emb_tasks = [ollama_embed(text) for text in unique_texts]
  196. emb_results = await asyncio.gather(*emb_tasks, return_exceptions=True)
  197. cache: Dict[str, list[float] | None] = {}
  198. for text, result in zip(unique_texts, emb_results):
  199. if isinstance(result, list):
  200. cache[text] = result
  201. else:
  202. cache[text] = None
  203. return cache
  204. def _compute_embeddings_sync(
  205. articles: List[Dict[str, Any]],
  206. ) -> Dict[str, list[float] | None]:
  207. """Synchronous wrapper that runs the async embedding computation.
  208. Handles three cases:
  209. 1. Already inside an async event loop (called from poller) -> schedule
  210. as a task and run it to completion on the running loop.
  211. 2. No event loop at all (plain sync caller) -> use asyncio.run().
  212. """
  213. try:
  214. loop = asyncio.get_running_loop()
  215. except RuntimeError:
  216. # No running loop — safe to use asyncio.run()
  217. return asyncio.run(_compute_embeddings_concurrently(articles))
  218. # We're inside a running event loop (e.g. the poller). Create a new loop
  219. # in a thread to avoid blocking.
  220. import concurrent.futures
  221. with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
  222. future = pool.submit(
  223. asyncio.run, _compute_embeddings_concurrently(articles)
  224. )
  225. return future.result()
  226. # ---------------------------------------------------------------------------
  227. # Orphan merge: detect clusters sharing articles and merge them
  228. # ---------------------------------------------------------------------------
  229. def _merge_orphan_clusters(
  230. clusters: List[Dict[str, Any]],
  231. ) -> List[Dict[str, Any]]:
  232. """Post-clustering pass: merge clusters that share article keys.
  233. This handles the case where two articles about the same event didn't match
  234. during the main loop (e.g. embeddings were temporarily unavailable) and
  235. ended up in separate clusters. If two clusters share >= 1 article key, we
  236. merge them into one (keeping the earlier first_seen, recompute the stable
  237. ID from the union of articles).
  238. """
  239. if len(clusters) <= 1:
  240. return clusters
  241. # Build index: article_key -> list of cluster indices
  242. key_to_indices: dict[str, list[int]] = {}
  243. for idx, c in enumerate(clusters):
  244. for a in c.get("articles", []) or []:
  245. ak = _article_key(a)
  246. if ak:
  247. key_to_indices.setdefault(ak, []).append(idx)
  248. # Find connected components via Union-Find
  249. parent = list(range(len(clusters)))
  250. def find(x: int) -> int:
  251. while parent[x] != x:
  252. parent[x] = parent[parent[x]]
  253. x = parent[x]
  254. return x
  255. def union(a: int, b: int) -> None:
  256. ra, rb = find(a), find(b)
  257. if ra != rb:
  258. parent[ra] = rb
  259. for indices in key_to_indices.values():
  260. for i in range(1, len(indices)):
  261. union(indices[0], indices[i])
  262. # Group clusters by component
  263. components: dict[int, list[int]] = {}
  264. for idx in range(len(clusters)):
  265. root = find(idx)
  266. components.setdefault(root, []).append(idx)
  267. merged: List[Dict[str, Any]] = []
  268. for root, members in components.items():
  269. if len(members) == 1:
  270. merged.append(clusters[members[0]])
  271. continue
  272. # Merge all clusters in this component
  273. base = dict(clusters[members[0]])
  274. all_articles: list[dict] = list(base.get("articles", []) or [])
  275. all_sources: list[str] = list(base.get("sources", []) or [])
  276. first_seen = base.get("first_seen", "")
  277. last_updated = base.get("last_updated", "")
  278. for m_idx in members[1:]:
  279. other = clusters[m_idx]
  280. existing_keys = {_article_key(a) for a in all_articles}
  281. for a in other.get("articles", []) or []:
  282. ak = _article_key(a)
  283. if ak not in existing_keys:
  284. all_articles.append(a)
  285. existing_keys.add(ak)
  286. for s in other.get("sources", []) or []:
  287. if s not in all_sources:
  288. all_sources.append(s)
  289. fs = other.get("first_seen", "")
  290. if fs and (not first_seen or fs < first_seen):
  291. first_seen = fs
  292. lu = other.get("last_updated", "")
  293. if lu and (not last_updated or lu > last_updated):
  294. last_updated = lu
  295. base["articles"] = all_articles
  296. base["sources"] = all_sources
  297. base["first_seen"] = first_seen
  298. base["last_updated"] = last_updated
  299. base["cluster_id"] = _stable_cluster_id(base.get("topic", "other"), all_articles)
  300. merged.append(base)
  301. return merged
  302. # ---------------------------------------------------------------------------
  303. # Public API (sync — backward compatible with tests)
  304. # ---------------------------------------------------------------------------
  305. def dedup_and_cluster_articles(
  306. articles: List[Dict[str, Any]],
  307. similarity_threshold: float | None = None,
  308. *,
  309. existing_clusters: List[Dict[str, Any]] | None = None,
  310. max_age_hours: float = 0,
  311. ) -> Dict[str, List[Dict[str, Any]]]:
  312. """Deduplicate raw articles into clusters keyed by topic.
  313. v1.3: stable cluster IDs, temporal gating, and orphan merge.
  314. Args:
  315. articles: new articles to cluster.
  316. similarity_threshold: override for the title-similarity threshold.
  317. existing_clusters: optional list of recent clusters from the DB to
  318. merge against (cross-cycle merge). When provided, temporal
  319. gating via max_age_hours is applied to filter these.
  320. max_age_hours: only compare against existing_clusters updated within
  321. this many hours. 0 = no limit (compare against all provided).
  322. """
  323. title_threshold = similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD
  324. # Pre-compute embeddings concurrently (sync boundary handles async internally)
  325. embedding_cache: Dict[str, list[float] | None] = {}
  326. if NEWS_EMBEDDINGS_ENABLED:
  327. embedding_cache = _compute_embeddings_sync(articles)
  328. by_topic: Dict[str, List[Dict[str, Any]]] = {}
  329. # Seed with existing clusters (filtered by age window)
  330. if existing_clusters:
  331. for c in existing_clusters:
  332. if not _cluster_is_within_age_window(c, max_age_hours=max_age_hours):
  333. continue
  334. topic = c.get("topic", "other") or "other"
  335. by_topic.setdefault(topic, []).append(dict(c))
  336. for a in articles:
  337. title = a.get("title") or ""
  338. if not title:
  339. continue
  340. topic = normalize_topic_from_title(title)
  341. article_text = _cluster_text(a)
  342. article_embedding = embedding_cache.get(article_text) if NEWS_EMBEDDINGS_ENABLED else None
  343. a_with_emb = dict(a)
  344. if article_embedding is not None:
  345. a_with_emb["_embedding"] = article_embedding
  346. by_topic.setdefault(topic, [])
  347. clusters = by_topic[topic]
  348. best_idx: int | None = None
  349. best_signal_name = "none"
  350. best_signal_value = 0.0
  351. for idx, c in enumerate(clusters):
  352. sigs = _signals(a_with_emb, c)
  353. matched, signal_name, signal_value = _is_match(
  354. sigs,
  355. embeddings_enabled=NEWS_EMBEDDINGS_ENABLED,
  356. title_threshold=title_threshold,
  357. )
  358. if matched and signal_value > best_signal_value:
  359. best_idx = idx
  360. best_signal_name = signal_name
  361. best_signal_value = signal_value
  362. if best_idx is not None:
  363. c = clusters[best_idx]
  364. existing_keys = {_article_key(x) for x in c.get("articles", []) or []}
  365. if _article_key(a) not in existing_keys:
  366. c["articles"].append(a)
  367. if a.get("source") and a["source"] not in c["sources"]:
  368. c["sources"].append(a["source"])
  369. c["last_updated"] = max(str(c.get("last_updated", "")), str(a.get("timestamp", "")))
  370. # Update cluster embedding to the new article's embedding so later
  371. # comparisons can match against the most recently added content.
  372. if NEWS_EMBEDDINGS_ENABLED and article_embedding is not None:
  373. c["embedding"] = article_embedding
  374. c["embedding_model"] = "ollama:nomic-embed-text"
  375. c.setdefault("_merge_signals", []).append(
  376. {"signal": best_signal_name, "value": round(best_signal_value, 3)}
  377. )
  378. else:
  379. cid = _stable_cluster_id(topic, [a])
  380. cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
  381. clusters.append(
  382. {
  383. "cluster_id": cid,
  384. "headline": title,
  385. "summary": a.get("summary", ""),
  386. "topic": topic,
  387. "entities": [],
  388. "sentiment": "neutral",
  389. "importance": 0.0,
  390. "sources": [a["source"]] if a.get("source") else [],
  391. "timestamp": a.get("timestamp"),
  392. "articles": [a],
  393. "first_seen": a.get("timestamp"),
  394. "last_updated": a.get("timestamp"),
  395. "embedding": cluster_embedding,
  396. "embedding_model": "ollama:nomic-embed-text" if cluster_embedding else None,
  397. }
  398. )
  399. # Post-clustering passes per topic
  400. for topic, clusters in by_topic.items():
  401. # Merge orphans (clusters that share articles)
  402. clusters = _merge_orphan_clusters(clusters)
  403. # Recompute stable IDs from the final article sets
  404. for c in clusters:
  405. c["cluster_id"] = _stable_cluster_id(topic, c.get("articles", []) or [])
  406. by_topic[topic] = clusters
  407. # Strip the internal merge audit trail before returning
  408. for clusters in by_topic.values():
  409. for c in clusters:
  410. c.pop("_merge_signals", None)
  411. return {topic: clusters for topic, clusters in by_topic.items()}