| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- #!/usr/bin/env python3
- """Backfill: normalize all cluster payload timestamps to ISO 8601 UTC.
- Reads every cluster payload from the DB, normalizes timestamp fields
- (cluster-level and article-level), and writes back only changed payloads.
- Usage:
- ./.venv/bin/python scripts/normalize_cluster_timestamps.py [--dry-run] [--db PATH] [--limit N]
- Safe to re-run: only writes rows whose payload actually changed.
- """
- from __future__ import annotations
- import argparse
- import json
- import sqlite3
- import sys
- from datetime import datetime, timezone, timedelta
- from email.utils import parsedate_to_datetime
- from pathlib import Path
- from typing import Any
- ROOT = Path(__file__).resolve().parents[1]
- sys.path.insert(0, str(ROOT))
- from news_mcp.config import DB_PATH
- def _normalize_ts(ts: Any) -> str:
- """Parse any timestamp string and return ISO 8601 UTC. Returns "" if unparseable."""
- if ts is None:
- return ""
- if isinstance(ts, (int, float)):
- try:
- dt = datetime.fromtimestamp(float(ts), tz=timezone.utc)
- return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00")
- except Exception:
- return ""
- text = str(ts).strip()
- if not text:
- return ""
- # ISO 8601
- try:
- dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
- except Exception:
- pass
- # RFC 2822 / HTTP-date
- try:
- dt = parsedate_to_datetime(text)
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
- except Exception:
- pass
- return text # leave unparseable values as-is
- def _normalize_payload(payload: dict[str, Any]) -> dict[str, Any]:
- """Return a new dict with all timestamp fields normalized. Returns same ref if no changes."""
- changed = False
- out = dict(payload)
- # Article-level timestamps
- articles = out.get("articles") or []
- if articles:
- new_articles = []
- for a in articles:
- if isinstance(a, dict) and "timestamp" in a and a["timestamp"]:
- normed = _normalize_ts(a["timestamp"])
- if normed != a["timestamp"]:
- a = dict(a)
- a["timestamp"] = normed
- changed = True
- new_articles.append(a)
- out["articles"] = new_articles
- # Cluster-level timestamp fields
- for field in ("timestamp", "first_seen", "last_updated"):
- if field in out and out[field]:
- normed = _normalize_ts(out[field])
- if normed != out[field]:
- out[field] = normed
- changed = True
- return out
def main() -> None:
    """Command-line entry point: normalize timestamps in every cluster payload.

    Options:
        --db PATH    SQLite database to process (defaults to ``DB_PATH``).
        --dry-run    Count what would change without issuing any UPDATE.
        --limit N    Only process the first N rows, ordered by ``updated_at``.

    Each row of ``clusters`` is decoded, run through ``_normalize_payload``
    and written back, with ``updated_at`` set to the current UTC time, only
    when the normalized payload differs from the stored one. Rows whose
    payload is not valid JSON, or does not decode to an object, are counted
    as errors and skipped.
    """
    parser = argparse.ArgumentParser(description="Normalize all cluster payload timestamps to ISO 8601")
    parser.add_argument("--db", type=Path, default=DB_PATH)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--limit", type=int, default=None)
    args = parser.parse_args()

    db_path = str(args.db)
    print(f"Database: {db_path}")
    print(f"Dry run: {args.dry_run}")

    changed_count = 0
    errors = 0

    # sqlite3's connection context manager only commits or rolls back; it does
    # not close the connection, so close it explicitly.
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        rows = conn.execute(
            "SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC"
        ).fetchall()
        if args.limit:
            rows = rows[: args.limit]
        total = len(rows)

        # One transaction for the whole run: committed on success, rolled back
        # if anything raises part-way through.
        with conn:
            for done, (cluster_id, _topic, payload_json) in enumerate(rows, start=1):
                try:
                    payload = json.loads(payload_json)
                except (TypeError, ValueError):
                    # NULL/non-text column or malformed JSON.
                    payload = None

                if not isinstance(payload, dict):
                    errors += 1
                else:
                    normalized = _normalize_payload(payload)
                    # Compare by value so the "changed" decision does not rely
                    # on the helper handing back the very same object.
                    if normalized != payload:
                        changed_count += 1
                        if not args.dry_run:
                            new_json = json.dumps(normalized, ensure_ascii=False)
                            conn.execute(
                                "UPDATE clusters SET payload=?, updated_at=? WHERE cluster_id=?",
                                (new_json, datetime.now(timezone.utc).isoformat(), cluster_id),
                            )

                if done % 500 == 0 or done == total:
                    print(f" Processed {done}/{total}... ({changed_count} changed, {errors} errors)", flush=True)
    finally:
        conn.close()

    print(f"\nDone. {changed_count}/{total} payloads normalized, {errors} errors.")
    if args.dry_run:
        print(" (dry run — no changes written)")


if __name__ == "__main__":
    main()
|