| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- from __future__ import annotations
- import asyncio
- import hashlib
- import re
- from difflib import SequenceMatcher
- from typing import Any, Dict, List
- from urllib.parse import urlparse
- from news_mcp.config import NEWS_EMBEDDINGS_ENABLED, NEWS_EMBEDDING_SIMILARITY_THRESHOLD
- from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
- from news_mcp.sources.news_feeds import normalize_topic_from_title
- # ---------------------------------------------------------------------------
- # Text helpers
- # ---------------------------------------------------------------------------
- def _normalize_title(title: str) -> str:
- t = title.lower().strip()
- t = re.sub(r"[^a-z0-9\s]", " ", t)
- t = re.sub(r"\s+", " ", t).strip()
- return t
def _title_similarity(a: str, b: str) -> float:
    """Return the ``SequenceMatcher`` ratio between the two normalized titles."""
    left = _normalize_title(a)
    right = _normalize_title(b)
    matcher = SequenceMatcher(None, left, right)
    return matcher.ratio()
- def _article_key(article: Dict[str, Any]) -> str:
- url = str(article.get("url") or "").strip()
- if not url:
- return str(article.get("title") or "")
- try:
- parsed = urlparse(url)
- parts = [p for p in parsed.path.split("/") if p]
- if parts:
- return parts[-1]
- except Exception:
- pass
- return url
- def _cluster_text(a: Dict[str, Any]) -> str:
- parts = [a.get("title", ""), a.get("summary", "") or ""]
- return "\n".join(p for p in parts if p).strip()
- # ---------------------------------------------------------------------------
- # Token / Jaccard signal
- # ---------------------------------------------------------------------------
- _STOPWORDS = frozenset(
- {
- "a", "an", "the", "of", "to", "in", "on", "at", "for", "by", "with",
- "and", "or", "but", "if", "is", "are", "was", "were", "be", "been",
- "being", "as", "from", "that", "this", "these", "those", "it", "its",
- "into", "over", "under", "than", "then", "so", "such", "no", "not",
- "do", "does", "did", "will", "would", "can", "could", "should", "may",
- "might", "has", "have", "had", "after", "before", "amid", "vs", "via",
- "us", "uk",
- }
- )
def _tokens(text: str) -> set[str]:
    """Return the set of significant lower-case tokens found in *text*.

    A token starts with ``a-z``/``0-9`` and continues with ``a-z``, ``0-9``
    or ``-``. Tokens shorter than three characters and stopwords are dropped.
    """
    kept: set[str] = set()
    for candidate in re.findall(r"[a-z0-9][a-z0-9\-]+", text.lower()):
        if len(candidate) < 3 or candidate in _STOPWORDS:
            continue
        kept.add(candidate)
    return kept
- def _jaccard(a: set, b: set) -> float:
- if not a or not b:
- return 0.0
- inter = len(a & b)
- if inter == 0:
- return 0.0
- return inter / len(a | b)
- # ---------------------------------------------------------------------------
- # Composite similarity
- # ---------------------------------------------------------------------------
# Minimum fuzzy ratio between normalized titles for a title-based merge.
DEFAULT_TITLE_THRESHOLD = 0.87
# Minimum token Jaccard overlap for a token-based merge.
DEFAULT_JACCARD_THRESHOLD = 0.55
def _signals(article: Dict[str, Any], cluster: Dict[str, Any]) -> dict:
    """Compute the similarity signals between an article and a cluster.

    Returns a dict with three floats:

    * ``"title"``   - fuzzy ratio of the article title vs. the cluster headline.
    * ``"jaccard"`` - token overlap of the article text vs. the text of the
      cluster's first article (the headline when that entry is empty).
    * ``"cosine"``  - cosine similarity of the article's ``_embedding`` and
      the cluster's ``embedding``.

    Any signal whose inputs are missing or empty is reported as ``0.0``.
    """
    article_title = str(article.get("title") or "")
    headline = str(cluster.get("headline") or "")
    if article_title and headline:
        title_score = _title_similarity(article_title, headline)
    else:
        title_score = 0.0

    article_text = _cluster_text(article)
    # The first article of the cluster stands in for the whole cluster.
    seed = (cluster.get("articles") or [{}])[0]
    cluster_text = _cluster_text(seed) if seed else headline
    if article_text and cluster_text:
        jaccard_score = _jaccard(_tokens(article_text), _tokens(cluster_text))
    else:
        jaccard_score = 0.0

    article_vec = article.get("_embedding")
    cluster_vec = cluster.get("embedding")
    if article_vec and cluster_vec:
        cosine_score = cosine_similarity(article_vec, cluster_vec)
    else:
        cosine_score = 0.0

    return {"title": title_score, "jaccard": jaccard_score, "cosine": cosine_score}
