| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651 |
- from __future__ import annotations
- import asyncio
- import hashlib
- import re
- from datetime import datetime, timezone, timedelta
- from difflib import SequenceMatcher
- from typing import Any, Dict, List
- from urllib.parse import urlparse
- from news_mcp.config import (
- NEWS_EMBEDDINGS_ENABLED,
- NEWS_EMBEDDING_SIMILARITY_THRESHOLD,
- NEWS_CLUSTER_MAX_AGE_HOURS,
- )
- from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
- from news_mcp.sources.news_feeds import normalize_topic_from_title
- # ---------------------------------------------------------------------------
- # Text helpers
- # ---------------------------------------------------------------------------
def _normalize_title(title: str) -> str:
    """Return a canonical form of *title* for fuzzy comparison.

    The title is lower-cased, every character that is not ``a-z``, ``0-9`` or
    whitespace is replaced by a space, and whitespace runs are collapsed to a
    single space with no leading or trailing blanks.
    """
    lowered = title.lower().strip()
    alnum_only = re.sub(r"[^a-z0-9\s]", " ", lowered)
    return re.sub(r"\s+", " ", alnum_only).strip()
def _title_similarity(a: str, b: str) -> float:
    """Return the ``SequenceMatcher`` ratio of the two normalized titles."""
    left = _normalize_title(a)
    right = _normalize_title(b)
    matcher = SequenceMatcher(None, left, right)
    return matcher.ratio()
def _article_key(article: Dict[str, Any]) -> str:
    """Return the identity key used to recognise the same article again.

    Preference order:
      1. the last non-empty path segment of the article URL,
      2. the full (stripped) URL when its path has no segments or the URL
         cannot be parsed,
      3. the article title when there is no URL at all.
    """
    raw_url = str(article.get("url") or "").strip()
    if raw_url == "":
        return str(article.get("title") or "")
    try:
        segments = [seg for seg in urlparse(raw_url).path.split("/") if seg]
    except Exception:
        # Unparseable URL: fall back to the raw URL string below.
        segments = []
    return segments[-1] if segments else raw_url
def _cluster_text(a: Dict[str, Any]) -> str:
    """Return the article's title and summary joined by a newline.

    Empty or missing parts are skipped and the result is stripped.
    """
    title = a.get("title", "")
    summary = a.get("summary", "") or ""
    return "\n".join(filter(None, (title, summary))).strip()
- # ---------------------------------------------------------------------------
- # Token / Jaccard signal
- # ---------------------------------------------------------------------------
# Words ignored when tokenising article text for the Jaccard signal.
_STOPWORDS = frozenset(
    (
        "a an the of to in on at for by with "
        "and or but if is are was were be been "
        "being as from that this these those it its "
        "into over under than then so such no not "
        "do does did will would can could should may "
        "might has have had after before amid vs via "
        "us uk"
    ).split()
)
def _tokens(text: str) -> set[str]:
    """Return the set of content tokens found in *text*.

    Tokens are lower-case runs of letters, digits and hyphens that start with
    a letter or digit. Tokens shorter than three characters and stopwords are
    dropped.
    """
    kept: set[str] = set()
    for candidate in re.findall(r"[a-z0-9][a-z0-9\-]+", text.lower()):
        if len(candidate) >= 3 and candidate not in _STOPWORDS:
            kept.add(candidate)
    return kept
def _jaccard(a: set, b: set) -> float:
    """Return the Jaccard index ``|a & b| / |a | b|`` of two sets.

    Yields ``0.0`` when either set is empty or the sets do not intersect.
    """
    if a and b:
        shared = len(a & b)
        if shared:
            return shared / len(a | b)
    return 0.0
