normalize_cluster_timestamps.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. #!/usr/bin/env python3
  2. """Backfill: normalize all cluster payload timestamps to ISO 8601 UTC.
  3. Reads every cluster payload from the DB, normalizes timestamp fields
  4. (cluster-level and article-level), and writes back only changed payloads.
  5. Usage:
  6. ./.venv/bin/python scripts/normalize_cluster_timestamps.py [--dry-run] [--db PATH] [--limit N]
  7. Safe to re-run: only writes rows whose payload actually changed.
  8. """
  9. from __future__ import annotations
  10. import argparse
  11. import json
  12. import sqlite3
  13. import sys
  14. from datetime import datetime, timezone, timedelta
  15. from email.utils import parsedate_to_datetime
  16. from pathlib import Path
  17. from typing import Any
  18. ROOT = Path(__file__).resolve().parents[1]
  19. sys.path.insert(0, str(ROOT))
  20. from news_mcp.config import DB_PATH
  21. def _normalize_ts(ts: Any) -> str:
  22. """Parse any timestamp string and return ISO 8601 UTC. Returns "" if unparseable."""
  23. if ts is None:
  24. return ""
  25. if isinstance(ts, (int, float)):
  26. try:
  27. dt = datetime.fromtimestamp(float(ts), tz=timezone.utc)
  28. return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00")
  29. except Exception:
  30. return ""
  31. text = str(ts).strip()
  32. if not text:
  33. return ""
  34. # ISO 8601
  35. try:
  36. dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
  37. if dt.tzinfo is None:
  38. dt = dt.replace(tzinfo=timezone.utc)
  39. return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  40. except Exception:
  41. pass
  42. # RFC 2822 / HTTP-date
  43. try:
  44. dt = parsedate_to_datetime(text)
  45. if dt.tzinfo is None:
  46. dt = dt.replace(tzinfo=timezone.utc)
  47. return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  48. except Exception:
  49. pass
  50. return text # leave unparseable values as-is
  51. def _normalize_payload(payload: dict[str, Any]) -> dict[str, Any]:
  52. """Return a new dict with all timestamp fields normalized. Returns same ref if no changes."""
  53. changed = False
  54. out = dict(payload)
  55. # Article-level timestamps
  56. articles = out.get("articles") or []
  57. if articles:
  58. new_articles = []
  59. for a in articles:
  60. if isinstance(a, dict) and "timestamp" in a and a["timestamp"]:
  61. normed = _normalize_ts(a["timestamp"])
  62. if normed != a["timestamp"]:
  63. a = dict(a)
  64. a["timestamp"] = normed
  65. changed = True
  66. new_articles.append(a)
  67. out["articles"] = new_articles
  68. # Cluster-level timestamp fields
  69. for field in ("timestamp", "first_seen", "last_updated"):
  70. if field in out and out[field]:
  71. normed = _normalize_ts(out[field])
  72. if normed != out[field]:
  73. out[field] = normed
  74. changed = True
  75. return out
  76. def main() -> None:
  77. parser = argparse.ArgumentParser(description="Normalize all cluster payload timestamps to ISO 8601")
  78. parser.add_argument("--db", type=Path, default=DB_PATH)
  79. parser.add_argument("--dry-run", action="store_true")
  80. parser.add_argument("--limit", type=int, default=None)
  81. args = parser.parse_args()
  82. db_path = str(args.db)
  83. print(f"Database: {db_path}")
  84. print(f"Dry run: {args.dry_run}")
  85. with sqlite3.connect(db_path) as conn:
  86. conn.execute("PRAGMA journal_mode=WAL")
  87. rows = conn.execute(
  88. "SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC"
  89. ).fetchall()
  90. if args.limit:
  91. rows = rows[: args.limit]
  92. total = len(rows)
  93. changed_count = 0
  94. errors = 0
  95. with sqlite3.connect(db_path) as conn:
  96. conn.execute("PRAGMA journal_mode=WAL")
  97. for i, (cluster_id, topic, payload_json) in enumerate(rows):
  98. try:
  99. payload = json.loads(payload_json)
  100. except Exception:
  101. errors += 1
  102. continue
  103. normalized = _normalize_payload(payload)
  104. if normalized is not payload: # identity check: _normalize_payload returns same ref if no changes
  105. changed_count += 1
  106. if not args.dry_run:
  107. new_json = json.dumps(normalized, ensure_ascii=False)
  108. conn.execute(
  109. "UPDATE clusters SET payload=?, updated_at=? WHERE cluster_id=?",
  110. (new_json, datetime.now(timezone.utc).isoformat(), cluster_id),
  111. )
  112. if (i + 1) % 500 == 0 or i == total - 1:
  113. print(f" Processed {i+1}/{total}... ({changed_count} changed, {errors} errors)", flush=True)
  114. if not args.dry_run:
  115. conn.commit()
  116. print(f"\nDone. {changed_count}/{total} payloads normalized, {errors} errors.")
  117. if args.dry_run:
  118. print(" (dry run — no changes written)")
  119. if __name__ == "__main__":
  120. main()