Browse Source

news-mcp: sort/filter latest clusters by payload timestamp

Lukas Goldschmidt 1 tháng trước
mục cha
commit
91ec9a1c5c
1 tập tin đã thay đổi với 84 bổ sung và 11 xóa
  1. 84 11
      news_mcp/storage/sqlite_store.py

+ 84 - 11
news_mcp/storage/sqlite_store.py

@@ -7,6 +7,7 @@ from datetime import datetime, timezone, timedelta
 from pathlib import Path
 from typing import Any
 from urllib.parse import urlparse
+from email.utils import parsedate_to_datetime
 
 from news_mcp.entity_normalize import normalize_entities
 from news_mcp.trends_resolution import resolve_entity_via_trends
@@ -202,25 +203,97 @@ class SQLiteClusterStore:
             return json.loads(row[0])
 
     def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
-        cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
-        cutoff_iso = cutoff.isoformat()
+        """Return newest clusters by *their own* timestamp.
+
+        Filtering/sorting by the DB row's `updated_at` can drift away from the
+        actual event time in `payload.timestamp`.
+        """
+
+        cutoff = datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))
+        cutoff_ts = cutoff.timestamp()
+
+        def _parse_payload_ts(ts: Any) -> float | None:
+            if not ts:
+                return None
+            if isinstance(ts, (int, float)):
+                return float(ts)
+            text = str(ts).strip()
+            try:
+                dt = datetime.fromisoformat(text.replace('Z', '+00:00'))
+                if dt.tzinfo is None:
+                    dt = dt.replace(tzinfo=timezone.utc)
+                return dt.astimezone(timezone.utc).timestamp()
+            except Exception:
+                pass
+            try:
+                dt = parsedate_to_datetime(text)
+                if dt.tzinfo is None:
+                    dt = dt.replace(tzinfo=timezone.utc)
+                return dt.astimezone(timezone.utc).timestamp()
+            except Exception:
+                return None
+
+        # Pull a wider candidate set, then filter by payload.timestamp.
         with self._conn() as conn:
             cur = conn.execute(
-                "SELECT payload FROM clusters WHERE topic=? AND updated_at >= ? ORDER BY updated_at DESC LIMIT ?",
-                (topic, cutoff_iso, int(limit)),
+                "SELECT payload FROM clusters WHERE topic=? LIMIT ?",
+                (topic, int(max(200, limit) * 10)),
             )
-            rows = [json.loads(r[0]) for r in cur.fetchall()]
-        return rows
+            candidates = [json.loads(r[0]) for r in cur.fetchall()]
+
+        filtered: list[dict] = []
+        for c in candidates:
+            ts = _parse_payload_ts(c.get("timestamp"))
+            if ts is None:
+                continue
+            if ts >= cutoff_ts:
+                filtered.append(c)
+
+        filtered.sort(key=lambda c: _parse_payload_ts(c.get("timestamp")) or 0.0, reverse=True)
+        return filtered[: int(limit)]
 
     def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]:
-        cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
-        cutoff_iso = cutoff.isoformat()
+        cutoff = datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))
+        cutoff_ts = cutoff.timestamp()
+
+        def _parse_payload_ts(ts: Any) -> float | None:
+            if not ts:
+                return None
+            if isinstance(ts, (int, float)):
+                return float(ts)
+            text = str(ts).strip()
+            try:
+                dt = datetime.fromisoformat(text.replace('Z', '+00:00'))
+                if dt.tzinfo is None:
+                    dt = dt.replace(tzinfo=timezone.utc)
+                return dt.astimezone(timezone.utc).timestamp()
+            except Exception:
+                pass
+            try:
+                dt = parsedate_to_datetime(text)
+                if dt.tzinfo is None:
+                    dt = dt.replace(tzinfo=timezone.utc)
+                return dt.astimezone(timezone.utc).timestamp()
+            except Exception:
+                return None
+
         with self._conn() as conn:
             cur = conn.execute(
-                "SELECT payload FROM clusters WHERE updated_at >= ? ORDER BY updated_at DESC LIMIT ?",
-                (cutoff_iso, int(limit)),
+                "SELECT payload FROM clusters LIMIT ?",
+                (int(max(500, limit) * 10),),
             )
-            return [json.loads(r[0]) for r in cur.fetchall()]
+            candidates = [json.loads(r[0]) for r in cur.fetchall()]
+
+        filtered: list[dict] = []
+        for c in candidates:
+            ts = _parse_payload_ts(c.get("timestamp"))
+            if ts is None:
+                continue
+            if ts >= cutoff_ts:
+                filtered.append(c)
+
+        filtered.sort(key=lambda c: _parse_payload_ts(c.get("timestamp")) or 0.0, reverse=True)
+        return filtered[: int(limit)]
 
     def get_cluster_by_id(self, cluster_id: str) -> dict | None:
         with self._conn() as conn: