# scripts/dedup_articles_in_clusters.py
  1. from __future__ import annotations
  2. """Deduplicate article entries inside stored clusters.
  3. This cleans existing SQLite payloads so a cluster only keeps one article record
  4. per canonical article key (preferably the URL path/article id).
  5. Usage:
  6. ./.venv/bin/python scripts/dedup_articles_in_clusters.py --dry-run
  7. ./.venv/bin/python scripts/dedup_articles_in_clusters.py
  8. """
  9. import argparse
  10. import json
  11. import sys
  12. from pathlib import Path
  13. ROOT = Path(__file__).resolve().parents[1]
  14. sys.path.insert(0, str(ROOT))
  15. from news_mcp.config import DB_PATH
  16. from news_mcp.storage.sqlite_store import SQLiteClusterStore, sanitize_cluster_payload
  17. def main() -> None:
  18. parser = argparse.ArgumentParser(description="Deduplicate article entries inside stored clusters")
  19. parser.add_argument("--db", type=Path, default=DB_PATH)
  20. parser.add_argument("--dry-run", action="store_true")
  21. parser.add_argument("--limit", type=int, default=None)
  22. args = parser.parse_args()
  23. store = SQLiteClusterStore(args.db)
  24. with store._conn() as conn: # noqa: SLF001 - maintenance script
  25. rows = conn.execute("SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC").fetchall()
  26. if args.limit is not None:
  27. rows = rows[: args.limit]
  28. total = 0
  29. updated = 0
  30. print(f"starting article dedup: clusters={len(rows)} dry_run={args.dry_run}")
  31. for cluster_id, topic, payload_json in rows:
  32. total += 1
  33. try:
  34. cluster = json.loads(payload_json)
  35. except Exception:
  36. continue
  37. sanitized = sanitize_cluster_payload(cluster)
  38. original_articles = cluster.get("articles", []) or []
  39. deduped = sanitized.get("articles", []) or []
  40. if deduped == original_articles:
  41. continue
  42. cluster = sanitized
  43. if not args.dry_run:
  44. store.upsert_clusters([cluster], topic=topic or cluster.get("topic", "other"))
  45. updated += 1
  46. if updated % 25 == 0:
  47. print(f"updated={updated} processed={total}")
  48. print({"total_scanned": total, "updated": updated, "dry_run": args.dry_run})
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()