|
@@ -3,11 +3,16 @@ from __future__ import annotations
|
|
|
import asyncio
|
|
import asyncio
|
|
|
import hashlib
|
|
import hashlib
|
|
|
import re
|
|
import re
|
|
|
|
|
+from datetime import datetime, timezone, timedelta
|
|
|
from difflib import SequenceMatcher
|
|
from difflib import SequenceMatcher
|
|
|
from typing import Any, Dict, List
|
|
from typing import Any, Dict, List
|
|
|
from urllib.parse import urlparse
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
|
|
-from news_mcp.config import NEWS_EMBEDDINGS_ENABLED, NEWS_EMBEDDING_SIMILARITY_THRESHOLD
|
|
|
|
|
|
|
+from news_mcp.config import (
|
|
|
|
|
+ NEWS_EMBEDDINGS_ENABLED,
|
|
|
|
|
+ NEWS_EMBEDDING_SIMILARITY_THRESHOLD,
|
|
|
|
|
+ NEWS_CLUSTER_MAX_AGE_HOURS,
|
|
|
|
|
+)
|
|
|
from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
|
|
from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
|
|
|
from news_mcp.sources.news_feeds import normalize_topic_from_title
|
|
from news_mcp.sources.news_feeds import normalize_topic_from_title
|
|
|
|
|
|
|
@@ -117,6 +122,60 @@ def _is_match(signals: dict, *, embeddings_enabled: bool) -> tuple[bool, str, fl
|
|
|
return False, "none", 0.0
|
|
return False, "none", 0.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
|
|
+# Stable cluster ID
|
|
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
|
|
+
|
|
|
|
|
+def _stable_cluster_id(topic: str, articles: List[Dict[str, Any]]) -> str:
|
|
|
|
|
+ """Deterministic cluster ID derived from the topic and the sorted set of
|
|
|
|
|
+ article keys. Using the minimum key (lexicographic) as the seed ensures
|
|
|
|
|
+ that no matter which article arrives first, the same set of articles always
|
|
|
|
|
+ maps to the same cluster_id."""
|
|
|
|
|
+ keys = sorted(_article_key(a) for a in articles if _article_key(a))
|
|
|
|
|
+ if not keys:
|
|
|
|
|
+ # Degenerate fallback — single article with empty url and title
|
|
|
|
|
+ return hashlib.sha1(topic.encode("utf-8")).hexdigest()
|
|
|
|
|
+ seed = keys[0]
|
|
|
|
|
+ return hashlib.sha1(f"{topic}|{seed}".encode("utf-8")).hexdigest()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
|
|
+# Temporal gating
|
|
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
|
|
+
|
|
|
|
|
+def _parse_ts(ts_str: str) -> datetime | None:
|
|
|
|
|
+ if not ts_str:
|
|
|
|
|
+ return None
|
|
|
|
|
+ try:
|
|
|
|
|
+ s = str(ts_str).replace("Z", "+00:00")
|
|
|
|
|
+ dt = datetime.fromisoformat(s)
|
|
|
|
|
+ if dt.tzinfo is None:
|
|
|
|
|
+ dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
+ return dt.astimezone(timezone.utc)
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ pass
|
|
|
|
|
+ try:
|
|
|
|
|
+ from email.utils import parsedate_to_datetime
|
|
|
|
|
+ dt = parsedate_to_datetime(str(ts_str))
|
|
|
|
|
+ if dt.tzinfo is None:
|
|
|
|
|
+ dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
+ return dt.astimezone(timezone.utc)
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _cluster_is_within_age_window(cluster: Dict[str, Any], *, max_age_hours: float) -> bool:
|
|
|
|
|
+ """Return True if the cluster's last_updated is within the merge window."""
|
|
|
|
|
+ if max_age_hours <= 0:
|
|
|
|
|
+ return True # 0 = no limit
|
|
|
|
|
+ ts_str = cluster.get("last_updated") or cluster.get("timestamp") or ""
|
|
|
|
|
+ dt = _parse_ts(ts_str)
|
|
|
|
|
+ if dt is None:
|
|
|
|
|
+ return True # be lenient with unparseable timestamps
|
|
|
|
|
+ cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
|
|
|
|
|
+ return dt >= cutoff
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
# ---------------------------------------------------------------------------
|
|
# ---------------------------------------------------------------------------
|
|
|
# Embedding pre-computation (async internally)
|
|
# Embedding pre-computation (async internally)
|
|
|
# ---------------------------------------------------------------------------
|
|
# ---------------------------------------------------------------------------
|
|
@@ -175,6 +234,97 @@ def _compute_embeddings_sync(
|
|
|
return future.result()
|
|
return future.result()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
|
|
+# Orphan merge: detect clusters sharing articles and merge them
|
|
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
|
|
+
|
|
|
|
|
+def _merge_orphan_clusters(
|
|
|
|
|
+ clusters: List[Dict[str, Any]],
|
|
|
|
|
+) -> List[Dict[str, Any]]:
|
|
|
|
|
+ """Post-clustering pass: merge clusters that share article keys.
|
|
|
|
|
+
|
|
|
|
|
+ This handles the case where two articles about the same event didn't match
|
|
|
|
|
+ during the main loop (e.g. embeddings were temporarily unavailable) and
|
|
|
|
|
+ ended up in separate clusters. If two clusters share >= 1 article key, we
|
|
|
|
|
+ merge them into one (keeping the earlier first_seen, recompute the stable
|
|
|
|
|
+ ID from the union of articles).
|
|
|
|
|
+ """
|
|
|
|
|
+ if len(clusters) <= 1:
|
|
|
|
|
+ return clusters
|
|
|
|
|
+
|
|
|
|
|
+ # Build index: article_key -> list of cluster indices
|
|
|
|
|
+ key_to_indices: dict[str, list[int]] = {}
|
|
|
|
|
+ for idx, c in enumerate(clusters):
|
|
|
|
|
+ for a in c.get("articles", []) or []:
|
|
|
|
|
+ ak = _article_key(a)
|
|
|
|
|
+ if ak:
|
|
|
|
|
+ key_to_indices.setdefault(ak, []).append(idx)
|
|
|
|
|
+
|
|
|
|
|
+ # Find connected components via Union-Find
|
|
|
|
|
+ parent = list(range(len(clusters)))
|
|
|
|
|
+
|
|
|
|
|
+ def find(x: int) -> int:
|
|
|
|
|
+ while parent[x] != x:
|
|
|
|
|
+ parent[x] = parent[parent[x]]
|
|
|
|
|
+ x = parent[x]
|
|
|
|
|
+ return x
|
|
|
|
|
+
|
|
|
|
|
+ def union(a: int, b: int) -> None:
|
|
|
|
|
+ ra, rb = find(a), find(b)
|
|
|
|
|
+ if ra != rb:
|
|
|
|
|
+ parent[ra] = rb
|
|
|
|
|
+
|
|
|
|
|
+ for indices in key_to_indices.values():
|
|
|
|
|
+ for i in range(1, len(indices)):
|
|
|
|
|
+ union(indices[0], indices[i])
|
|
|
|
|
+
|
|
|
|
|
+ # Group clusters by component
|
|
|
|
|
+ components: dict[int, list[int]] = {}
|
|
|
|
|
+ for idx in range(len(clusters)):
|
|
|
|
|
+ root = find(idx)
|
|
|
|
|
+ components.setdefault(root, []).append(idx)
|
|
|
|
|
+
|
|
|
|
|
+ merged: List[Dict[str, Any]] = []
|
|
|
|
|
+ for root, members in components.items():
|
|
|
|
|
+ if len(members) == 1:
|
|
|
|
|
+ merged.append(clusters[members[0]])
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Merge all clusters in this component
|
|
|
|
|
+ base = dict(clusters[members[0]])
|
|
|
|
|
+ all_articles: list[dict] = list(base.get("articles", []) or [])
|
|
|
|
|
+ all_sources: list[str] = list(base.get("sources", []) or [])
|
|
|
|
|
+ first_seen = base.get("first_seen", "")
|
|
|
|
|
+ last_updated = base.get("last_updated", "")
|
|
|
|
|
+
|
|
|
|
|
+ for m_idx in members[1:]:
|
|
|
|
|
+ other = clusters[m_idx]
|
|
|
|
|
+ existing_keys = {_article_key(a) for a in all_articles}
|
|
|
|
|
+ for a in other.get("articles", []) or []:
|
|
|
|
|
+ ak = _article_key(a)
|
|
|
|
|
+ if ak not in existing_keys:
|
|
|
|
|
+ all_articles.append(a)
|
|
|
|
|
+ existing_keys.add(ak)
|
|
|
|
|
+ for s in other.get("sources", []) or []:
|
|
|
|
|
+ if s not in all_sources:
|
|
|
|
|
+ all_sources.append(s)
|
|
|
|
|
+ fs = other.get("first_seen", "")
|
|
|
|
|
+ if fs and (not first_seen or fs < first_seen):
|
|
|
|
|
+ first_seen = fs
|
|
|
|
|
+ lu = other.get("last_updated", "")
|
|
|
|
|
+ if lu and (not last_updated or lu > last_updated):
|
|
|
|
|
+ last_updated = lu
|
|
|
|
|
+
|
|
|
|
|
+ base["articles"] = all_articles
|
|
|
|
|
+ base["sources"] = all_sources
|
|
|
|
|
+ base["first_seen"] = first_seen
|
|
|
|
|
+ base["last_updated"] = last_updated
|
|
|
|
|
+ base["cluster_id"] = _stable_cluster_id(base.get("topic", "other"), all_articles)
|
|
|
|
|
+ merged.append(base)
|
|
|
|
|
+
|
|
|
|
|
+ return merged
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
# ---------------------------------------------------------------------------
|
|
# ---------------------------------------------------------------------------
|
|
|
# Public API (sync — backward compatible with tests)
|
|
# Public API (sync — backward compatible with tests)
|
|
|
# ---------------------------------------------------------------------------
|
|
# ---------------------------------------------------------------------------
|
|
@@ -183,16 +333,22 @@ def _compute_embeddings_sync(
|
|
|
def dedup_and_cluster_articles(
|
|
def dedup_and_cluster_articles(
|
|
|
articles: List[Dict[str, Any]],
|
|
articles: List[Dict[str, Any]],
|
|
|
similarity_threshold: float | None = None,
|
|
similarity_threshold: float | None = None,
|
|
|
|
|
+ *,
|
|
|
|
|
+ existing_clusters: List[Dict[str, Any]] | None = None,
|
|
|
|
|
+ max_age_hours: float = 0,
|
|
|
) -> Dict[str, List[Dict[str, Any]]]:
|
|
) -> Dict[str, List[Dict[str, Any]]]:
|
|
|
"""Deduplicate raw articles into clusters keyed by topic.
|
|
"""Deduplicate raw articles into clusters keyed by topic.
|
|
|
|
|
|
|
|
- v1.2: embedding pre-computation is async/concurrent under the hood, but
|
|
|
|
|
- this public function remains synchronous for backward compatibility.
|
|
|
|
|
|
|
+ v1.3: stable cluster IDs, temporal gating, and orphan merge.
|
|
|
|
|
|
|
|
- A pair merges if ANY signal clears its threshold:
|
|
|
|
|
- * title fuzzy ratio
|
|
|
|
|
- * token Jaccard over headline+summary
|
|
|
|
|
- * Ollama embedding cosine when available
|
|
|
|
|
|
|
+ Args:
|
|
|
|
|
+ articles: new articles to cluster.
|
|
|
|
|
+ similarity_threshold: override for the title-similarity threshold.
|
|
|
|
|
+ existing_clusters: optional list of recent clusters from the DB to
|
|
|
|
|
+ merge against (cross-cycle merge). When provided, temporal
|
|
|
|
|
+ gating via max_age_hours is applied to filter these.
|
|
|
|
|
+ max_age_hours: only compare against existing_clusters updated within
|
|
|
|
|
+ this many hours. 0 = no limit (compare against all provided).
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
title_threshold = similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD
|
|
title_threshold = similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD
|
|
@@ -204,6 +360,14 @@ def dedup_and_cluster_articles(
|
|
|
|
|
|
|
|
by_topic: Dict[str, List[Dict[str, Any]]] = {}
|
|
by_topic: Dict[str, List[Dict[str, Any]]] = {}
|
|
|
|
|
|
|
|
|
|
+ # Seed with existing clusters (filtered by age window)
|
|
|
|
|
+ if existing_clusters:
|
|
|
|
|
+ for c in existing_clusters:
|
|
|
|
|
+ if not _cluster_is_within_age_window(c, max_age_hours=max_age_hours):
|
|
|
|
|
+ continue
|
|
|
|
|
+ topic = c.get("topic", "other") or "other"
|
|
|
|
|
+ by_topic.setdefault(topic, []).append(dict(c))
|
|
|
|
|
+
|
|
|
for a in articles:
|
|
for a in articles:
|
|
|
title = a.get("title") or ""
|
|
title = a.get("title") or ""
|
|
|
if not title:
|
|
if not title:
|
|
@@ -262,8 +426,7 @@ def dedup_and_cluster_articles(
|
|
|
{"signal": best_signal_name, "value": round(best_signal_value, 3)}
|
|
{"signal": best_signal_name, "value": round(best_signal_value, 3)}
|
|
|
)
|
|
)
|
|
|
else:
|
|
else:
|
|
|
- key = f"{topic}|{_normalize_title(title)}"
|
|
|
|
|
- cid = hashlib.sha1(key.encode("utf-8")).hexdigest()
|
|
|
|
|
|
|
+ cid = _stable_cluster_id(topic, [a])
|
|
|
cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
|
|
cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
|
|
|
clusters.append(
|
|
clusters.append(
|
|
|
{
|
|
{
|
|
@@ -284,6 +447,15 @@ def dedup_and_cluster_articles(
|
|
|
}
|
|
}
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+ # Post-clustering passes per topic
|
|
|
|
|
+ for topic, clusters in by_topic.items():
|
|
|
|
|
+ # Merge orphans (clusters that share articles)
|
|
|
|
|
+ clusters = _merge_orphan_clusters(clusters)
|
|
|
|
|
+ # Recompute stable IDs from the final article sets
|
|
|
|
|
+ for c in clusters:
|
|
|
|
|
+ c["cluster_id"] = _stable_cluster_id(topic, c.get("articles", []) or [])
|
|
|
|
|
+ by_topic[topic] = clusters
|
|
|
|
|
+
|
|
|
# Strip the internal merge audit trail before returning
|
|
# Strip the internal merge audit trail before returning
|
|
|
for clusters in by_topic.values():
|
|
for clusters in by_topic.values():
|
|
|
for c in clusters:
|
|
for c in clusters:
|