Просмотр исходного кода

feat: seen_articles table + dual-signal clustering + lower title threshold

E: seen_articles table tracks every article_key (URL-derived) that has
been clustered. On each poll cycle, already-seen articles are filtered
out BEFORE clustering — no re-clustering, no re-enrichment, no wasted
LLM calls. Cleaned up on prune when clusters are deleted.

Backfill script: scripts/backfill_seen_articles.py (run once after deploy).

B: Dual-signal match tier in _is_match() — when title >= 0.55 AND
jaccard >= 0.25, merge even without embeddings. Catches cross-source
variants where headlines differ editorially but share token overlap.

C: Lower DEFAULT_TITLE_THRESHOLD from 0.87 to 0.75 — catches more
headline variants across different outlets.

FeedStats.seen counter added; feed stats log now shows seen=X.
Health endpoint includes seen_article_count.
Lukas Goldschmidt 6 дней назад
Родитель
Сommit
28f4322c94

+ 9 - 2
news_mcp/dedup/cluster.py

@@ -87,7 +87,7 @@ def _jaccard(a: set, b: set) -> float:
 # Composite similarity
 # ---------------------------------------------------------------------------
 
-DEFAULT_TITLE_THRESHOLD = 0.87
+DEFAULT_TITLE_THRESHOLD = 0.75
 DEFAULT_JACCARD_THRESHOLD = 0.55
 
 
@@ -143,7 +143,7 @@ def _is_match(
 ) -> tuple[bool, str, float]:
     """Decide whether two items should merge based on the strongest signal.
 
-    Cascade: cosine (if embeddings enabled) → title → jaccard → consensus.
+    Cascade: cosine (if enabled) → title → jaccard → consensus → dual.
     Returns (matched, signal_name, signal_value).
     """
     cosine_threshold = NEWS_EMBEDDING_SIMILARITY_THRESHOLD
@@ -160,6 +160,13 @@ def _is_match(
     ):
         val = (signals["cosine"] + max(signals["jaccard"], signals["title"])) / 2.0
         return True, "consensus", val
+    # Dual-signal: medium title + medium jaccard → credible match even without
+    # embeddings.  Catches cross-source variants where headlines differ
+    # editorially (title ~0.55-0.74) but share substantial token overlap
+    # (jaccard ~0.25-0.54).
+    if signals["title"] >= 0.55 and signals["jaccard"] >= 0.25:
+        val = (signals["title"] + signals["jaccard"]) / 2.0
+        return True, "dual", val
     return False, "none", 0.0
 
 

+ 33 - 5
news_mcp/jobs/poller.py

@@ -41,7 +41,8 @@ class FeedStats:
     fetched: int = 0         # total items fetched from the feed
     duplicate: int = 0       # unchanged hash → skipped entirely
     stale: int = 0           # older than retention window (dropped)
-    ingested: int = 0        # passed dedup + retention, entered clustering
+    seen: int = 0            # already in seen_articles table → skipped
+    ingested: int = 0        # passed dedup + retention + seen, entered clustering
     enriched: int = 0        # newly LLM-enriched this cycle
     already_enriched: int = 0  # cache hit — already had entities+keywords
     failed: int = 0          # LLM enrichment failed after retries
@@ -66,6 +67,7 @@ class PollStats:
                     "fetched": f.fetched,
                     "duplicate": f.duplicate,
                     "stale": f.stale,
+                    "seen": f.seen,
                     "ingested": f.ingested,
                 }
                 for f in self.feeds
@@ -131,6 +133,32 @@ class ClusterPoller:
             self._prune_and_finalize(enabled_urls, feed_map)
             return self.stats
 
+        # 3b. Seen-articles filter: drop articles whose URL was already
+        # processed in any previous cycle.  This is the strongest dedup —
+        # an article key (derived from URL) in seen_articles means it has
+        # already been clustered and enriched, so re-processing is pure waste.
+        new_articles, already_seen = self.store.filter_already_seen(articles)
+        if already_seen:
+            # Attribute seen articles to their feeds for stats
+            for a in already_seen:
+                fu = a.get("feed_url", "")
+                for fs in feed_stats:
+                    if fs.feed_url == fu:
+                        fs.seen += 1
+                        break
+            self.logger.info(
+                "seen_articles: total=%d new=%d already_seen=%d",
+                len(articles), len(new_articles), len(already_seen),
+            )
+        articles = new_articles
+
+        if not articles:
+            self.logger.info("poll: all articles already seen (nothing new to cluster)")
+            self.stats.feeds = feed_stats
+            self._save_feed_stats(feed_stats)
+            self._prune_and_finalize(enabled_urls, feed_map)
+            return self.stats
+
         # 4. Pre-seed existing clusters for cross-cycle merging
         existing_clusters = self._preseed_clusters()
 
