Переглянути джерело

Normalize timestamps at write time + backfill script

Problem: cluster payloads stored timestamps as raw RSS strings
(RFC 2822 HTTP-date like "Sat, 30 May 2026 02:00:12 +0000")
while queries assumed ISO 8601. _parse_ts fallbacks handled
reads but SQL-side time filtering on updated_at (row mod time
not event time) caused sentiment/entity queries to return
wrong data regardless of timestamp format.

Fix: normalize → ISO 8601 at the write boundary so all stored
payloads are consistent and query-time parsing is trivial.

Changes:
- news_mcp/storage/sqlite_store.py:
  * Add _normalize_ts() helper: parses ISO 8601, RFC 2822,
    epoch seconds → "YYYY-MM-DDTHH:MM:SS+00:00"
  * Hook into sanitize_cluster_payload(): normalizes
    cluster.timestamp, first_seen, last_updated, and every
    article[].timestamp before payload is written to DB

- scripts/merge_cluster_embeddings.py:
  * Import _normalize_ts from store
  * Normalize article timestamps and cluster-level
    timestamp/first_seen/last_updated in _merge_payloads()

- scripts/normalize_cluster_timestamps.py (new):
  * Backfill script: reads all cluster payloads, normalizes
    timestamps in-place, writes back only changed rows
  * Supports --dry-run, --limit flags
  * Idempotent: safe to re-run, skips already-normalized rows
  * Usage: ./.venv/bin/python scripts/normalize_cluster_timestamps.py --dry-run
           ./.venv/bin/python scripts/normalize_cluster_timestamps.py

All 5 affected files pass syntax checks.
Lukas Goldschmidt 1 тиждень тому
батько
коміт
10f8ffcdc0

+ 46 - 0
news_mcp/storage/sqlite_store.py

@@ -18,6 +18,43 @@ from news_mcp.entity_normalize import normalize_entities
 from news_mcp.trends_resolution import resolve_entity_via_trends
 
 
+def _normalize_ts(ts: Any) -> str:
+    """Parse any timestamp string and return ISO 8601 UTC.
+    
+    Handles ISO 8601, RFC 2822/HTTP-date, and unix epoch seconds.
+    Returns empty string if unparseable.
+    """
+    if ts is None:
+        return ""
+    if isinstance(ts, (int, float)):
+        try:
+            dt = datetime.fromtimestamp(float(ts), tz=timezone.utc)
+            return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00")
+        except Exception:
+            return ""
+    text = str(ts).strip()
+    if not text:
+        return ""
+    # Try ISO 8601
+    try:
+        dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
+        if dt.tzinfo is None:
+            dt = dt.replace(tzinfo=timezone.utc)
+        return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
+    except Exception:
+        pass
+    # Try RFC 2822 / HTTP-date
+    try:
+        dt = parsedate_to_datetime(text)
+        if dt.tzinfo is None:
+            dt = dt.replace(tzinfo=timezone.utc)
+        return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
+    except Exception:
+        pass
+    # Return original if we can't parse — better than losing data
+    return text
+
+
 @dataclass
 class ClusterRow:
     cluster_id: str
