cluster.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641
  1. from __future__ import annotations
  2. import asyncio
  3. import hashlib
  4. import re
  5. from datetime import datetime, timezone, timedelta
  6. from difflib import SequenceMatcher
  7. from typing import Any, Dict, List
  8. from news_mcp.config import (
  9. NEWS_EMBEDDINGS_ENABLED,
  10. NEWS_EMBEDDING_SIMILARITY_THRESHOLD,
  11. NEWS_CLUSTER_MAX_AGE_HOURS,
  12. )
  13. from news_mcp.article_identity import article_key, article_content_hash
  14. from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
  15. from news_mcp.sources.news_feeds import normalize_topic_from_title
  16. # ---------------------------------------------------------------------------
  17. # Text helpers
  18. # ---------------------------------------------------------------------------
  19. def _normalize_title(title: str) -> str:
  20. t = title.lower().strip()
  21. t = re.sub(r"[^a-z0-9\s]", " ", t)
  22. t = re.sub(r"\s+", " ", t).strip()
  23. return t
  24. def _title_similarity(a: str, b: str) -> float:
  25. return SequenceMatcher(None, _normalize_title(a), _normalize_title(b)).ratio()
# Module-private alias used throughout this file; the canonical name is
# article_key(article), imported from news_mcp.article_identity.
_article_key = article_key
  28. def _cluster_text(a: Dict[str, Any]) -> str:
  29. parts = [a.get("title", ""), a.get("summary", "") or ""]
  30. return "\n".join(p for p in parts if p).strip()
  31. # ---------------------------------------------------------------------------
  32. # Token / Jaccard signal
  33. # ---------------------------------------------------------------------------
  34. _STOPWORDS = frozenset(
  35. {
  36. "a", "an", "the", "of", "to", "in", "on", "at", "for", "by", "with",
  37. "and", "or", "but", "if", "is", "are", "was", "were", "be", "been",
  38. "being", "as", "from", "that", "this", "these", "those", "it", "its",
  39. "into", "over", "under", "than", "then", "so", "such", "no", "not",
  40. "do", "does", "did", "will", "would", "can", "could", "should", "may",
  41. "might", "has", "have", "had", "after", "before", "amid", "vs", "via",
  42. "us", "uk",
  43. }
  44. )
  45. def _tokens(text: str) -> set[str]:
  46. tokens = re.findall(r"[a-z0-9][a-z0-9\-]+", text.lower())
  47. return {t for t in tokens if len(t) >= 3 and t not in _STOPWORDS}
  48. def _jaccard(a: set, b: set) -> float:
  49. if not a or not b:
  50. return 0.0
  51. inter = len(a & b)
  52. if inter == 0:
  53. return 0.0
  54. return inter / len(a | b)
  55. # ---------------------------------------------------------------------------
  56. # Composite similarity
  57. # ---------------------------------------------------------------------------