@@ -450,12 +478,12 @@ class ClusterPoller:
     # ------------------------------------------------------------------ #
 
     def _save_feed_stats(self, feed_stats: list[FeedStats]) -> None:
-        """Log per-feed statistics. ingested = fetched - duplicate - stale."""
+        """Log per-feed statistics. ingested = fetched - duplicate - stale - seen."""
         for fs in feed_stats:
-            fs.ingested = max(0, fs.fetched - fs.duplicate - fs.stale)
+            fs.ingested = max(0, fs.fetched - fs.duplicate - fs.stale - fs.seen)
             self.logger.info(
-                "feed stats: feed_url=%s fetched=%d duplicate=%d stale=%d ingested=%d",
-                fs.feed_url, fs.fetched, fs.duplicate, fs.stale, fs.ingested,
+                "feed stats: feed_url=%s fetched=%d duplicate=%d stale=%d seen=%d ingested=%d",
+                fs.feed_url, fs.fetched, fs.duplicate, fs.stale, fs.seen, fs.ingested,
             )
 
     def _prune_and_finalize(

+ 73 - 0
news_mcp/storage/sqlite_store.py

@@ -266,6 +266,20 @@ class SQLiteClusterStore:
                 "CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword)"
             )
 
+            # Seen-articles table: tracks every article_key that has been
+            # clustered, so the poller can skip already-processed articles
+            # entirely (no re-clustering, no re-enrichment).
+            conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS seen_articles (
+                    article_key TEXT PRIMARY KEY,
+                    cluster_id  TEXT NOT NULL,
+                    first_seen  TEXT NOT NULL,
+                    url         TEXT NOT NULL DEFAULT ''
+                )
+                """
+            )
+
             try:
                 cur = conn.execute("PRAGMA table_info(entity_metadata)")
                 cols = [row[1] for row in cur.fetchall()]
@@ -345,6 +359,16 @@ class SQLiteClusterStore:
                             "INSERT OR IGNORE INTO cluster_keywords(cluster_id, keyword) VALUES(?, ?)",
                             (cluster_id, kw_norm),
                         )
+                # Record every article in seen_articles so the poller can
+                # skip already-processed articles on future cycles.
+                for art in c.get("articles", []):
+                    akey = _article_key(art)
+                    if akey:
+                        art_url = str(art.get("url") or "").strip()
+                        conn.execute(
+                            "INSERT OR IGNORE INTO seen_articles(article_key, cluster_id, first_seen, url) VALUES(?,?,?,?)",
+                            (akey, cluster_id, now.isoformat(), art_url),
+                        )
 
     def upsert_cluster_summary(
         self,
@@ -495,6 +519,41 @@ class SQLiteClusterStore:
                 cur = conn.execute("SELECT feed_key FROM feed_state WHERE enabled = 1")
             return [row[0] for row in cur.fetchall()]
 
+    # ------------------------------------------------------------------ #
+    #  Seen-articles: skip already-processed articles at ingestion
+    # ------------------------------------------------------------------ #
+
+    def filter_already_seen(self, articles: list[dict]) -> tuple[list[dict], list[dict]]:
+        """Split articles into (new, already_seen) based on seen_articles table.
+
+        Uses _article_key (derived from URL) as the identity check.
+        Returns two lists: articles never seen before, and articles already
+        processed in a previous cycle.
+        """
+        keys = [_article_key(a) for a in articles]
+        if not keys:
+            return [], []
+        with self._conn() as conn:
+            placeholders = ",".join("?" for _ in keys)
+            cur = conn.execute(
+                f"SELECT article_key FROM seen_articles WHERE article_key IN ({placeholders})",
+                keys,
+            )
+            seen_set = {row[0] for row in cur.fetchall()}
+        new_articles = []
+        seen_articles = []
+        for art, key in zip(articles, keys):
+            if key in seen_set:
+                seen_articles.append(art)
+            else:
+                new_articles.append(art)
+        return new_articles, seen_articles
+
+    def get_seen_article_count(self) -> int:
+        """Total rows in seen_articles (for diagnostics)."""
+        with self._conn() as conn:
+            return conn.execute("SELECT count(*) FROM seen_articles").fetchone()[0]
+
     def get_meta(self, key: str) -> str | None:
         with self._conn() as conn:
             cur = conn.execute("SELECT value FROM meta WHERE key=?", (key,))
@@ -618,8 +677,21 @@ class SQLiteClusterStore:
             # Use payload_ts (event time from payload.timestamp) not updated_at
             # (row write time). updated_at is refreshed on every upsert, which
             # would keep re-ingested old articles alive forever.
+            # Collect cluster_ids being pruned so we can clean seen_articles.
+            pruned_ids = [
+                row[0] for row in conn.execute(
+                    "SELECT cluster_id FROM clusters WHERE payload_ts < ?", (cutoff_iso,)
+                ).fetchall()
+            ]
             cur = conn.execute("DELETE FROM clusters WHERE payload_ts < ?", (cutoff_iso,))
             deleted = int(cur.rowcount or 0)
+            # Clean up seen_articles rows pointing to pruned clusters
+            if pruned_ids:
+                placeholders = ",".join("?" for _ in pruned_ids)
+                conn.execute(
+                    f"DELETE FROM seen_articles WHERE cluster_id IN ({placeholders})",
+                    pruned_ids,
+                )
             conn.execute(
                 "INSERT INTO meta(key, value) VALUES(?, ?) "
                 "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
@@ -732,6 +804,7 @@ class SQLiteClusterStore:
                 "data_fresh": fresh,
                 "feeds": feeds,
                 "feed_count": len(feeds),
+                "seen_article_count": self.get_seen_article_count(),
                 "prune_state": prune_state,
             }
 

+ 79 - 0
scripts/backfill_seen_articles.py

@@ -0,0 +1,79 @@
+#!/usr/bin/env python3
+"""Backfill seen_articles table from existing clusters.
+
+Run once after deploying the seen_articles feature:
+  docker exec -it news-mcp python3 /app/scripts/backfill_seen_articles.py
+
+Or locally against the repo DB:
+  python3 scripts/backfill_seen_articles.py
+"""
+import json
+import sqlite3
+import sys
+from datetime import datetime, timezone
+from urllib.parse import urlparse
+
+
+def _article_key(article: dict) -> str:
+    url = str(article.get("url") or "").strip()
+    if not url:
+        return str(article.get("title") or "")
+    try:
+        parsed = urlparse(url)
+        parts = [p for p in parsed.path.split("/") if p]
+        if parts:
+            return parts[-1]
+    except Exception:
+        pass
+    return url
+
+
+def main(db_path: str = "./data/news.sqlite"):
+    conn = sqlite3.connect(db_path)
+
+    # Ensure the table exists
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS seen_articles (
+            article_key TEXT PRIMARY KEY,
+            cluster_id  TEXT NOT NULL,
+            first_seen  TEXT NOT NULL,
+            url         TEXT NOT NULL DEFAULT ''
+        )
+    """)
+
+    rows = conn.execute("SELECT cluster_id, payload FROM clusters").fetchall()
+    now = datetime.now(timezone.utc).isoformat()
+    added = 0
+    skipped = 0
+
+    for cluster_id, payload_json in rows:
+        try:
+            payload = json.loads(payload_json)
+        except json.JSONDecodeError:
+            skipped += 1
+            continue
+        for art in payload.get("articles", []):
+            akey = _article_key(art)
+            if not akey:
+                continue
+            art_url = str(art.get("url") or "").strip()
+            try:
+                conn.execute(
+                    "INSERT OR IGNORE INTO seen_articles(article_key, cluster_id, first_seen, url) VALUES(?,?,?,?)",
+                    (akey, cluster_id, now, art_url),
+                )
+                if conn.execute("SELECT changes()").fetchone()[0] > 0:
+                    added += 1
+            except Exception as e:
+                print(f"  ERROR: {e}", file=sys.stderr)
+                skipped += 1
+
+    conn.commit()
+    total = conn.execute("SELECT count(*) FROM seen_articles").fetchone()[0]
+    conn.close()
+    print(f"Backfill complete: added={added} skipped={skipped} total_seen={total}")
+
+
+if __name__ == "__main__":
+    db = sys.argv[1] if len(sys.argv) > 1 else "./data/news.sqlite"
+    main(db)

+ 6 - 0
test_news_mcp.py

@@ -376,6 +376,9 @@ def test_refresh_skips_reprocessing_when_feed_hash_is_unchanged(monkeypatch):
         def get_latest_clusters_all_topics(self, ttl_hours=24, limit=500):
             return []
 
+        def filter_already_seen(self, articles):
+            return articles, []
+
         def set_meta(self, key, value):
             self.meta[key] = value
 
@@ -634,6 +637,9 @@ def test_poller_persists_clusters_under_post_enrichment_topic(monkeypatch):
         def get_latest_clusters_all_topics(self, ttl_hours=24, limit=500):
             return []
 
+        def filter_already_seen(self, articles):
+            return articles, []
+
         def set_meta(self, key, value):
             pass