def _is_match(signals: dict, *, embeddings_enabled: bool) -> tuple[bool, str, float]:
    """Decide whether a pair should merge, checking signals in priority order.

    The order is cosine (only when *embeddings_enabled*), then title, then
    Jaccard; the first signal that reaches its threshold wins.

    Returns ``(matched, signal_name, signal_value)``, or
    ``(False, "none", 0.0)`` when no signal reaches its threshold.
    """
    checks = (
        ("cosine", NEWS_EMBEDDING_SIMILARITY_THRESHOLD, embeddings_enabled),
        ("title", DEFAULT_TITLE_THRESHOLD, True),
        ("jaccard", DEFAULT_JACCARD_THRESHOLD, True),
    )
    for name, threshold, active in checks:
        if active and signals[name] >= threshold:
            return True, name, signals[name]
    return False, "none", 0.0
- # ---------------------------------------------------------------------------
- # Embedding pre-computation (async internally)
- # ---------------------------------------------------------------------------
async def _compute_embeddings_concurrently(
    articles: List[Dict[str, Any]],
) -> Dict[str, list[float] | None]:
    """Embed every distinct, non-empty article text concurrently.

    Returns a cache mapping each text to its embedding vector, or to ``None``
    when the embedding call raised or returned something other than a list.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order.
    texts = list(dict.fromkeys(filter(None, map(_cluster_text, articles))))
    outcomes = await asyncio.gather(
        *(ollama_embed(text) for text in texts),
        return_exceptions=True,
    )
    return {
        text: (outcome if isinstance(outcome, list) else None)
        for text, outcome in zip(texts, outcomes)
    }
def _compute_embeddings_sync(
    articles: List[Dict[str, Any]],
) -> Dict[str, list[float] | None]:
    """Run the async embedding computation from synchronous code.

    Two situations are handled:

    1. No event loop is running in this thread -> ``asyncio.run()`` drives
       the coroutine directly.
    2. An event loop is already running in this thread -> ``asyncio.run()``
       cannot be called here, so the coroutine is run on a fresh loop inside
       a one-off worker thread.

    Either way this call blocks until the embeddings are ready. In case 2 the
    calling thread waits on the worker, so its event loop makes no progress
    in the meantime.

    Returns:
        The text -> embedding cache built by
        ``_compute_embeddings_concurrently``.
    """

    def _run_on_fresh_loop() -> Dict[str, list[float] | None]:
        # Create the coroutine in the thread that will actually await it.
        return asyncio.run(_compute_embeddings_concurrently(articles))

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread: asyncio.run() is safe to use.
        return _run_on_fresh_loop()

    # A loop is already running here; use a separate thread with its own loop.
    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(_run_on_fresh_loop).result()
- # ---------------------------------------------------------------------------
- # Public API (sync — backward compatible with tests)
- # ---------------------------------------------------------------------------
# "Consensus" rule: a cosine below the main embedding threshold still merges
# a pair when a lexical signal (Jaccard or title ratio) corroborates it.
_CONSENSUS_COSINE_FLOOR = 0.80
_CONSENSUS_JACCARD_FLOOR = 0.30
_CONSENSUS_TITLE_FLOOR = 0.55


def _classify_pair(sigs: dict, title_threshold: float) -> tuple[bool, str, float]:
    """Return ``(matched, signal_name, signal_value)`` for one article/cluster pair."""
    cosine, title, jaccard = sigs["cosine"], sigs["title"], sigs["jaccard"]
    if NEWS_EMBEDDINGS_ENABLED and cosine >= NEWS_EMBEDDING_SIMILARITY_THRESHOLD:
        return True, "cosine", cosine
    if title >= title_threshold:
        return True, "title", title
    if jaccard >= DEFAULT_JACCARD_THRESHOLD:
        return True, "jaccard", jaccard
    if (
        NEWS_EMBEDDINGS_ENABLED
        and cosine >= _CONSENSUS_COSINE_FLOOR
        and (jaccard >= _CONSENSUS_JACCARD_FLOOR or title >= _CONSENSUS_TITLE_FLOOR)
    ):
        return True, "consensus", (cosine + max(jaccard, title)) / 2.0
    return False, "none", max(title, jaccard, cosine)


def _new_cluster(
    topic: str,
    title: str,
    article: Dict[str, Any],
    embedding: list[float] | None,
) -> Dict[str, Any]:
    """Build a fresh single-article cluster seeded from *article*."""
    key = f"{topic}|{_normalize_title(title)}"
    timestamp = article.get("timestamp")
    return {
        "cluster_id": hashlib.sha1(key.encode("utf-8")).hexdigest(),
        "headline": title,
        "summary": article.get("summary", ""),
        "topic": topic,
        "entities": [],
        "sentiment": "neutral",
        "importance": 0.0,
        "sources": [article["source"]] if article.get("source") else [],
        "timestamp": timestamp,
        "articles": [article],
        "first_seen": timestamp,
        "last_updated": timestamp,
        "embedding": embedding,
        "embedding_model": "ollama:nomic-embed-text" if embedding else None,
    }


def dedup_and_cluster_articles(
    articles: List[Dict[str, Any]],
    similarity_threshold: float | None = None,
) -> Dict[str, List[Dict[str, Any]]]:
    """Deduplicate raw articles into clusters keyed by topic.

    v1.2: embedding pre-computation is async/concurrent under the hood, but
    this public function remains synchronous for backward compatibility.

    Articles without a title are skipped. Each remaining article is compared
    with the existing clusters of its topic and joins the best-scoring
    cluster when ANY of these rules fires (checked in this order):

    * Ollama embedding cosine reaches the configured threshold (only when
      embeddings are enabled)
    * title fuzzy ratio reaches the title threshold
    * token Jaccard over headline+summary reaches its threshold
    * consensus: cosine >= 0.80 together with Jaccard >= 0.30 or title
      ratio >= 0.55 (only when embeddings are enabled)

    Otherwise the article starts a new cluster.

    Args:
        articles: Raw article dicts; the keys read here are ``title``,
            ``summary``, ``url``, ``source`` and ``timestamp``.
        similarity_threshold: Optional override for the title threshold
            only; ``None`` uses ``DEFAULT_TITLE_THRESHOLD``.

    Returns:
        A dict mapping each topic to its list of cluster dicts.
    """
    title_threshold = (
        similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD
    )

    # Pre-compute embeddings concurrently (sync boundary handles async internally)
    embedding_cache: Dict[str, list[float] | None] = {}
    if NEWS_EMBEDDINGS_ENABLED:
        embedding_cache = _compute_embeddings_sync(articles)

    by_topic: Dict[str, List[Dict[str, Any]]] = {}
    for a in articles:
        title = a.get("title") or ""
        if not title:
            continue
        topic = normalize_topic_from_title(title)
        article_text = _cluster_text(a)
        article_embedding = embedding_cache.get(article_text) if NEWS_EMBEDDINGS_ENABLED else None

        # Compare using a copy so the stored article never carries the vector.
        a_with_emb = dict(a)
        if article_embedding is not None:
            a_with_emb["_embedding"] = article_embedding

        clusters = by_topic.setdefault(topic, [])

        best_idx: int | None = None
        best_signal_name = "none"
        best_signal_value = 0.0
        for idx, c in enumerate(clusters):
            matched, signal_name, signal_value = _classify_pair(
                _signals(a_with_emb, c), title_threshold
            )
            if matched and signal_value > best_signal_value:
                best_idx = idx
                best_signal_name = signal_name
                best_signal_value = signal_value

        if best_idx is None:
            cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
            clusters.append(_new_cluster(topic, title, a, cluster_embedding))
            continue

        c = clusters[best_idx]
        existing_keys = {_article_key(x) for x in c.get("articles", []) or []}
        if _article_key(a) not in existing_keys:
            c["articles"].append(a)
        if a.get("source") and a["source"] not in c["sources"]:
            c["sources"].append(a["source"])
        # Treat a missing/None timestamp as empty: str(None) == "None" would
        # sort above any digit-leading timestamp and clobber the real value.
        newest = max(str(c.get("last_updated") or ""), str(a.get("timestamp") or ""))
        if newest:
            c["last_updated"] = newest
        c.setdefault("_merge_signals", []).append(
            {"signal": best_signal_name, "value": round(best_signal_value, 3)}
        )

    # Strip the internal merge audit trail before returning
    for topic_clusters in by_topic.values():
        for c in topic_clusters:
            c.pop("_merge_signals", None)
    return dict(by_topic)
|