# Minimum title-similarity ratio for a title-only match; default for
# _is_match(title_threshold=...) and the fallback used by
# dedup_and_cluster_articles when no similarity_threshold is passed.
DEFAULT_TITLE_THRESHOLD = 0.75
# Minimum Jaccard token overlap for a jaccard-only match; default for
# _is_match(jaccard_threshold=...).
DEFAULT_JACCARD_THRESHOLD = 0.55
  60. def _signals(article: Dict[str, Any], cluster: Dict[str, Any]) -> dict:
  61. """Per-pair similarity signals (title, jaccard, embedding cosine).
  62. Compares the article against ALL articles in the cluster and returns the
  63. best (max) signal across all comparisons. The cosine signal uses the
  64. cluster-level embedding; title and jaccard are computed per-article and
  65. the maximum is returned so that a match against any cluster member counts.
  66. """
  67. a_title = str(article.get("title") or "")
  68. c_title = str(cluster.get("headline") or "")
  69. a_emb = article.get("_embedding")
  70. c_emb = cluster.get("embedding")
  71. cosine = cosine_similarity(a_emb, c_emb) if a_emb and c_emb else 0.0
  72. best_title = 0.0
  73. best_jaccard = 0.0
  74. a_text = _cluster_text(article)
  75. a_toks = _tokens(a_text) if a_text else set()
  76. # Compare against every article in the cluster, take the best scores.
  77. cluster_articles = cluster.get("articles") or ([{"title": c_title}] if c_title else [])
  78. for ca in cluster_articles:
  79. if not isinstance(ca, dict):
  80. continue
  81. # title signal
  82. ca_title = str(ca.get("title") or "")
  83. if a_title and ca_title:
  84. t = _title_similarity(a_title, ca_title)
  85. if t > best_title:
  86. best_title = t
  87. # jaccard signal
  88. ca_text = _cluster_text(ca)
  89. if a_text and ca_text:
  90. j = _jaccard(a_toks, _tokens(ca_text))
  91. if j > best_jaccard:
  92. best_jaccard = j
  93. # early exit: if both title and jaccard are already very high
  94. if best_title >= 0.95 and best_jaccard >= 0.80:
  95. break
  96. return {"title": best_title, "jaccard": best_jaccard, "cosine": cosine}
  97. def _is_match(
  98. signals: dict,
  99. *,
  100. embeddings_enabled: bool,
  101. title_threshold: float = DEFAULT_TITLE_THRESHOLD,
  102. jaccard_threshold: float = DEFAULT_JACCARD_THRESHOLD,
  103. ) -> tuple[bool, str, float]:
  104. """Decide whether two items should merge based on the strongest signal.
  105. Cascade: cosine (if enabled) → title → jaccard → consensus → dual.
  106. Returns (matched, signal_name, signal_value).
  107. """
  108. cosine_threshold = NEWS_EMBEDDING_SIMILARITY_THRESHOLD
  109. if embeddings_enabled and signals["cosine"] >= cosine_threshold:
  110. return True, "cosine", signals["cosine"]
  111. if signals["title"] >= title_threshold:
  112. return True, "title", signals["title"]
  113. if signals["jaccard"] >= jaccard_threshold:
  114. return True, "jaccard", signals["jaccard"]
  115. if (
  116. embeddings_enabled
  117. and signals["cosine"] >= 0.80
  118. and (signals["jaccard"] >= 0.30 or signals["title"] >= 0.55)
  119. ):
  120. val = (signals["cosine"] + max(signals["jaccard"], signals["title"])) / 2.0
  121. return True, "consensus", val
  122. # Dual-signal: medium title + medium jaccard → credible match even without
  123. # embeddings. Catches cross-source variants where headlines differ
  124. # editorially (title ~0.55-0.74) but share substantial token overlap
  125. # (jaccard ~0.25-0.54).
  126. if signals["title"] >= 0.55 and signals["jaccard"] >= 0.25:
  127. val = (signals["title"] + signals["jaccard"]) / 2.0
  128. return True, "dual", val
  129. return False, "none", 0.0
  130. # ---------------------------------------------------------------------------
  131. # Stable cluster ID
  132. # ---------------------------------------------------------------------------
  133. def _stable_cluster_id(topic: str, articles: List[Dict[str, Any]]) -> str:
  134. """Deterministic cluster ID derived from the sorted set of article keys.
  135. The topic is intentionally excluded from the hash: the same article may be
  136. classified under different topics across cycles (heuristic vs LLM-enriched),
  137. but it must always map to the same cluster_id so that ON CONFLICT DO UPDATE
  138. in upsert_clusters correctly merges them instead of creating duplicates."""
  139. keys = sorted(_article_key(a) for a in articles if _article_key(a))
  140. if not keys:
  141. # Degenerate fallback — single article with empty url and title
  142. return hashlib.sha1(topic.encode("utf-8")).hexdigest()
  143. seed = keys[0]
  144. return hashlib.sha1(seed.encode("utf-8")).hexdigest()
  145. # ---------------------------------------------------------------------------
  146. # Temporal gating
  147. # ---------------------------------------------------------------------------
  148. def _parse_ts(ts_str: str) -> datetime | None:
  149. if not ts_str:
  150. return None
  151. try:
  152. s = str(ts_str).replace("Z", "+00:00")
  153. dt = datetime.fromisoformat(s)
  154. if dt.tzinfo is None:
  155. dt = dt.replace(tzinfo=timezone.utc)
  156. return dt.astimezone(timezone.utc)
  157. except Exception:
  158. pass
  159. try:
  160. from email.utils import parsedate_to_datetime
  161. dt = parsedate_to_datetime(str(ts_str))
  162. if dt.tzinfo is None:
  163. dt = dt.replace(tzinfo=timezone.utc)
  164. return dt.astimezone(timezone.utc)
  165. except Exception:
  166. return None
  167. def _cluster_is_within_age_window(cluster: Dict[str, Any], *, max_age_hours: float) -> bool:
  168. """Return True if the cluster's last_updated is within the merge window."""
  169. if max_age_hours <= 0:
  170. return True # 0 = no limit
  171. ts_str = cluster.get("last_updated") or cluster.get("timestamp") or ""
  172. dt = _parse_ts(ts_str)
  173. if dt is None:
  174. return True # be lenient with unparseable timestamps
  175. cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
  176. return dt >= cutoff
  177. # ---------------------------------------------------------------------------
  178. # Embedding pre-computation (async internally)
  179. # ---------------------------------------------------------------------------
  180. async def _compute_embeddings_concurrently(
  181. articles: List[Dict[str, Any]],
  182. ) -> Dict[str, list[float] | None]:
  183. """Compute embeddings for unique article texts concurrently.
  184. Returns a cache dict: text -> embedding vector or None.
  185. """
  186. unique_texts: list[str] = []
  187. seen: set[str] = set()
  188. for a in articles:
  189. text = _cluster_text(a)
  190. if text and text not in seen:
  191. seen.add(text)
  192. unique_texts.append(text)
  193. emb_tasks = [ollama_embed(text) for text in unique_texts]
  194. emb_results = await asyncio.gather(*emb_tasks, return_exceptions=True)
  195. cache: Dict[str, list[float] | None] = {}
  196. for text, result in zip(unique_texts, emb_results):
  197. if isinstance(result, list):
  198. cache[text] = result
  199. else:
  200. cache[text] = None
  201. return cache
  202. def _compute_embeddings_sync(
  203. articles: List[Dict[str, Any]],
  204. ) -> Dict[str, list[float] | None]:
  205. """Synchronous wrapper that runs the async embedding computation.
  206. Handles three cases:
  207. 1. Already inside an async event loop (called from poller) -> schedule
  208. as a task and run it to completion on the running loop.
  209. 2. No event loop at all (plain sync caller) -> use asyncio.run().
  210. """
  211. try:
  212. loop = asyncio.get_running_loop()
  213. except RuntimeError:
  214. # No running loop — safe to use asyncio.run()
  215. return asyncio.run(_compute_embeddings_concurrently(articles))
  216. # We're inside a running event loop (e.g. the poller). Create a new loop
  217. # in a thread to avoid blocking.
  218. import concurrent.futures
  219. with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
  220. future = pool.submit(
  221. asyncio.run, _compute_embeddings_concurrently(articles)
  222. )
  223. return future.result()
  224. # ---------------------------------------------------------------------------
  225. # Orphan merge: detect clusters sharing articles and merge them
  226. # ---------------------------------------------------------------------------
  227. def _merge_orphan_clusters(
  228. clusters: List[Dict[str, Any]],
  229. ) -> List[Dict[str, Any]]:
  230. """Post-clustering pass: merge clusters that share article keys.
  231. This handles the case where two articles about the same event didn't match
  232. during the main loop (e.g. embeddings were temporarily unavailable) and
  233. ended up in separate clusters. If two clusters share >= 1 article key, we
  234. merge them into one (keeping the earlier first_seen, recompute the stable
  235. ID from the union of articles).
  236. """
  237. if len(clusters) <= 1:
  238. return clusters
  239. # Build index: article_key -> list of cluster indices
  240. key_to_indices: dict[str, list[int]] = {}
  241. for idx, c in enumerate(clusters):
  242. for a in c.get("articles", []) or []:
  243. ak = _article_key(a)
  244. if ak:
  245. key_to_indices.setdefault(ak, []).append(idx)
  246. # Find connected components via Union-Find
  247. parent = list(range(len(clusters)))
  248. def find(x: int) -> int:
  249. while parent[x] != x:
  250. parent[x] = parent[parent[x]]
  251. x = parent[x]
  252. return x
  253. def union(a: int, b: int) -> None:
  254. ra, rb = find(a), find(b)
  255. if ra != rb:
  256. parent[ra] = rb
  257. for indices in key_to_indices.values():
  258. for i in range(1, len(indices)):
  259. union(indices[0], indices[i])
  260. # Group clusters by component
  261. components: dict[int, list[int]] = {}
  262. for idx in range(len(clusters)):
  263. root = find(idx)
  264. components.setdefault(root, []).append(idx)
  265. merged: List[Dict[str, Any]] = []
  266. for root, members in components.items():
  267. if len(members) == 1:
  268. merged.append(clusters[members[0]])
  269. continue
  270. # Merge all clusters in this component
  271. base = dict(clusters[members[0]])
  272. all_articles: list[dict] = list(base.get("articles", []) or [])
  273. all_sources: list[str] = list(base.get("sources", []) or [])
  274. first_seen = base.get("first_seen", "")
  275. last_updated = base.get("last_updated", "")
  276. for m_idx in members[1:]:
  277. other = clusters[m_idx]
  278. existing_keys = {_article_key(a) for a in all_articles}
  279. for a in other.get("articles", []) or []:
  280. ak = _article_key(a)
  281. if ak not in existing_keys:
  282. all_articles.append(a)
  283. existing_keys.add(ak)
  284. for s in other.get("sources", []) or []:
  285. if s not in all_sources:
  286. all_sources.append(s)
  287. fs = other.get("first_seen", "")
  288. if fs and (not first_seen or fs < first_seen):
  289. first_seen = fs
  290. lu = other.get("last_updated", "")
  291. if lu and (not last_updated or lu > last_updated):
  292. last_updated = lu
  293. base["articles"] = all_articles
  294. base["sources"] = all_sources
  295. base["first_seen"] = first_seen
  296. base["last_updated"] = last_updated
  297. # Keep the base cluster's original ID so the enrichment cache
  298. # (keyed by cluster_id) survives the merge.
  299. base.setdefault("cluster_id", _stable_cluster_id(base.get("topic", "other"), all_articles))
  300. merged.append(base)
  301. return merged
  302. # ---------------------------------------------------------------------------
  303. # Public API (sync — backward compatible with tests)
  304. # ---------------------------------------------------------------------------
  305. def dedup_and_cluster_articles(
  306. articles: List[Dict[str, Any]],
  307. similarity_threshold: float | None = None,
  308. *,
  309. existing_clusters: List[Dict[str, Any]] | None = None,
  310. max_age_hours: float = 0,
  311. ) -> Dict[str, List[Dict[str, Any]]]:
  312. """Deduplicate raw articles into clusters keyed by topic.
  313. v1.3: stable cluster IDs, temporal gating, and orphan merge.
  314. Args:
  315. articles: new articles to cluster.
  316. similarity_threshold: override for the title-similarity threshold.
  317. existing_clusters: optional list of recent clusters from the DB to
  318. merge against (cross-cycle merge). When provided, temporal
  319. gating via max_age_hours is applied to filter these.
  320. max_age_hours: only compare against existing_clusters updated within
  321. this many hours. 0 = no limit (compare against all provided).
  322. """
  323. title_threshold = similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD
  324. # Pre-compute embeddings concurrently (sync boundary handles async internally)
  325. embedding_cache: Dict[str, list[float] | None] = {}
  326. if NEWS_EMBEDDINGS_ENABLED:
  327. embedding_cache = _compute_embeddings_sync(articles)
  328. by_topic: Dict[str, List[Dict[str, Any]]] = {}
  329. # Seed with existing clusters (filtered by age window).
  330. # Re-derive the topic via the same heuristic (normalize_topic_from_title)
  331. # that new articles use, so that existing and new clusters with the same
  332. # headline land in the same by_topic bucket regardless of what LLM
  333. # enrichment previously stored on the cluster.
  334. if existing_clusters:
  335. for c in existing_clusters:
  336. if not _cluster_is_within_age_window(c, max_age_hours=max_age_hours):
  337. continue
  338. seed_title = c.get("headline") or ""
  339. topic = normalize_topic_from_title(seed_title) if seed_title else (c.get("topic", "other") or "other")
  340. by_topic.setdefault(topic, []).append(dict(c))
  341. for a in articles:
  342. title = a.get("title") or ""
  343. if not title:
  344. continue
  345. topic = normalize_topic_from_title(title)
  346. article_text = _cluster_text(a)
  347. article_embedding = embedding_cache.get(article_text) if NEWS_EMBEDDINGS_ENABLED else None
  348. a_with_emb = dict(a)
  349. if article_embedding is not None:
  350. a_with_emb["_embedding"] = article_embedding
  351. by_topic.setdefault(topic, [])
  352. clusters = by_topic[topic]
  353. best_idx: int | None = None
  354. best_signal_name = "none"
  355. best_signal_value = 0.0
  356. for idx, c in enumerate(clusters):
  357. sigs = _signals(a_with_emb, c)
  358. matched, signal_name, signal_value = _is_match(
  359. sigs,
  360. embeddings_enabled=NEWS_EMBEDDINGS_ENABLED,
  361. title_threshold=title_threshold,
  362. )
  363. if matched and signal_value > best_signal_value:
  364. best_idx = idx
  365. best_signal_name = signal_name
  366. best_signal_value = signal_value
  367. if best_idx is not None:
  368. c = clusters[best_idx]
  369. existing_keys = {_article_key(x) for x in c.get("articles", []) or []}
  370. if _article_key(a) not in existing_keys:
  371. c["articles"].append(a)
  372. if a.get("source") and a["source"] not in c["sources"]:
  373. c["sources"].append(a["source"])
  374. c["last_updated"] = max(str(c.get("last_updated", "")), str(a.get("timestamp", "")))
  375. # Update cluster embedding to the new article's embedding so later
  376. # comparisons can match against the most recently added content.
  377. if NEWS_EMBEDDINGS_ENABLED and article_embedding is not None:
  378. c["embedding"] = article_embedding
  379. c["embedding_model"] = "ollama:nomic-embed-text"
  380. c.setdefault("_merge_signals", []).append(
  381. {"signal": best_signal_name, "value": round(best_signal_value, 3)}
  382. )
  383. else:
  384. cid = _stable_cluster_id(topic, [a])
  385. cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
  386. clusters.append(
  387. {
  388. "cluster_id": cid,
  389. "headline": title,
  390. "summary": a.get("summary", ""),
  391. "topic": topic,
  392. "entities": [],
  393. "sentiment": "neutral",
  394. "importance": 0.0,
  395. "sources": [a["source"]] if a.get("source") else [],
  396. "timestamp": a.get("timestamp"),
  397. "articles": [a],
  398. "first_seen": a.get("timestamp"),
  399. "last_updated": a.get("timestamp"),
  400. "embedding": cluster_embedding,
  401. "embedding_model": "ollama:nomic-embed-text" if cluster_embedding else None,
  402. }
  403. )
  404. # Post-clustering passes per topic
  405. for topic, clusters in by_topic.items():
  406. # Merge orphans (clusters that share articles)
  407. clusters = _merge_orphan_clusters(clusters)
  408. # Assign stable IDs only to clusters that don't already have one.
  409. # Pre-seeded clusters from the DB carry their original cluster_id —
  410. # keeping it stable across cycles so the enrichment cache (keyed by
  411. # cluster_id) continues to work even after new articles are merged in.
  412. for c in clusters:
  413. if not c.get("cluster_id"):
  414. c["cluster_id"] = _stable_cluster_id(topic, c.get("articles", []) or [])
  415. by_topic[topic] = clusters
  416. # Cross-topic dedup: merge clusters with overlapping headlines and entities
  417. by_topic = _merge_duplicate_clusters(by_topic)
  418. # Strip the internal merge audit trail before returning
  419. for clusters in by_topic.values():
  420. for c in clusters:
  421. c.pop("_merge_signals", None)
  422. return {topic: clusters for topic, clusters in by_topic.items()}
  423. def _merge_duplicate_clusters(
  424. by_topic: Dict[str, List[Dict[str, Any]]],
  425. ) -> Dict[str, List[Dict[str, Any]]]:
  426. """Cross-topic dedup: merge clusters whose headlines and entities overlap.
  427. Catches the case where the same event arrives from different feeds with
  428. different article keys, lands in separate clusters with different stable
  429. IDs, but has nearly identical headlines and shared entities.
  430. Merge criteria: title_similarity >= 0.90 AND at least one shared entity.
  431. This is intentionally conservative to avoid merging distinct events.
  432. """
  433. # Flatten all clusters with their topic
  434. all_clusters: list[tuple[str, dict]] = []
  435. for topic, clusters in by_topic.items():
  436. for c in clusters:
  437. all_clusters.append((topic, c))
  438. n = len(all_clusters)
  439. if n <= 1:
  440. return by_topic
  441. # Union-Find
  442. parent = list(range(n))
  443. def find(x: int) -> int:
  444. while parent[x] != x:
  445. parent[x] = parent[parent[x]]
  446. x = parent[x]
  447. return x
  448. def union(a: int, b: int) -> None:
  449. ra, rb = find(a), find(b)
  450. if ra != rb:
  451. parent[ra] = rb
  452. # Pre-extract normalized entity sets for each cluster
  453. cluster_ent_sets: list[set[str]] = []
  454. cluster_heads: list[str] = []
  455. for _, c in all_clusters:
  456. ents = {str(e).strip().lower() for e in (c.get("entities", []) or []) if str(e).strip()}
  457. cluster_ent_sets.append(ents)
  458. cluster_heads.append(str(c.get("headline", "") or ""))
  459. # Compare pairs — O(n^2) but n is small (clusters per cycle, not articles)
  460. TITLE_THRESHOLD = 0.90
  461. for i in range(n):
  462. for j in range(i + 1, n):
  463. # Quick skip: if headlines are completely different, no need for entity check
  464. if _title_similarity(cluster_heads[i], cluster_heads[j]) < TITLE_THRESHOLD:
  465. continue
  466. # Check entity overlap (at least one shared entity)
  467. if not (cluster_ent_sets[i] & cluster_ent_sets[j]):
  468. continue
  469. union(i, j)
  470. # Group by component
  471. components: dict[int, list[int]] = {}
  472. for idx in range(n):
  473. root = find(idx)
  474. components.setdefault(root, []).append(idx)
  475. # Merge each component
  476. merged_by_topic: Dict[str, List[Dict[str, Any]]] = {}
  477. for root, members in components.items():
  478. # Pick the base cluster (the one with the most sources, then most articles)
  479. best_idx = max(members, key=lambda i: (
  480. len(all_clusters[i][1].get("sources", []) or []),
  481. len(all_clusters[i][1].get("articles", []) or []),
  482. ))
  483. base_topic, base = all_clusters[best_idx]
  484. if len(members) == 1:
  485. merged_by_topic.setdefault(base_topic, []).append(base)
  486. continue
  487. # Merge all clusters in this component into base
  488. all_articles: list[dict] = list(base.get("articles", []) or [])
  489. all_sources: list[str] = list(base.get("sources", []) or [])
  490. all_entities: list[str] = list(base.get("entities", []) or [])
  491. all_keywords: list[str] = list(base.get("keywords", []) or [])
  492. first_seen = base.get("first_seen", "")
  493. last_updated = base.get("last_updated", "")
  494. existing_article_keys = {_article_key(a) for a in all_articles}
  495. existing_ent_lower = {str(e).strip().lower() for e in all_entities}
  496. existing_kw_lower = {str(k).strip().lower() for k in all_keywords}
  497. for m_idx in members:
  498. if m_idx == best_idx:
  499. continue
  500. other = all_clusters[m_idx][1]
  501. # Merge articles (dedup by key)
  502. for a in other.get("articles", []) or []:
  503. ak = _article_key(a)
  504. if ak not in existing_article_keys:
  505. all_articles.append(a)
  506. existing_article_keys.add(ak)
  507. # Merge sources
  508. for s in other.get("sources", []) or []:
  509. if s not in all_sources:
  510. all_sources.append(s)
  511. # Merge entities (dedup case-insensitive)
  512. for e in other.get("entities", []) or []:
  513. el = str(e).strip().lower()
  514. if el not in existing_ent_lower:
  515. all_entities.append(e)
  516. existing_ent_lower.add(el)
  517. # Merge keywords (dedup case-insensitive)
  518. for k in other.get("keywords", []) or []:
  519. kl = str(k).strip().lower()
  520. if kl not in existing_kw_lower:
  521. all_keywords.append(k)
  522. existing_kw_lower.add(kl)
  523. # Timestamps
  524. fs = other.get("first_seen", "")
  525. if fs and (not first_seen or fs < first_seen):
  526. first_seen = fs
  527. lu = other.get("last_updated", "")
  528. if lu and (not last_updated or lu > last_updated):
  529. last_updated = lu
  530. base["articles"] = all_articles
  531. base["sources"] = all_sources
  532. base["entities"] = all_entities
  533. base["keywords"] = all_keywords
  534. base["first_seen"] = first_seen
  535. base["last_updated"] = last_updated
  536. # Keep the base cluster's original ID so the enrichment cache
  537. # (keyed by cluster_id) survives the merge.
  538. base.setdefault("cluster_id", _stable_cluster_id(base.get("topic", "other"), all_articles))
  539. merged_by_topic.setdefault(base_topic, []).append(base)
  540. return merged_by_topic