- # ---------------------------------------------------------------------------
- # Composite similarity
- # ---------------------------------------------------------------------------
# Default minimum title-similarity ratio at which `_is_match` accepts a merge
# on the title signal alone.
DEFAULT_TITLE_THRESHOLD = 0.75
# Default minimum token Jaccard overlap at which `_is_match` accepts a merge
# on the jaccard signal alone.
DEFAULT_JACCARD_THRESHOLD = 0.55
def _signals(article: Dict[str, Any], cluster: Dict[str, Any]) -> dict:
    """Compute the similarity signals between an article and a cluster.

    Returns a dict with three floats:
      * ``"title"``   - best title similarity against any cluster member,
      * ``"jaccard"`` - best token Jaccard overlap against any cluster member,
      * ``"cosine"``  - cosine similarity between the article's ``_embedding``
        and the cluster-level ``embedding`` (``0.0`` when either is missing).

    When the cluster has no ``articles`` list, its ``headline`` (if any) is
    used as a single title-only stand-in member.
    """
    article_title = str(article.get("title") or "")
    headline = str(cluster.get("headline") or "")

    article_vec = article.get("_embedding")
    cluster_vec = cluster.get("embedding")
    cosine = 0.0
    if article_vec and cluster_vec:
        cosine = cosine_similarity(article_vec, cluster_vec)

    article_text = _cluster_text(article)
    article_tokens = _tokens(article_text) if article_text else set()

    members = cluster.get("articles")
    if not members:
        members = [{"title": headline}] if headline else []

    top_title = 0.0
    top_jaccard = 0.0
    for member in members:
        if not isinstance(member, dict):
            continue

        member_title = str(member.get("title") or "")
        if article_title and member_title:
            top_title = max(top_title, _title_similarity(article_title, member_title))

        member_text = _cluster_text(member)
        if article_text and member_text:
            top_jaccard = max(top_jaccard, _jaccard(article_tokens, _tokens(member_text)))

        # Both lexical signals are already near-certain; stop scanning.
        if top_title >= 0.95 and top_jaccard >= 0.80:
            break

    return {"title": top_title, "jaccard": top_jaccard, "cosine": cosine}
def _is_match(
    signals: dict,
    *,
    embeddings_enabled: bool,
    title_threshold: float = DEFAULT_TITLE_THRESHOLD,
    jaccard_threshold: float = DEFAULT_JACCARD_THRESHOLD,
) -> tuple[bool, str, float]:
    """Decide whether an article/cluster pair should merge.

    The rules are tried in order and the first that fires wins:

    1. ``cosine``    - embeddings enabled and cosine >= the configured
       embedding similarity threshold.
    2. ``title``     - title similarity >= *title_threshold*.
    3. ``jaccard``   - token overlap >= *jaccard_threshold*.
    4. ``consensus`` - embeddings enabled, cosine >= 0.80 and a supporting
       lexical signal (jaccard >= 0.30 or title >= 0.55); value is the mean
       of cosine and the stronger lexical signal.
    5. ``dual``      - title >= 0.55 and jaccard >= 0.25; value is their mean.

    Returns ``(matched, signal_name, signal_value)``; a non-match is
    ``(False, "none", 0.0)``.
    """
    # The cosine signal is only consulted when embeddings are enabled.
    cosine_score = signals["cosine"] if embeddings_enabled else 0.0
    if embeddings_enabled and cosine_score >= NEWS_EMBEDDING_SIMILARITY_THRESHOLD:
        return True, "cosine", cosine_score

    title_score = signals["title"]
    if title_score >= title_threshold:
        return True, "title", title_score

    jaccard_score = signals["jaccard"]
    if jaccard_score >= jaccard_threshold:
        return True, "jaccard", jaccard_score

    lexical_support = jaccard_score >= 0.30 or title_score >= 0.55
    if embeddings_enabled and cosine_score >= 0.80 and lexical_support:
        blended = (cosine_score + max(jaccard_score, title_score)) / 2.0
        return True, "consensus", blended

    # Medium title plus medium token overlap is accepted even without
    # embeddings: headlines that differ editorially but share vocabulary.
    if title_score >= 0.55 and jaccard_score >= 0.25:
        return True, "dual", (title_score + jaccard_score) / 2.0

    return False, "none", 0.0
