ソースを参照

fix: prune on payload_ts (event time) + pre-filter articles older than retention

Two fixes to prevent stale RSS articles from being re-ingested forever:

1. prune_clusters() now deletes WHERE payload_ts < ? (event time from
   payload.timestamp) instead of updated_at (row write time).  The old
   query used updated_at, which is refreshed to NOW() on every upsert —
   meaning a re-ingested old article would never age out.

2. refresh_clusters() in the poller now pre-filters changed_articles
   before clustering: when NEWS_RETENTION_DAYS > 0, any article whose RSS
   timestamp parses to a time earlier than the retention cutoff
   (now in UTC minus NEWS_RETENTION_DAYS days) is dropped.  This prevents
   clustering work, LLM calls, and DB writes for articles that would just
   be pruned at the next cycle.

Articles with no timestamp or unparseable timestamps are passed through
(lenient — they'll get a timestamp from sanitize_cluster_payload and can
be pruned by payload_ts later).

Tests updated to use dynamic now() timestamps instead of hardcoded April
2026 dates that became stale relative to the 14-day test retention
window.
Lukas Goldschmidt 1 週間 前
コミット
bb2b345be3
3 ファイル変更、50 行追加、13 行削除
  1. 37 2
      news_mcp/jobs/poller.py
  2. 4 1
      news_mcp/storage/sqlite_store.py
  3. 9 10
      test_news_mcp.py

+ 37 - 2
news_mcp/jobs/poller.py

@@ -5,7 +5,7 @@ import hashlib
 import logging
 import sys
 from collections import defaultdict
-from datetime import datetime, timezone
+from datetime import datetime, timezone, timedelta
 from typing import Any, Dict
 
 from news_mcp.config import (
@@ -23,7 +23,7 @@ from news_mcp.config import (
     NEWS_CLUSTER_MAX_AGE_HOURS,
     llm_concurrency,
 )
-from news_mcp.dedup.cluster import dedup_and_cluster_articles, _cluster_is_within_age_window
+from news_mcp.dedup.cluster import dedup_and_cluster_articles, _cluster_is_within_age_window, _parse_ts
 from news_mcp.enrichment.enrich import enrich_cluster
 from news_mcp.enrichment.llm_enrich import classify_cluster_llm
 from news_mcp.sources.news_feeds import fetch_news_articles
@@ -232,6 +232,41 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
         return
 
     articles = changed_articles
+
+    # Pre-filter: drop articles whose RSS timestamp is older than retention.
+    # This prevents stale feed items from being re-ingested after pruning.
+    if NEWS_RETENTION_DAYS > 0:
+        retention_cutoff = datetime.now(timezone.utc) - timedelta(days=NEWS_RETENTION_DAYS)
+        fresh_articles = []
+        for a in articles:
+            ts_str = a.get("timestamp", "")
+            if not ts_str:
+                fresh_articles.append(a)
+                continue
+            dt = _parse_ts(ts_str)
+            if dt is None:
+                fresh_articles.append(a)
+                continue
+            if dt >= retention_cutoff:
+                fresh_articles.append(a)
+            else:
+                logger.debug("drop stale article title=%s ts=%s", a.get("title", "")[:60], ts_str)
+        dropped = len(articles) - len(fresh_articles)
+        if dropped:
+            logger.info("refresh retention-filter dropped=%d remaining=%d retention_days=%.0f", dropped, len(fresh_articles), NEWS_RETENTION_DAYS)
+        articles = fresh_articles
+
+    if not articles:
+        logger.info("refresh no articles after retention filter topic=%s", topic)
+        store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
+        prune_result = store.prune_if_due(
+            pruning_enabled=NEWS_PRUNING_ENABLED,
+            retention_days=NEWS_RETENTION_DAYS,
+            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
+        )
+        logger.info("refresh prune_result=%s", prune_result)
+        return
+
     logger.info("refresh clustering start articles=%s topic=%s", len(articles), topic)
 
     # Pre-seed with recent clusters from the DB so new articles can merge

+ 4 - 1
news_mcp/storage/sqlite_store.py

@@ -615,7 +615,10 @@ class SQLiteClusterStore:
         cutoff_iso = cutoff.isoformat()
         pruned_at = datetime.now(timezone.utc).isoformat()
         with self._conn() as conn:
-            cur = conn.execute("DELETE FROM clusters WHERE updated_at < ?", (cutoff_iso,))
+            # Use payload_ts (event time from payload.timestamp) not updated_at
+            # (row write time). updated_at is refreshed on every upsert, which
+            # would keep re-ingested old articles alive forever.
+            cur = conn.execute("DELETE FROM clusters WHERE payload_ts < ?", (cutoff_iso,))
             deleted = int(cur.rowcount or 0)
             conn.execute(
                 "INSERT INTO meta(key, value) VALUES(?, ?) "

+ 9 - 10
test_news_mcp.py

@@ -103,16 +103,18 @@ def test_sqlite_summary_cache_does_not_create_placeholder_row():
 
 
 def test_prune_clusters_deletes_rows_older_than_retention():
+    from datetime import datetime, timezone, timedelta
     with tempfile.TemporaryDirectory() as td:
         db = Path(td) / "news.sqlite"
         store = SQLiteClusterStore(db)
+        now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
         store.upsert_clusters([
             {
                 "cluster_id": "fresh",
                 "headline": "Fresh",
                 "summary": "Fresh summary",
                 "entities": ["Bitcoin"],
-                "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
+                "timestamp": now_str,
                 "articles": [],
             },
             {
@@ -120,17 +122,11 @@ def test_prune_clusters_deletes_rows_older_than_retention():
                 "headline": "Stale",
                 "summary": "Stale summary",
                 "entities": ["Iran"],
-                "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT",
+                "timestamp": "2025-01-01T11:00:00+00:00",
                 "articles": [],
             },
         ], topic="other")
 
-        with store._conn() as conn:
-            conn.execute(
-                "UPDATE clusters SET updated_at=? WHERE cluster_id=?",
-                ("2025-01-01T00:00:00+00:00", "stale"),
-            )
-
         deleted = store.prune_clusters(retention_days=30)
 
         assert deleted == 1
@@ -589,9 +585,12 @@ def test_poller_persists_clusters_under_post_enrichment_topic(monkeypatch):
     the post-enrichment topic so SQL filtering and dashboard groupings see the
     real classification."""
     import asyncio
+    from datetime import datetime, timezone
 
     import news_mcp.jobs.poller as poller
 
+    _now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
+
     captured = {"upserts": []}
 
     class DummyStore:
@@ -656,7 +655,7 @@ def test_poller_persists_clusters_under_post_enrichment_topic(monkeypatch):
                     "sentiment": "neutral",
                     "importance": 0.0,
                     "sources": ["S"],
-                    "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
+                    "timestamp": _now_str,
                     "articles": [],
                 }
             ]
@@ -681,7 +680,7 @@ def test_poller_persists_clusters_under_post_enrichment_topic(monkeypatch):
     async def _mock_fetch2(limit, url_list=None):
             return [
                 {"title": "SEC fines firm", "url": "https://example.com/a", "source": "S",
-                 "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT", "summary": "..."},
+                 "timestamp": _now_str, "summary": "..."},
             ]
     monkeypatch.setattr(poller, "fetch_news_articles", _mock_fetch2)
     monkeypatch.setattr(poller, "dedup_and_cluster_articles", fake_cluster)