|
@@ -493,9 +493,144 @@ def dedup_and_cluster_articles(
|
|
|
c["cluster_id"] = _stable_cluster_id(topic, c.get("articles", []) or [])
|
|
c["cluster_id"] = _stable_cluster_id(topic, c.get("articles", []) or [])
|
|
|
by_topic[topic] = clusters
|
|
by_topic[topic] = clusters
|
|
|
|
|
|
|
|
|
|
+ # Cross-topic dedup: merge clusters with overlapping headlines and entities
|
|
|
|
|
+ by_topic = _merge_duplicate_clusters(by_topic)
|
|
|
|
|
+
|
|
|
# Strip the internal merge audit trail before returning
|
|
# Strip the internal merge audit trail before returning
|
|
|
for clusters in by_topic.values():
|
|
for clusters in by_topic.values():
|
|
|
for c in clusters:
|
|
for c in clusters:
|
|
|
c.pop("_merge_signals", None)
|
|
c.pop("_merge_signals", None)
|
|
|
|
|
|
|
|
return {topic: clusters for topic, clusters in by_topic.items()}
|
|
return {topic: clusters for topic, clusters in by_topic.items()}
|
|
|
|
|
+
|
|
|
|
|
+def _merge_duplicate_clusters(
|
|
|
|
|
+ by_topic: Dict[str, List[Dict[str, Any]]],
|
|
|
|
|
+) -> Dict[str, List[Dict[str, Any]]]:
|
|
|
|
|
+ """Cross-topic dedup: merge clusters whose headlines and entities overlap.
|
|
|
|
|
+
|
|
|
|
|
+ Catches the case where the same event arrives from different feeds with
|
|
|
|
|
+ different article keys, lands in separate clusters with different stable
|
|
|
|
|
+ IDs, but has nearly identical headlines and shared entities.
|
|
|
|
|
+
|
|
|
|
|
+ Merge criteria: title_similarity >= 0.90 AND at least one shared entity.
|
|
|
|
|
+ This is intentionally conservative to avoid merging distinct events.
|
|
|
|
|
+ """
|
|
|
|
|
+ # Flatten all clusters with their topic
|
|
|
|
|
+ all_clusters: list[tuple[str, dict]] = []
|
|
|
|
|
+ for topic, clusters in by_topic.items():
|
|
|
|
|
+ for c in clusters:
|
|
|
|
|
+ all_clusters.append((topic, c))
|
|
|
|
|
+
|
|
|
|
|
+ n = len(all_clusters)
|
|
|
|
|
+ if n <= 1:
|
|
|
|
|
+ return by_topic
|
|
|
|
|
+
|
|
|
|
|
+ # Union-Find
|
|
|
|
|
+ parent = list(range(n))
|
|
|
|
|
+
|
|
|
|
|
+ def find(x: int) -> int:
|
|
|
|
|
+ while parent[x] != x:
|
|
|
|
|
+ parent[x] = parent[parent[x]]
|
|
|
|
|
+ x = parent[x]
|
|
|
|
|
+ return x
|
|
|
|
|
+
|
|
|
|
|
+ def union(a: int, b: int) -> None:
|
|
|
|
|
+ ra, rb = find(a), find(b)
|
|
|
|
|
+ if ra != rb:
|
|
|
|
|
+ parent[ra] = rb
|
|
|
|
|
+
|
|
|
|
|
+ # Pre-extract normalized entity sets for each cluster
|
|
|
|
|
+ cluster_ent_sets: list[set[str]] = []
|
|
|
|
|
+ cluster_heads: list[str] = []
|
|
|
|
|
+ for _, c in all_clusters:
|
|
|
|
|
+ ents = {str(e).strip().lower() for e in (c.get("entities", []) or []) if str(e).strip()}
|
|
|
|
|
+ cluster_ent_sets.append(ents)
|
|
|
|
|
+ cluster_heads.append(str(c.get("headline", "") or ""))
|
|
|
|
|
+
|
|
|
|
|
+ # Compare pairs — O(n^2) but n is small (clusters per cycle, not articles)
|
|
|
|
|
+ TITLE_THRESHOLD = 0.90
|
|
|
|
|
+ for i in range(n):
|
|
|
|
|
+ for j in range(i + 1, n):
|
|
|
|
|
+ # Quick skip: if headlines are completely different, no need for entity check
|
|
|
|
|
+ if _title_similarity(cluster_heads[i], cluster_heads[j]) < TITLE_THRESHOLD:
|
|
|
|
|
+ continue
|
|
|
|
|
+ # Check entity overlap (at least one shared entity)
|
|
|
|
|
+ if not (cluster_ent_sets[i] & cluster_ent_sets[j]):
|
|
|
|
|
+ continue
|
|
|
|
|
+ union(i, j)
|
|
|
|
|
+
|
|
|
|
|
+ # Group by component
|
|
|
|
|
+ components: dict[int, list[int]] = {}
|
|
|
|
|
+ for idx in range(n):
|
|
|
|
|
+ root = find(idx)
|
|
|
|
|
+ components.setdefault(root, []).append(idx)
|
|
|
|
|
+
|
|
|
|
|
+ # Merge each component
|
|
|
|
|
+ merged_by_topic: Dict[str, List[Dict[str, Any]]] = {}
|
|
|
|
|
+ for root, members in components.items():
|
|
|
|
|
+ # Pick the base cluster (the one with the most sources, then most articles)
|
|
|
|
|
+ best_idx = max(members, key=lambda i: (
|
|
|
|
|
+ len(all_clusters[i][1].get("sources", []) or []),
|
|
|
|
|
+ len(all_clusters[i][1].get("articles", []) or []),
|
|
|
|
|
+ ))
|
|
|
|
|
+ base_topic, base = all_clusters[best_idx]
|
|
|
|
|
+
|
|
|
|
|
+ if len(members) == 1:
|
|
|
|
|
+ merged_by_topic.setdefault(base_topic, []).append(base)
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Merge all clusters in this component into base
|
|
|
|
|
+ all_articles: list[dict] = list(base.get("articles", []) or [])
|
|
|
|
|
+ all_sources: list[str] = list(base.get("sources", []) or [])
|
|
|
|
|
+ all_entities: list[str] = list(base.get("entities", []) or [])
|
|
|
|
|
+ all_keywords: list[str] = list(base.get("keywords", []) or [])
|
|
|
|
|
+ first_seen = base.get("first_seen", "")
|
|
|
|
|
+ last_updated = base.get("last_updated", "")
|
|
|
|
|
+ existing_article_keys = {_article_key(a) for a in all_articles}
|
|
|
|
|
+ existing_ent_lower = {str(e).strip().lower() for e in all_entities}
|
|
|
|
|
+ existing_kw_lower = {str(k).strip().lower() for k in all_keywords}
|
|
|
|
|
+
|
|
|
|
|
+ for m_idx in members:
|
|
|
|
|
+ if m_idx == best_idx:
|
|
|
|
|
+ continue
|
|
|
|
|
+ other = all_clusters[m_idx][1]
|
|
|
|
|
+ # Merge articles (dedup by key)
|
|
|
|
|
+ for a in other.get("articles", []) or []:
|
|
|
|
|
+ ak = _article_key(a)
|
|
|
|
|
+ if ak not in existing_article_keys:
|
|
|
|
|
+ all_articles.append(a)
|
|
|
|
|
+ existing_article_keys.add(ak)
|
|
|
|
|
+ # Merge sources
|
|
|
|
|
+ for s in other.get("sources", []) or []:
|
|
|
|
|
+ if s not in all_sources:
|
|
|
|
|
+ all_sources.append(s)
|
|
|
|
|
+ # Merge entities (dedup case-insensitive)
|
|
|
|
|
+ for e in other.get("entities", []) or []:
|
|
|
|
|
+ el = str(e).strip().lower()
|
|
|
|
|
+ if el not in existing_ent_lower:
|
|
|
|
|
+ all_entities.append(e)
|
|
|
|
|
+ existing_ent_lower.add(el)
|
|
|
|
|
+ # Merge keywords (dedup case-insensitive)
|
|
|
|
|
+ for k in other.get("keywords", []) or []:
|
|
|
|
|
+ kl = str(k).strip().lower()
|
|
|
|
|
+ if kl not in existing_kw_lower:
|
|
|
|
|
+ all_keywords.append(k)
|
|
|
|
|
+ existing_kw_lower.add(kl)
|
|
|
|
|
+ # Timestamps
|
|
|
|
|
+ fs = other.get("first_seen", "")
|
|
|
|
|
+ if fs and (not first_seen or fs < first_seen):
|
|
|
|
|
+ first_seen = fs
|
|
|
|
|
+ lu = other.get("last_updated", "")
|
|
|
|
|
+ if lu and (not last_updated or lu > last_updated):
|
|
|
|
|
+ last_updated = lu
|
|
|
|
|
+
|
|
|
|
|
+ base["articles"] = all_articles
|
|
|
|
|
+ base["sources"] = all_sources
|
|
|
|
|
+ base["entities"] = all_entities
|
|
|
|
|
+ base["keywords"] = all_keywords
|
|
|
|
|
+ base["first_seen"] = first_seen
|
|
|
|
|
+ base["last_updated"] = last_updated
|
|
|
|
|
+ base["cluster_id"] = _stable_cluster_id(base.get("topic", "other"), all_articles)
|
|
|
|
|
+ merged_by_topic.setdefault(base_topic, []).append(base)
|
|
|
|
|
+
|
|
|
|
|
+ return merged_by_topic
|