소스 검색

news-mcp: dedupe repeated article entries in clusters

Lukas Goldschmidt 1 개월 전
부모
커밋
70062b3ce4
4개의 변경된 파일, 129개의 추가작업 그리고 1개의 삭제
  1. 2 0
      PROJECT.md
  2. 12 0
      README.md
  3. 18 1
      news_mcp/dedup/cluster.py
  4. 97 0
      scripts/dedup_articles_in_clusters.py

+ 2 - 0
PROJECT.md

@@ -13,6 +13,7 @@ Provide a signal-extraction MCP server that converts RSS into **deduplicated, en
 - optional embeddings backfill script for precomputing cluster vectors in SQLite
 - optional merge-analysis script for threshold experiments before any DB rewrite
 - optional merge pass for destructive consolidation after threshold review
+- optional article-dedup cleanup for repeated article variants inside a cluster
 - Groq enrichment (topic/entities/sentiment/keywords)
 - Tools expose semantic queries over cached clusters
 
@@ -37,3 +38,4 @@ Provide a signal-extraction MCP server that converts RSS into **deduplicated, en
 - Embeddings backfill script exists for older cluster rows before the server restart
 - Merge-analysis script exists to inspect candidate cluster pairs at multiple thresholds
 - Merge pass exists for destructive consolidation once thresholds look sane
+- Article-dedup cleanup exists for fixing duplicated article records already in SQLite

+ 12 - 0
README.md

@@ -206,4 +206,16 @@ If the groupings look right, run wet:
 
 This merges embedding-similar clusters within the same topic and removes the
 absorbed duplicates from SQLite.
+
+## Article dedup cleanup (optional)
+
+Some stored clusters may contain repeated article entries for the same
+underlying article id / URL path. To clean existing rows:
+
+```bash
+./.venv/bin/python scripts/dedup_articles_in_clusters.py --dry-run
+./.venv/bin/python scripts/dedup_articles_in_clusters.py
+```
+
+The live clustering path also deduplicates article entries when new data comes in.
 ```

+ 18 - 1
news_mcp/dedup/cluster.py

@@ -8,6 +8,7 @@ from news_mcp.config import NEWS_EMBEDDINGS_ENABLED, NEWS_EMBEDDING_SIMILARITY_T
 
 import re
 from difflib import SequenceMatcher
+from urllib.parse import urlparse
 
 
 def _normalize_title(title: str) -> str:
@@ -22,6 +23,20 @@ def _title_similarity(a: str, b: str) -> float:
     return SequenceMatcher(None, _normalize_title(a), _normalize_title(b)).ratio()
 
 
+def _article_key(article: Dict[str, Any]) -> str:
+    url = str(article.get("url") or "").strip()
+    if not url:
+        return str(article.get("title") or "")
+    try:
+        parsed = urlparse(url)
+        parts = [p for p in parsed.path.split("/") if p]
+        if parts:
+            return parts[-1]
+    except Exception:
+        pass
+    return url
+
+
 def _cluster_text(a: Dict[str, Any]) -> str:
     parts = [a.get("title", ""), a.get("summary", "") or ""]
     return "\n".join(p for p in parts if p).strip()
@@ -84,7 +99,9 @@ def dedup_and_cluster_articles(
 
         if best_idx is not None and best_sim >= threshold:
             c = clusters[best_idx]
-            c["articles"].append(a)
+            existing_keys = {_article_key(x) for x in c.get("articles", []) or []}
+            if _article_key(a) not in existing_keys:
+                c["articles"].append(a)
             if a["source"] not in c["sources"]:
                 c["sources"].append(a["source"])
             c["last_updated"] = max(str(c["last_updated"]), str(a["timestamp"]))

+ 97 - 0
scripts/dedup_articles_in_clusters.py

@@ -0,0 +1,97 @@
+from __future__ import annotations
+
+"""Deduplicate article entries inside stored clusters.
+
+This cleans existing SQLite payloads so a cluster only keeps one article record
+per canonical article key (preferably the URL path/article id).
+
+Usage:
+  ./.venv/bin/python scripts/dedup_articles_in_clusters.py --dry-run
+  ./.venv/bin/python scripts/dedup_articles_in_clusters.py
+"""
+
+import argparse
+import json
+import sys
+from pathlib import Path
+from typing import Any
+from urllib.parse import urlparse
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT))
+
+from news_mcp.config import DB_PATH
+from news_mcp.storage.sqlite_store import SQLiteClusterStore
+
+
+def _article_key(article: dict[str, Any]) -> str:
+    url = str(article.get("url") or "").strip()
+    if not url:
+        return str(article.get("title") or "")
+    try:
+        parsed = urlparse(url)
+        parts = [p for p in parsed.path.split("/") if p]
+        if parts:
+            return parts[-1]
+    except Exception:
+        pass
+    return url
+
+
def _dedup_articles(articles: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop repeated article entries, keeping the first per key in input order."""
    unique: dict[str, dict[str, Any]] = {}
    for entry in articles:
        # setdefault keeps the earliest entry for each key; dicts preserve
        # insertion order, so output order matches first occurrence.
        unique.setdefault(_article_key(entry), entry)
    return list(unique.values())
+
+
def main() -> None:
    """CLI entry point: scan stored clusters and drop repeated article entries.

    Dry-run mode reports what would change without rewriting any rows.
    """
    parser = argparse.ArgumentParser(description="Deduplicate article entries inside stored clusters")
    parser.add_argument("--db", type=Path, default=DB_PATH)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--limit", type=int, default=None)
    opts = parser.parse_args()

    store = SQLiteClusterStore(opts.db)
    # Snapshot all rows up front, oldest-updated first.
    with store._conn() as conn:  # noqa: SLF001 - maintenance script
        rows = conn.execute("SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC").fetchall()

    if opts.limit is not None:
        rows = rows[: opts.limit]

    scanned = 0
    changed = 0

    print(f"starting article dedup: clusters={len(rows)} dry_run={opts.dry_run}")

    for _cluster_id, topic, payload_json in rows:
        scanned += 1
        try:
            cluster = json.loads(payload_json)
        except Exception:
            # Best-effort cleanup: skip rows whose payload is unreadable.
            continue

        raw_articles = cluster.get("articles", []) or []
        kept = _dedup_articles([entry for entry in raw_articles if isinstance(entry, dict)])
        if len(kept) == len(raw_articles):
            continue

        # Work on a shallow copy so the parsed payload is not mutated in place.
        updated_cluster = dict(cluster)
        updated_cluster["articles"] = kept
        if not opts.dry_run:
            store.upsert_clusters([updated_cluster], topic=topic or updated_cluster.get("topic", "other"))
        changed += 1

        if changed % 25 == 0:
            print(f"updated={changed} processed={scanned}")

    print({"total_scanned": scanned, "updated": changed, "dry_run": opts.dry_run})
+
+
+if __name__ == "__main__":
+    main()