- # ---------------------------------------------------------------------------
- # Stable cluster ID
- # ---------------------------------------------------------------------------
- def _stable_cluster_id(topic: str, articles: List[Dict[str, Any]]) -> str:
- """Deterministic cluster ID derived from the sorted set of article keys.
- The topic is intentionally excluded from the hash: the same article may be
- classified under different topics across cycles (heuristic vs LLM-enriched),
- but it must always map to the same cluster_id so that ON CONFLICT DO UPDATE
- in upsert_clusters correctly merges them instead of creating duplicates."""
- keys = sorted(_article_key(a) for a in articles if _article_key(a))
- if not keys:
- # Degenerate fallback — single article with empty url and title
- return hashlib.sha1(topic.encode("utf-8")).hexdigest()
- seed = keys[0]
- return hashlib.sha1(seed.encode("utf-8")).hexdigest()
- # ---------------------------------------------------------------------------
- # Temporal gating
- # ---------------------------------------------------------------------------
def _parse_ts(ts_str: str) -> datetime | None:
    """Parse a timestamp string into a timezone-aware UTC ``datetime``.

    ISO-8601 is tried first (with ``Z`` rewritten to ``+00:00``), then the
    RFC 2822 format handled by ``email.utils``. Values without a timezone are
    taken to be UTC. Returns ``None`` for empty or unparseable input.
    """
    if not ts_str:
        return None

    def _from_iso(text: str) -> datetime:
        return datetime.fromisoformat(text.replace("Z", "+00:00"))

    def _from_rfc2822(text: str) -> datetime:
        from email.utils import parsedate_to_datetime

        return parsedate_to_datetime(text)

    raw = str(ts_str)
    for parse in (_from_iso, _from_rfc2822):
        try:
            moment = parse(raw)
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=timezone.utc)
            return moment.astimezone(timezone.utc)
        except Exception:
            # Best-effort parsing: try the next format, if any.
            continue
    return None
def _cluster_is_within_age_window(cluster: Dict[str, Any], *, max_age_hours: float) -> bool:
    """Tell whether *cluster* is recent enough to be a merge candidate.

    The cluster's ``last_updated`` (falling back to ``timestamp``) must be no
    older than *max_age_hours* before now. A non-positive *max_age_hours*
    disables the limit, and clusters whose timestamp is missing or cannot be
    parsed are leniently treated as in-window.
    """
    if max_age_hours <= 0:
        return True  # no limit configured

    raw_stamp = cluster.get("last_updated") or cluster.get("timestamp") or ""
    stamp = _parse_ts(raw_stamp)
    if stamp is None:
        return True  # lenient with missing/unparseable timestamps

    cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
    return not stamp < cutoff
- # ---------------------------------------------------------------------------
- # Embedding pre-computation (async internally)
- # ---------------------------------------------------------------------------
- async def _compute_embeddings_concurrently(
- articles: List[Dict[str, Any]],
- ) -> Dict[str, list[float] | None]:
- """Compute embeddings for unique article texts concurrently.
- Returns a cache dict: text -> embedding vector or None.
- """
- unique_texts: list[str] = []
- seen: set[str] = set()
- for a in articles:
- text = _cluster_text(a)
- if text and text not in seen:
- seen.add(text)
- unique_texts.append(text)
- emb_tasks = [ollama_embed(text) for text in unique_texts]
- emb_results = await asyncio.gather(*emb_tasks, return_exceptions=True)
- cache: Dict[str, list[float] | None] = {}
- for text, result in zip(unique_texts, emb_results):
- if isinstance(result, list):
- cache[text] = result
- else:
- cache[text] = None
- return cache
def _compute_embeddings_sync(
    articles: List[Dict[str, Any]],
) -> Dict[str, list[float] | None]:
    """Run the async embedding computation from synchronous code.

    Returns the same ``text -> embedding | None`` cache as
    ``_compute_embeddings_concurrently``.

    Two situations are handled:

    1. No event loop is running in this thread (plain sync caller): the
       coroutine is executed with ``asyncio.run()``.
    2. An event loop is already running in this thread (e.g. called from an
       async poller): ``asyncio.run()`` cannot be used here, so the coroutine
       is executed on a fresh loop in a one-off worker thread.

    In both cases this call blocks the calling thread until the embeddings
    are ready; in case 2 that means the running loop is blocked meanwhile.
    """
    if not articles:
        # Nothing to embed: skip starting a loop (and possibly a thread).
        return {}

    def _run() -> Dict[str, list[float] | None]:
        # Create the coroutine in the thread that will run it.
        return asyncio.run(_compute_embeddings_concurrently(articles))

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread: asyncio.run() is safe.
        return _run()

    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(_run).result()
