normalize_cluster_timestamps.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #!/usr/bin/env python3
  2. """Backfill: normalize all cluster payload timestamps to ISO 8601 UTC.
  3. Reads every cluster payload from the DB, normalizes timestamp fields
  4. (cluster-level and article-level), and writes back only changed payloads.
  5. Usage:
  6. ./.venv/bin/python scripts/normalize_cluster_timestamps.py [--dry-run] [--db PATH] [--limit N]
  7. Safe to re-run: only writes rows whose payload actually changed.
  8. """
  9. from __future__ import annotations
  10. import argparse
  11. import json
  12. import sqlite3
  13. import sys
  14. from datetime import datetime, timezone, timedelta
  15. from email.utils import parsedate_to_datetime
  16. from pathlib import Path
  17. from typing import Any
  18. ROOT = Path(__file__).resolve().parents[1]
  19. sys.path.insert(0, str(ROOT))
  20. from news_mcp.config import DB_PATH
  21. def _normalize_ts(ts: Any) -> str:
  22. """Parse any timestamp string and return ISO 8601 UTC. Returns "" if unparseable."""
  23. if ts is None:
  24. return ""
  25. if isinstance(ts, (int, float)):
  26. try:
  27. dt = datetime.fromtimestamp(float(ts), tz=timezone.utc)
  28. return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00")
  29. except Exception:
  30. return ""
  31. text = str(ts).strip()
  32. if not text:
  33. return ""
  34. # ISO 8601
  35. try:
  36. dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
  37. if dt.tzinfo is None:
  38. dt = dt.replace(tzinfo=timezone.utc)
  39. return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  40. except Exception:
  41. pass
  42. # RFC 2822 / HTTP-date
  43. try:
  44. dt = parsedate_to_datetime(text)
  45. if dt.tzinfo is None:
  46. dt = dt.replace(tzinfo=timezone.utc)
  47. return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  48. except Exception:
  49. pass
  50. return text # leave unparseable values as-is
  51. def _normalize_payload(payload: dict[str, Any]) -> dict[str, Any]:
  52. """Return a new dict with all timestamp fields normalized. Returns same ref if no changes."""
  53. changed = False
  54. out = dict(payload)
  55. # Article-level timestamps
  56. articles = out.get("articles") or []
  57. if articles:
  58. new_articles = []
  59. for a in articles:
  60. if isinstance(a, dict) and "timestamp" in a and a["timestamp"]:
  61. normed = _normalize_ts(a["timestamp"])
  62. if normed != a["timestamp"]:
  63. a = dict(a)
  64. a["timestamp"] = normed
  65. changed = True
  66. new_articles.append(a)
  67. out["articles"] = new_articles
  68. # Cluster-level timestamp fields
  69. for field in ("timestamp", "first_seen", "last_updated"):
  70. if field in out and out[field]:
  71. normed = _normalize_ts(out[field])
  72. if normed != out[field]:
  73. out[field] = normed
  74. changed = True
  75. return out
  76. def main() -> None:
  77. parser = argparse.ArgumentParser(description="Normalize all cluster payload timestamps to ISO 8601")
  78. parser.add_argument("--db", type=Path, default=DB_PATH)
  79. parser.add_argument("--dry-run", action="store_true")
  80. parser.add_argument("--limit", type=int, default=None)
  81. args = parser.parse_args()
  82. db_path = str(args.db)
  83. if not Path(db_path).exists():
  84. print(f"ERROR: database file not found: {db_path}")
  85. print("Check NEWS_MCP_DB_PATH env var or pass --db /path/to/db")
  86. sys.exit(1)
  87. # Quick check: does the clusters table exist?
  88. with sqlite3.connect(db_path) as probe:
  89. tables = {row[0] for row in probe.execute(
  90. "SELECT name FROM sqlite_master WHERE type='table'"
  91. ).fetchall()}
  92. if "clusters" not in tables:
  93. print(f"ERROR: 'clusters' table not found in {db_path}")
  94. print(f" Tables present: {sorted(tables) or '(none)'}")
  95. print(" This may not be the correct database file.")
  96. sys.exit(1)
  97. print(f"Database: {db_path}")
  98. print(f"Dry run: {args.dry_run}")
  99. with sqlite3.connect(db_path) as conn:
  100. conn.execute("PRAGMA journal_mode=WAL")
  101. rows = conn.execute(
  102. "SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC"
  103. ).fetchall()
  104. if args.limit:
  105. rows = rows[: args.limit]
  106. total = len(rows)
  107. changed_count = 0
  108. errors = 0
  109. with sqlite3.connect(db_path) as conn:
  110. conn.execute("PRAGMA journal_mode=WAL")
  111. for i, (cluster_id, topic, payload_json) in enumerate(rows):
  112. try:
  113. payload = json.loads(payload_json)
  114. except Exception:
  115. errors += 1
  116. continue
  117. normalized = _normalize_payload(payload)
  118. if normalized is not payload: # identity check: _normalize_payload returns same ref if no changes
  119. changed_count += 1
  120. if not args.dry_run:
  121. new_json = json.dumps(normalized, ensure_ascii=False)
  122. conn.execute(
  123. "UPDATE clusters SET payload=?, updated_at=? WHERE cluster_id=?",
  124. (new_json, datetime.now(timezone.utc).isoformat(), cluster_id),
  125. )
  126. if (i + 1) % 500 == 0 or i == total - 1:
  127. print(f" Processed {i+1}/{total}... ({changed_count} changed, {errors} errors)", flush=True)
  128. if not args.dry_run:
  129. conn.commit()
  130. print(f"\nDone. {changed_count}/{total} payloads normalized, {errors} errors.")
  131. if args.dry_run:
  132. print(" (dry run — no changes written)")
# Run the backfill only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()