#!/usr/bin/env python3 """Backfill: normalize all cluster payload timestamps to ISO 8601 UTC. Reads every cluster payload from the DB, normalizes timestamp fields (cluster-level and article-level), and writes back only changed payloads. Usage: ./.venv/bin/python scripts/normalize_cluster_timestamps.py [--dry-run] [--db PATH] Safe to re-run: only writes rows whose payload actually changed. """ from __future__ import annotations import argparse import json import sqlite3 import sys from datetime import datetime, timezone, timedelta from email.utils import parsedate_to_datetime from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from news_mcp.config import DB_PATH def _normalize_ts(ts: Any) -> str: """Parse any timestamp string and return ISO 8601 UTC. Returns "" if unparseable.""" if ts is None: return "" if isinstance(ts, (int, float)): try: dt = datetime.fromtimestamp(float(ts), tz=timezone.utc) return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00") except Exception: return "" text = str(ts).strip() if not text: return "" # ISO 8601 try: dt = datetime.fromisoformat(text.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00") except Exception: pass # RFC 2822 / HTTP-date try: dt = parsedate_to_datetime(text) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00") except Exception: pass return text # leave unparseable values as-is def _normalize_payload(payload: dict[str, Any]) -> dict[str, Any]: """Return a new dict with all timestamp fields normalized. Returns same ref if no changes.""" changed = False out = dict(payload) # Article-level timestamps articles = out.get("articles") or [] if articles: new_articles = [] for a in articles: if isinstance(a, dict) and "timestamp" in a and a["timestamp"]: normed = _normalize_ts(a["timestamp"]) if normed != a["timestamp"]: a = dict(a) a["timestamp"] = normed changed = True new_articles.append(a) out["articles"] = new_articles # Cluster-level timestamp fields for field in ("timestamp", "first_seen", "last_updated"): if field in out and out[field]: normed = _normalize_ts(out[field]) if normed != out[field]: out[field] = normed changed = True return out def main() -> None: parser = argparse.ArgumentParser(description="Normalize all cluster payload timestamps to ISO 8601") parser.add_argument("--db", type=Path, default=DB_PATH) parser.add_argument("--dry-run", action="store_true") parser.add_argument("--limit", type=int, default=None) args = parser.parse_args() db_path = str(args.db) print(f"Database: {db_path}") print(f"Dry run: {args.dry_run}") with sqlite3.connect(db_path) as conn: conn.execute("PRAGMA journal_mode=WAL") rows = conn.execute( "SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC" ).fetchall() if args.limit: rows = rows[: args.limit] total = len(rows) changed_count = 0 errors = 0 with sqlite3.connect(db_path) as conn: conn.execute("PRAGMA journal_mode=WAL") for i, (cluster_id, topic, payload_json) in enumerate(rows): try: payload = json.loads(payload_json) except Exception: errors += 1 continue normalized = _normalize_payload(payload) if normalized is not payload: # identity check: _normalize_payload returns same ref if no changes changed_count += 1 if not args.dry_run: new_json = json.dumps(normalized, ensure_ascii=False) conn.execute( "UPDATE clusters SET payload=?, updated_at=? WHERE cluster_id=?", (new_json, datetime.now(timezone.utc).isoformat(), cluster_id), ) if (i + 1) % 500 == 0 or i == total - 1: print(f" Processed {i+1}/{total}... ({changed_count} changed, {errors} errors)", flush=True) if not args.dry_run: conn.commit() print(f"\nDone. {changed_count}/{total} payloads normalized, {errors} errors.") if args.dry_run: print(" (dry run — no changes written)") if __name__ == "__main__": main()