- # ---------------------------------------------------------------------------
- # Orphan merge: detect clusters sharing articles and merge them
- # ---------------------------------------------------------------------------
def _merge_orphan_clusters(
    clusters: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Post-clustering pass: merge clusters that share an article key.

    Two clusters holding an article with the same ``_article_key`` are taken
    to describe the same event even if they did not match in the main loop.
    Clusters linked this way (directly or transitively) are merged into one.

    For each merged group the first cluster (in input order) is the base: a
    shallow copy of it receives the union of articles (de-duplicated by key)
    and sources, the earliest ``first_seen`` and the latest ``last_updated``.
    The base keeps its existing ``cluster_id`` so anything keyed by that ID
    survives the merge; an ID is derived only when the base has none.

    Returns the input list unchanged when it has at most one cluster;
    otherwise a new list ordered by each group's first member.
    """
    if len(clusters) <= 1:
        return clusters

    # article_key -> indices of every cluster containing that key
    key_to_indices: dict[str, list[int]] = {}
    for idx, c in enumerate(clusters):
        for a in c.get("articles") or []:
            ak = _article_key(a)
            if ak:
                key_to_indices.setdefault(ak, []).append(idx)

    # Union-Find over cluster indices to find connected groups.
    parent = list(range(len(clusters)))

    def find(x: int) -> int:
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    def union(a: int, b: int) -> None:
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[ra] = rb

    for indices in key_to_indices.values():
        anchor = indices[0]
        for other_idx in indices[1:]:
            union(anchor, other_idx)

    components: dict[int, list[int]] = {}
    for idx in range(len(clusters)):
        components.setdefault(find(idx), []).append(idx)

    merged: List[Dict[str, Any]] = []
    for members in components.values():
        if len(members) == 1:
            merged.append(clusters[members[0]])
            continue

        base = dict(clusters[members[0]])
        all_articles: list[dict] = list(base.get("articles") or [])
        all_sources: list[str] = list(base.get("sources") or [])
        first_seen = base.get("first_seen", "")
        last_updated = base.get("last_updated", "")
        # Built once and kept in sync as articles are appended.
        seen_keys = {_article_key(a) for a in all_articles}

        for m_idx in members[1:]:
            other = clusters[m_idx]
            for a in other.get("articles") or []:
                ak = _article_key(a)
                if ak not in seen_keys:
                    all_articles.append(a)
                    seen_keys.add(ak)
            for s in other.get("sources") or []:
                if s not in all_sources:
                    all_sources.append(s)
            # NOTE(review): timestamps are compared as stored values; this
            # presumes same-format (e.g. ISO-8601) strings - confirm.
            fs = other.get("first_seen", "")
            if fs and (not first_seen or fs < first_seen):
                first_seen = fs
            lu = other.get("last_updated", "")
            if lu and (not last_updated or lu > last_updated):
                last_updated = lu

        base["articles"] = all_articles
        base["sources"] = all_sources
        base["first_seen"] = first_seen
        base["last_updated"] = last_updated
        # Keep the base cluster's original ID so the enrichment cache
        # (keyed by cluster_id) survives the merge; derive one only if absent.
        if "cluster_id" not in base:
            base["cluster_id"] = _stable_cluster_id(base.get("topic", "other"), all_articles)
        merged.append(base)

    return merged
- # ---------------------------------------------------------------------------
- # Public API (sync — backward compatible with tests)
- # ---------------------------------------------------------------------------
def dedup_and_cluster_articles(
    articles: List[Dict[str, Any]],
    similarity_threshold: float | None = None,
    *,
    existing_clusters: List[Dict[str, Any]] | None = None,
    max_age_hours: float = 0,
) -> Dict[str, List[Dict[str, Any]]]:
    """Deduplicate raw articles into clusters keyed by topic.

    v1.3: stable cluster IDs, temporal gating, and orphan merge.

    Args:
        articles: new articles to cluster. Articles without a title are
            skipped.
        similarity_threshold: override for the title-similarity threshold.
        existing_clusters: optional list of recent clusters from the DB to
            merge against (cross-cycle merge). When provided, temporal
            gating via max_age_hours is applied to filter these. The input
            dicts and their ``articles``/``sources`` lists are copied, not
            mutated.
        max_age_hours: only compare against existing_clusters updated within
            this many hours. 0 = no limit (compare against all provided).

    Returns:
        A dict mapping topic to the list of cluster dicts for that topic.
    """
    title_threshold = (
        similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD
    )

    # Pre-compute embeddings concurrently (the sync wrapper handles async).
    embedding_cache: Dict[str, list[float] | None] = {}
    if NEWS_EMBEDDINGS_ENABLED:
        embedding_cache = _compute_embeddings_sync(articles)

    by_topic: Dict[str, List[Dict[str, Any]]] = {}

    # Seed with existing clusters (filtered by age window). The topic is
    # re-derived from the headline with the same heuristic new articles use,
    # so existing and new clusters with the same headline land in the same
    # bucket regardless of the topic stored on the cluster.
    for c in existing_clusters or []:
        if not _cluster_is_within_age_window(c, max_age_hours=max_age_hours):
            continue
        seed_title = c.get("headline") or ""
        if seed_title:
            topic = normalize_topic_from_title(seed_title)
        else:
            topic = c.get("topic", "other") or "other"
        seeded = dict(c)
        # Copy the lists we append to below so the caller's clusters are not
        # mutated, and so a missing/None list cannot break the merge.
        seeded["articles"] = list(c.get("articles") or [])
        seeded["sources"] = list(c.get("sources") or [])
        by_topic.setdefault(topic, []).append(seeded)

    for a in articles:
        title = a.get("title") or ""
        if not title:
            continue
        topic = normalize_topic_from_title(title)

        article_text = _cluster_text(a)
        article_embedding = embedding_cache.get(article_text) if NEWS_EMBEDDINGS_ENABLED else None
        # The embedding rides on a copy so the stored article stays clean.
        a_with_emb = dict(a)
        if article_embedding is not None:
            a_with_emb["_embedding"] = article_embedding

        clusters = by_topic.setdefault(topic, [])

        # Pick the matching cluster with the strongest winning signal.
        best_idx: int | None = None
        best_signal_name = "none"
        best_signal_value = 0.0
        for idx, c in enumerate(clusters):
            sigs = _signals(a_with_emb, c)
            matched, signal_name, signal_value = _is_match(
                sigs,
                embeddings_enabled=NEWS_EMBEDDINGS_ENABLED,
                title_threshold=title_threshold,
            )
            if matched and signal_value > best_signal_value:
                best_idx = idx
                best_signal_name = signal_name
                best_signal_value = signal_value

        if best_idx is not None:
            c = clusters[best_idx]
            existing_keys = {_article_key(x) for x in c.get("articles", []) or []}
            if _article_key(a) not in existing_keys:
                c["articles"].append(a)
            if a.get("source") and a["source"] not in c["sources"]:
                c["sources"].append(a["source"])
            # `or ""` keeps a missing timestamp from becoming the literal
            # string "None", which would sort above every ISO timestamp.
            c["last_updated"] = max(
                str(c.get("last_updated") or ""),
                str(a.get("timestamp") or ""),
            )
            # Update cluster embedding to the new article's embedding so later
            # comparisons can match against the most recently added content.
            if NEWS_EMBEDDINGS_ENABLED and article_embedding is not None:
                c["embedding"] = article_embedding
                c["embedding_model"] = "ollama:nomic-embed-text"
            c.setdefault("_merge_signals", []).append(
                {"signal": best_signal_name, "value": round(best_signal_value, 3)}
            )
        else:
            cid = _stable_cluster_id(topic, [a])
            cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
            clusters.append(
                {
                    "cluster_id": cid,
                    "headline": title,
                    "summary": a.get("summary", ""),
                    "topic": topic,
                    "entities": [],
                    "sentiment": "neutral",
                    "importance": 0.0,
                    "sources": [a["source"]] if a.get("source") else [],
                    "timestamp": a.get("timestamp"),
                    "articles": [a],
                    "first_seen": a.get("timestamp"),
                    "last_updated": a.get("timestamp"),
                    "embedding": cluster_embedding,
                    "embedding_model": "ollama:nomic-embed-text" if cluster_embedding else None,
                }
            )

    # Post-clustering passes per topic
    for topic, clusters in by_topic.items():
        # Merge orphans (clusters that share articles)
        clusters = _merge_orphan_clusters(clusters)
        # Assign stable IDs only to clusters that don't already have one.
        # Pre-seeded clusters carry their original cluster_id, which keeps it
        # stable across cycles even after new articles are merged in.
        for c in clusters:
            if not c.get("cluster_id"):
                c["cluster_id"] = _stable_cluster_id(topic, c.get("articles", []) or [])
        by_topic[topic] = clusters

    # Cross-topic dedup: merge clusters with overlapping headlines and entities
    by_topic = _merge_duplicate_clusters(by_topic)

    # Strip the internal merge audit trail before returning
    for clusters in by_topic.values():
        for c in clusters:
            c.pop("_merge_signals", None)

    return {topic: clusters for topic, clusters in by_topic.items()}
def _merge_duplicate_clusters(
    by_topic: Dict[str, List[Dict[str, Any]]],
) -> Dict[str, List[Dict[str, Any]]]:
    """Cross-topic dedup: merge clusters whose headlines and entities overlap.

    Catches the case where the same event arrives from different feeds with
    different article keys, lands in separate clusters with different stable
    IDs, but has nearly identical headlines and shared entities.

    Merge criteria: title_similarity >= 0.90 AND at least one shared entity
    (compared case-insensitively). This is intentionally conservative to
    avoid merging distinct events.

    Within each merged group the base is the cluster with the most sources
    (ties broken by most articles). The base dict is updated in place with
    the union of articles, sources, entities and keywords, the earliest
    ``first_seen`` and the latest ``last_updated``; it keeps its own
    ``cluster_id`` and stays under its own topic.

    Returns *by_topic* itself when it holds at most one cluster; otherwise a
    new dict that contains only topics with at least one resulting cluster.
    """
    # Flatten all clusters with their topic
    all_clusters: list[tuple[str, dict]] = [
        (topic, c) for topic, clusters in by_topic.items() for c in clusters
    ]
    n = len(all_clusters)
    if n <= 1:
        return by_topic

    # Union-Find
    parent = list(range(n))

    def find(x: int) -> int:
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    def union(a: int, b: int) -> None:
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[ra] = rb

    # Pre-extract normalized entity sets and headlines for each cluster
    cluster_ent_sets: list[set[str]] = []
    cluster_heads: list[str] = []
    for _, c in all_clusters:
        ents = {str(e).strip().lower() for e in (c.get("entities") or []) if str(e).strip()}
        cluster_ent_sets.append(ents)
        cluster_heads.append(str(c.get("headline", "") or ""))

    # Compare pairs - O(n^2) pairs, but the cheap entity-overlap test runs
    # first so the expensive title comparison only happens for pairs that
    # could actually merge. Both conditions are required, so the outcome is
    # independent of the order they are tested in.
    TITLE_THRESHOLD = 0.90
    for i in range(n):
        ents_i = cluster_ent_sets[i]
        if not ents_i:
            continue  # no entities: cannot share one with anybody
        for j in range(i + 1, n):
            if ents_i.isdisjoint(cluster_ent_sets[j]):
                continue
            if _title_similarity(cluster_heads[i], cluster_heads[j]) < TITLE_THRESHOLD:
                continue
            union(i, j)

    # Group by component
    components: dict[int, list[int]] = {}
    for idx in range(n):
        components.setdefault(find(idx), []).append(idx)

    # Merge each component
    merged_by_topic: Dict[str, List[Dict[str, Any]]] = {}
    for members in components.values():
        # Pick the base cluster (the one with the most sources, then most articles)
        best_idx = max(
            members,
            key=lambda i: (
                len(all_clusters[i][1].get("sources") or []),
                len(all_clusters[i][1].get("articles") or []),
            ),
        )
        base_topic, base = all_clusters[best_idx]

        if len(members) == 1:
            merged_by_topic.setdefault(base_topic, []).append(base)
            continue

        # Merge all clusters in this component into base
        all_articles: list[dict] = list(base.get("articles") or [])
        all_sources: list[str] = list(base.get("sources") or [])
        all_entities: list[str] = list(base.get("entities") or [])
        all_keywords: list[str] = list(base.get("keywords") or [])
        first_seen = base.get("first_seen", "")
        last_updated = base.get("last_updated", "")

        existing_article_keys = {_article_key(a) for a in all_articles}
        existing_ent_lower = {str(e).strip().lower() for e in all_entities}
        existing_kw_lower = {str(k).strip().lower() for k in all_keywords}

        for m_idx in members:
            if m_idx == best_idx:
                continue
            other = all_clusters[m_idx][1]

            # Merge articles (dedup by key)
            for a in other.get("articles") or []:
                ak = _article_key(a)
                if ak not in existing_article_keys:
                    all_articles.append(a)
                    existing_article_keys.add(ak)

            # Merge sources
            for s in other.get("sources") or []:
                if s not in all_sources:
                    all_sources.append(s)

            # Merge entities (dedup case-insensitive)
            for e in other.get("entities") or []:
                el = str(e).strip().lower()
                if el not in existing_ent_lower:
                    all_entities.append(e)
                    existing_ent_lower.add(el)

            # Merge keywords (dedup case-insensitive)
            for k in other.get("keywords") or []:
                kl = str(k).strip().lower()
                if kl not in existing_kw_lower:
                    all_keywords.append(k)
                    existing_kw_lower.add(kl)

            # Timestamps
            # NOTE(review): compared as stored values; this presumes
            # same-format (e.g. ISO-8601) strings - confirm.
            fs = other.get("first_seen", "")
            if fs and (not first_seen or fs < first_seen):
                first_seen = fs
            lu = other.get("last_updated", "")
            if lu and (not last_updated or lu > last_updated):
                last_updated = lu

        base["articles"] = all_articles
        base["sources"] = all_sources
        base["entities"] = all_entities
        base["keywords"] = all_keywords
        base["first_seen"] = first_seen
        base["last_updated"] = last_updated
        # Keep the base cluster's original ID so the enrichment cache
        # (keyed by cluster_id) survives the merge; derive one only if absent.
        if "cluster_id" not in base:
            base["cluster_id"] = _stable_cluster_id(base.get("topic", "other"), all_articles)

        merged_by_topic.setdefault(base_topic, []).append(base)

    return merged_by_topic
|