# dedup_articles_in_clusters.py
  1. from __future__ import annotations
  2. """Deduplicate article entries inside stored clusters.
  3. This cleans existing SQLite payloads so a cluster only keeps one article record
  4. per canonical article key (preferably the URL path/article id).
  5. Usage:
  6. ./.venv/bin/python scripts/dedup_articles_in_clusters.py --dry-run
  7. ./.venv/bin/python scripts/dedup_articles_in_clusters.py
  8. """
  9. import argparse
  10. import json
  11. import sys
  12. from pathlib import Path
  13. from typing import Any
  14. from urllib.parse import urlparse
  15. ROOT = Path(__file__).resolve().parents[1]
  16. sys.path.insert(0, str(ROOT))
  17. from news_mcp.config import DB_PATH
  18. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  19. def _article_key(article: dict[str, Any]) -> str:
  20. url = str(article.get("url") or "").strip()
  21. if not url:
  22. return str(article.get("title") or "")
  23. try:
  24. parsed = urlparse(url)
  25. parts = [p for p in parsed.path.split("/") if p]
  26. if parts:
  27. return parts[-1]
  28. except Exception:
  29. pass
  30. return url
  31. def _dedup_articles(articles: list[dict[str, Any]]) -> list[dict[str, Any]]:
  32. seen = set()
  33. out = []
  34. for article in articles:
  35. key = _article_key(article)
  36. if key in seen:
  37. continue
  38. seen.add(key)
  39. out.append(article)
  40. return out
  41. def main() -> None:
  42. parser = argparse.ArgumentParser(description="Deduplicate article entries inside stored clusters")
  43. parser.add_argument("--db", type=Path, default=DB_PATH)
  44. parser.add_argument("--dry-run", action="store_true")
  45. parser.add_argument("--limit", type=int, default=None)
  46. args = parser.parse_args()
  47. store = SQLiteClusterStore(args.db)
  48. with store._conn() as conn: # noqa: SLF001 - maintenance script
  49. rows = conn.execute("SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC").fetchall()
  50. if args.limit is not None:
  51. rows = rows[: args.limit]
  52. total = 0
  53. updated = 0
  54. print(f"starting article dedup: clusters={len(rows)} dry_run={args.dry_run}")
  55. for cluster_id, topic, payload_json in rows:
  56. total += 1
  57. try:
  58. cluster = json.loads(payload_json)
  59. except Exception:
  60. continue
  61. articles = cluster.get("articles", []) or []
  62. deduped = _dedup_articles([a for a in articles if isinstance(a, dict)])
  63. if len(deduped) == len(articles):
  64. continue
  65. cluster = dict(cluster)
  66. cluster["articles"] = deduped
  67. if not args.dry_run:
  68. store.upsert_clusters([cluster], topic=topic or cluster.get("topic", "other"))
  69. updated += 1
  70. if updated % 25 == 0:
  71. print(f"updated={updated} processed={total}")
  72. print({"total_scanned": total, "updated": updated, "dry_run": args.dry_run})
  73. if __name__ == "__main__":
  74. main()