@@ -74,12 +111,21 @@ def sanitize_cluster_payload(cluster: dict[str, Any], *, include_resolutions: bo
 
     raw_articles = out.get("articles", []) or []
     articles = [a for a in raw_articles if isinstance(a, dict)]
+    # Normalize article timestamps
+    for a in articles:
+        if "timestamp" in a:
+            a["timestamp"] = _normalize_ts(a["timestamp"])
     out["articles"] = _dedup_articles(articles)
 
     raw_entities = out.get("entities", []) or []
     entities = normalize_entities(raw_entities)
     out["entities"] = entities
 
+    # Normalize cluster-level timestamps
+    for field in ("timestamp", "last_updated", "first_seen"):
+        if field in out and out[field]:
+            out[field] = _normalize_ts(out[field])
+
     if not include_resolutions:
         return out
 

+ 13 - 3
scripts/merge_cluster_embeddings.py

@@ -23,7 +23,7 @@ sys.path.insert(0, str(ROOT))
 
 from news_mcp.config import DB_PATH
 from news_mcp.dedup.embedding_support import cosine_similarity
-from news_mcp.storage.sqlite_store import SQLiteClusterStore
+from news_mcp.storage.sqlite_store import SQLiteClusterStore, _normalize_ts
 
 
 def _embedding(cluster: dict[str, Any]) -> list[float] | None:
@@ -101,11 +101,21 @@ def _merge_payloads(clusters: list[dict[str, Any]]) -> dict[str, Any]:
             last_updated = lu
 
     merged["articles"] = _uniq_by_url(all_articles)
+    # Normalize article timestamps
+    for a in merged["articles"]:
+        if "timestamp" in a and a["timestamp"]:
+            a["timestamp"] = _normalize_ts(a["timestamp"])
+
     merged["sources"] = list(dict.fromkeys(all_sources))
     merged["entities"] = list(dict.fromkeys(e for e in all_entities if e))
     merged["keywords"] = list(dict.fromkeys(k for k in all_keywords if k))
-    merged["first_seen"] = first_seen or merged.get("first_seen")
-    merged["last_updated"] = last_updated or merged.get("last_updated")
+
+    # Normalize cluster-level timestamps
+    for field in ("timestamp", "first_seen", "last_seen", "last_updated"):
+        val = merged.get(field) or (first_seen if field == "first_seen" else last_updated if field == "last_updated" else "")
+        if val:
+            merged[field] = _normalize_ts(val)
+
     merged["importance"] = max(float(c.get("importance", 0.0) or 0.0) for c in clusters)
     if sent_scores:
         merged["sentimentScore"] = sum(sent_scores) / len(sent_scores)

+ 147 - 0
scripts/normalize_cluster_timestamps.py

@@ -0,0 +1,147 @@
+#!/usr/bin/env python3
+"""Backfill: normalize all cluster payload timestamps to ISO 8601 UTC.
+
+Reads every cluster payload from the DB, normalizes timestamp fields
+(cluster-level and article-level), and writes back only changed payloads.
+
+Usage:
+  ./.venv/bin/python scripts/normalize_cluster_timestamps.py [--dry-run] [--db PATH]
+
+Safe to re-run: only writes rows whose payload actually changed.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import sqlite3
+import sys
+from datetime import datetime, timezone, timedelta
+from email.utils import parsedate_to_datetime
+from pathlib import Path
+from typing import Any
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT))
+
+from news_mcp.config import DB_PATH
+
+
+def _normalize_ts(ts: Any) -> str:
+    """Parse any timestamp string and return ISO 8601 UTC. Returns "" if unparseable."""
+    if ts is None:
+        return ""
+    if isinstance(ts, (int, float)):
+        try:
+            dt = datetime.fromtimestamp(float(ts), tz=timezone.utc)
+            return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00")
+        except Exception:
+            return ""
+    text = str(ts).strip()
+    if not text:
+        return ""
+    # ISO 8601
+    try:
+        dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
+        if dt.tzinfo is None:
+            dt = dt.replace(tzinfo=timezone.utc)
+        return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
+    except Exception:
+        pass
+    # RFC 2822 / HTTP-date
+    try:
+        dt = parsedate_to_datetime(text)
+        if dt.tzinfo is None:
+            dt = dt.replace(tzinfo=timezone.utc)
+        return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
+    except Exception:
+        pass
+    return text  # leave unparseable values as-is
+
+
+def _normalize_payload(payload: dict[str, Any]) -> dict[str, Any]:
+    """Return a new dict with all timestamp fields normalized. Returns same ref if no changes."""
+    changed = False
+    out = dict(payload)
+
+    # Article-level timestamps
+    articles = out.get("articles") or []
+    if articles:
+        new_articles = []
+        for a in articles:
+            if isinstance(a, dict) and "timestamp" in a and a["timestamp"]:
+                normed = _normalize_ts(a["timestamp"])
+                if normed != a["timestamp"]:
+                    a = dict(a)
+                    a["timestamp"] = normed
+                    changed = True
+            new_articles.append(a)
+        out["articles"] = new_articles
+
+    # Cluster-level timestamp fields
+    for field in ("timestamp", "first_seen", "last_updated"):
+        if field in out and out[field]:
+            normed = _normalize_ts(out[field])
+            if normed != out[field]:
+                out[field] = normed
+                changed = True
+
+    return out
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description="Normalize all cluster payload timestamps to ISO 8601")
+    parser.add_argument("--db", type=Path, default=DB_PATH)
+    parser.add_argument("--dry-run", action="store_true")
+    parser.add_argument("--limit", type=int, default=None)
+    args = parser.parse_args()
+
+    db_path = str(args.db)
+    print(f"Database: {db_path}")
+    print(f"Dry run:  {args.dry_run}")
+
+    with sqlite3.connect(db_path) as conn:
+        conn.execute("PRAGMA journal_mode=WAL")
+        rows = conn.execute(
+            "SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC"
+        ).fetchall()
+        if args.limit:
+            rows = rows[: args.limit]
+
+    total = len(rows)
+    changed_count = 0
+    errors = 0
+
+    with sqlite3.connect(db_path) as conn:
+        conn.execute("PRAGMA journal_mode=WAL")
+        for i, (cluster_id, topic, payload_json) in enumerate(rows):
+            try:
+                payload = json.loads(payload_json)
+            except Exception:
+                errors += 1
+                continue
+
+            normalized = _normalize_payload(payload)
+
+            if normalized is not payload:  # identity check: _normalize_payload returns same ref if no changes
+                changed_count += 1
+                if not args.dry_run:
+                    new_json = json.dumps(normalized, ensure_ascii=False)
+                    conn.execute(
+                        "UPDATE clusters SET payload=?, updated_at=? WHERE cluster_id=?",
+                        (new_json, datetime.now(timezone.utc).isoformat(), cluster_id),
+                    )
+
+            if (i + 1) % 500 == 0 or i == total - 1:
+                print(f"  Processed {i+1}/{total}... ({changed_count} changed, {errors} errors)", flush=True)
+
+        if not args.dry_run:
+            conn.commit()
+
+    print(f"\nDone. {changed_count}/{total} payloads normalized, {errors} errors.")
+    if args.dry_run:
+        print("  (dry run — no changes written)")
+
+
+if __name__ == "__main__":
+    main()