| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- from __future__ import annotations
- """Deduplicate article entries inside stored clusters.
- This cleans existing SQLite payloads so a cluster only keeps one article record
- per canonical article key (preferably the URL path/article id).
- Usage:
- ./.venv/bin/python scripts/dedup_articles_in_clusters.py --dry-run
- ./.venv/bin/python scripts/dedup_articles_in_clusters.py
- """
- import argparse
- import json
- import sys
- from pathlib import Path
- from typing import Any
- from urllib.parse import urlparse
- ROOT = Path(__file__).resolve().parents[1]
- sys.path.insert(0, str(ROOT))
- from news_mcp.config import DB_PATH
- from news_mcp.storage.sqlite_store import SQLiteClusterStore
- def _article_key(article: dict[str, Any]) -> str:
- url = str(article.get("url") or "").strip()
- if not url:
- return str(article.get("title") or "")
- try:
- parsed = urlparse(url)
- parts = [p for p in parsed.path.split("/") if p]
- if parts:
- return parts[-1]
- except Exception:
- pass
- return url
def _dedup_articles(articles: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop repeated articles, keeping the first occurrence of each key.

    Order of the surviving entries matches their first appearance in the
    input (dicts preserve insertion order; setdefault never overwrites).
    """
    unique: dict[str, dict[str, Any]] = {}
    for entry in articles:
        unique.setdefault(_article_key(entry), entry)
    return list(unique.values())
def main() -> None:
    """Scan stored clusters and rewrite those containing duplicate articles.

    Reads all clusters oldest-first, deduplicates their ``articles`` lists
    per canonical key, and — unless ``--dry-run`` — persists the cleaned
    payloads back through the store's normal upsert path. Progress and a
    final summary dict are printed to stdout.
    """
    parser = argparse.ArgumentParser(
        description="Deduplicate article entries inside stored clusters"
    )
    parser.add_argument("--db", type=Path, default=DB_PATH)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--limit", type=int, default=None)
    args = parser.parse_args()

    store = SQLiteClusterStore(args.db)
    # Read-only pass: materialize all rows up front so the connection/
    # transaction is released before any upserts happen below.
    with store._conn() as conn:  # noqa: SLF001 - maintenance script
        rows = conn.execute(
            "SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC"
        ).fetchall()
    if args.limit is not None:
        rows = rows[: args.limit]

    total = 0
    updated = 0
    print(f"starting article dedup: clusters={len(rows)} dry_run={args.dry_run}")
    for cluster_id, topic, payload_json in rows:
        total += 1
        try:
            cluster = json.loads(payload_json)
        except (json.JSONDecodeError, TypeError):
            # Unparseable or NULL payload: skip it rather than abort the
            # whole sweep (narrowed from a blanket Exception catch).
            continue
        articles = cluster.get("articles", []) or []
        # Non-dict entries cannot carry a key; drop them from consideration.
        deduped = _dedup_articles([a for a in articles if isinstance(a, dict)])
        if len(deduped) == len(articles):
            continue  # nothing removed — avoid a pointless write
        cluster = dict(cluster)  # shallow copy before mutating the payload
        cluster["articles"] = deduped
        if not args.dry_run:
            store.upsert_clusters([cluster], topic=topic or cluster.get("topic", "other"))
        updated += 1
        if updated % 25 == 0:
            print(f"updated={updated} processed={total}")
    print({"total_scanned": total, "updated": updated, "dry_run": args.dry_run})


if __name__ == "__main__":
    main()
|