Forráskód Böngészése

refactor: rewrite polling loop as ClusterPoller class with per-cycle stats

Lukas Goldschmidt 1 hete
szülő
commit
2670ed9d44
1 módosított fájl, 438 hozzáadás és 295 törlés
  1. 438 295
      news_mcp/jobs/poller.py

+ 438 - 295
news_mcp/jobs/poller.py

@@ -5,15 +5,14 @@ import hashlib
 import logging
 import sys
 from collections import defaultdict
+from dataclasses import dataclass, field
 from datetime import datetime, timezone, timedelta
 from typing import Any, Dict
 
 from news_mcp.config import (
-    DEFAULT_LOOKBACK_HOURS,
     DEFAULT_TOPICS,
     DB_PATH,
     ENRICH_OTHER_TOPICS_ONLY,
-    ENRICHMENT_MAX_PER_REFRESH,
     NEWS_EXTRACT_PROVIDER,
     NEWS_FEED_URL,
     NEWS_FEED_URLS,
@@ -22,6 +21,7 @@ from news_mcp.config import (
     NEWS_RETENTION_DAYS,
     NEWS_CLUSTER_MAX_AGE_HOURS,
     llm_concurrency,
+    llm_rate_limit,
 )
 from news_mcp.dedup.cluster import dedup_and_cluster_articles, _cluster_is_within_age_window, _parse_ts
 from news_mcp.enrichment.enrich import enrich_cluster
@@ -30,326 +30,469 @@ from news_mcp.sources.news_feeds import fetch_news_articles
 from news_mcp.storage.sqlite_store import SQLiteClusterStore
 
 
-def _load_feed_urls() -> list[str]:
-    """Return the configured feed URLs from environment (unsorted)."""
-    urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
-    if not urls:
-        urls = [NEWS_FEED_URL]
-    return urls
-
-
-MAX_ENRICHMENT_RETRIES = 3  # per-cluster retries before giving up for this cycle
-
-async def _enrich_single_cluster(
-    c: dict,
-    topic: str,
-    llm_enabled: bool,
-    semaphore: asyncio.Semaphore,
-    store: SQLiteClusterStore,
-    logger: logging.Logger,
-) -> dict:
-    """Enrich one cluster: heuristic + optional LLM extraction, concurrency-limited.
-
-    Rule: if the cluster already has entities AND keywords (from a previous
-    enrichment), skip the LLM call entirely.  The data on the dict IS the
-    cache — no need to look up enriched_at timestamps or query the DB by
-    cluster_id.  This works regardless of whether cluster_id changed due to
-    article merging across polling cycles.
-
-    On LLM failure the cluster is retried up to MAX_ENRICHMENT_RETRIES times
-    with exponential backoff.  If all retries are exhausted the cluster is
-    marked with enrichment_failed_at and enrichment_retry_count so the next
-    polling cycle can re-attempt it.
-    """
-    c2 = enrich_cluster(c)
-    c2.setdefault("topic", topic)
-
-    cluster_id = c2.get("cluster_id")
-    if llm_enabled and cluster_id:
-        # --- Cache check: if the cluster already has entities AND keywords,
-        # it was enriched in a previous cycle.  Skip LLM entirely.
-        _existing_entities = c2.get("entities") or []
-        _existing_keywords = c2.get("keywords") or []
-        if _existing_entities and _existing_keywords:
-            logger.debug("enrich skip (already enriched) cluster=%s topic=%s", cluster_id, topic)
-            return c2
-
-        # --- Actually call the LLM ---
-        last_err = ""
-        for attempt in range(1 + MAX_ENRICHMENT_RETRIES):
-            if attempt > 0:
-                backoff = 2 ** attempt
-                logger.info(
-                    "retry cluster=%s topic=%s attempt=%d/%d backoff=%.0fs",
-                    cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, backoff,
-                )
-                await asyncio.sleep(backoff)
-            try:
-                async with semaphore:
-                    c2 = await classify_cluster_llm(dict(c2))
-                c2["enriched_at"] = datetime.now(timezone.utc).isoformat()
-                break  # success
-            except Exception:
-                last_err = str(sys.exc_info()[1])[:200] if sys.exc_info()[1] else "unknown"
-                logger.warning(
-                    "LLM enrichment failed cluster=%s topic=%s attempt=%d/%d err=%s",
-                    cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, last_err,
-                )
-        else:
-            # Loop completed without break = all retries exhausted
-            prev_count = c2.get("enrichment_retry_count", 0)
-            c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
-            c2["enrichment_retry_count"] = prev_count + 1
-            logger.error(
-                "LLM enrichment exhausted cluster=%s topic=%s after %d retries",
-                cluster_id, topic, MAX_ENRICHMENT_RETRIES,
-            )
-
-    return c2
-
-
-async def _enrich_topic_clusters(
-    clusters: list[dict],
-    topic: str,
-    semaphore: asyncio.Semaphore,
-    store: SQLiteClusterStore,
-    logger: logging.Logger,
-    enrich_limit: int,
-) -> list[dict]:
-    """Enrich all clusters for a single topic concurrently."""
-    llm_enabled = (not ENRICH_OTHER_TOPICS_ONLY) or (topic == "other")
-
-    # Persist the raw clusters first so a slow enrichment pass does not
-    # leave the first bootstrap run with nothing stored.
-    store.upsert_clusters(clusters, topic=topic)
-    logger.info("refresh stored raw topic=%s clusters=%s", topic, len(clusters))
-
-    targets = clusters[:enrich_limit]
-    tasks = [
-        _enrich_single_cluster(c, topic, llm_enabled, semaphore, store, logger)
-        for c in targets
-    ]
-    enriched = await asyncio.gather(*tasks, return_exceptions=False)
-
-    # Any clusters beyond enrich_limit still need importance enrichment
-    for c in clusters[enrich_limit:]:
-        c2 = enrich_cluster(c)
-        c2.setdefault("topic", topic)
-        enriched.append(c2)
-
-    logger.info("refresh enriched topic=%s clusters=%s", topic, len(enriched))
-    return enriched
-
+# --------------------------------------------------------------------------- #
+#  Per-feed + per-cycle statistics
+# --------------------------------------------------------------------------- #
+
+@dataclass
+class FeedStats:
+    """Per-feed statistics for one polling cycle."""
+    feed_url: str
+    fetched: int = 0         # total items fetched from the feed
+    duplicate: int = 0       # unchanged hash → skipped entirely
+    stale: int = 0           # older than retention window (dropped)
+    ingested: int = 0        # passed dedup + retention, entered clustering
+    enriched: int = 0        # newly LLM-enriched this cycle
+    already_enriched: int = 0  # cache hit — already had entities+keywords
+    failed: int = 0          # LLM enrichment failed after retries
+
+
+@dataclass
+class PollStats:
+    """Aggregated statistics for one polling cycle."""
+    started_at: str = ""
+    feeds: list[FeedStats] = field(default_factory=list)
+    total_clusters: int = 0
+    total_newly_enriched: int = 0
+    total_already_enriched: int = 0
+    total_failed: int = 0
+
+    def summary(self) -> dict:
+        return {
+            "started_at": self.started_at,
+            "feeds": [
+                {
+                    "feed_url": f.feed_url,
+                    "fetched": f.fetched,
+                    "duplicate": f.duplicate,
+                    "stale": f.stale,
+                    "ingested": f.ingested,
+                }
+                for f in self.feeds
+            ],
+            "total_clusters": self.total_clusters,
+            "total_newly_enriched": self.total_newly_enriched,
+            "total_already_enriched": self.total_already_enriched,
+            "total_failed": self.total_failed,
+        }
+
+
+# --------------------------------------------------------------------------- #
+#  Poller
+# --------------------------------------------------------------------------- #
+
+class ClusterPoller:
+    """One polling cycle: fetch → dedup → cluster → enrich-once → store."""
+
+    MAX_ENRICHMENT_RETRIES = 3
+
+    def __init__(
+        self,
+        store: SQLiteClusterStore,
+        logger: logging.Logger | None = None,
+    ):
+        self.store = store
+        self.logger = logger or logging.getLogger("news_mcp.refresh")
+        self.stats = PollStats()
+
+    # ------------------------------------------------------------------ #
+    #  Public entry point
+    # ------------------------------------------------------------------ #
+
+    async def poll(self, topic_filter: str | None = None) -> PollStats:
+        """Run one full polling cycle. Returns statistics."""
+        self.stats = PollStats(started_at=datetime.now(timezone.utc).isoformat())
+
+        # 1. Load enabled feed URLs
+        configured_urls = self._load_feed_urls()
+        enabled_urls = self.store.get_enabled_feed_urls(configured_urls)
+        self.logger.info("poll start: enabled_feeds=%d configured=%d", len(enabled_urls), len(configured_urls))
+
+        # 2. Fetch articles from all enabled feeds, per-feed dedup
+        feed_map, feed_stats = await self._fetch_feeds(enabled_urls)
+
+        # Flatten all fresh articles (stats already tracked per-feed in feed_stats)
+        all_fresh = [a for articles in feed_map.values() for a in articles]
+
+        if not all_fresh:
+            self.logger.info("poll: no fresh articles from any feed")
+            self.stats.feeds = feed_stats
+            self._save_feed_stats(feed_stats)
+            self._prune_and_finalize(enabled_urls, feed_map)
+            return self.stats
+
+        # 3. Retention filter
+        articles = self._apply_retention(all_fresh, feed_map)
+
+        if not articles:
+            self.logger.info("poll: all %d fresh articles dropped by retention", len(all_fresh))
+            self.stats.feeds = feed_stats
+            self._save_feed_stats(feed_stats)
+            self._prune_and_finalize(enabled_urls, feed_map)
+            return self.stats
+
+        # 4. Pre-seed existing clusters for cross-cycle merging
+        existing_clusters = self._preseed_clusters()
+
+        # 5. Cluster (sync, may do concurrent embeddings internally)
+        clustered_by_topic = await self._cluster(articles, existing_clusters)
+
+        # 6. Enrich every cluster that needs it, store immediately
+        await self._enrich_all(clustered_by_topic)
+
+        # 7. Retry previously failed enrichments
+        await self._retry_failed()
+
+        # 8. Persist feed stats + prune
+        self.stats.feeds = feed_stats
+        self._save_feed_stats(feed_stats)
+        self._prune_and_finalize(enabled_urls, feed_map)
+
+        self.logger.info(
+            "poll complete: clusters=%d newly_enriched=%d already_enriched=%d failed=%d",
+            self.stats.total_clusters,
+            self.stats.total_newly_enriched,
+            self.stats.total_already_enriched,
+            self.stats.total_failed,
+        )
+        return self.stats
+
+    # ------------------------------------------------------------------ #
+    #  Phase 1: Load feed URLs
+    # ------------------------------------------------------------------ #
+
+    @staticmethod
+    def _load_feed_urls() -> list[str]:
+        urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
+        if not urls:
+            urls = [NEWS_FEED_URL]
+        return urls
+
+    # ------------------------------------------------------------------ #
+    #  Phase 2: Fetch + per-feed dedup
+    # ------------------------------------------------------------------ #
+
+    async def _fetch_feeds(
+        self, feed_urls: list[str],
+    ) -> tuple[dict[str, list[dict]], list[FeedStats]]:
+        """Fetch all feeds concurrently. Returns {feed_url: fresh_articles}
+        and per-feed stats. Unchanged feeds (same content hash) are dropped."""
+        articles = await fetch_news_articles(limit=9999, url_list=feed_urls)
+        # limit=9999 effectively means no per-feed cap — fetches everything
+        # the feed gives us.  fetch_news_articles applies max(1, limit).
+
+        # Group by feed URL
+        per_feed: dict[str, list[dict]] = defaultdict(list)
+        for a in articles:
+            fu = str(a.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL
+            per_feed[fu].append(a)
 
-def _cluster_age_ok(cluster: dict, max_age_hours: float) -> bool:
-    """Deprecated alias — use _cluster_is_within_age_window from cluster.py."""
-    return _cluster_is_within_age_window(cluster, max_age_hours=max_age_hours)
+        # Per-feed content hash dedup
+        feed_map: dict[str, list[dict]] = {}
+        feed_stats_list: list[FeedStats] = []
 
+        for feed_url in feed_urls:
+            feed_articles = per_feed.get(feed_url, [])
+            stats = FeedStats(feed_url=feed_url, fetched=len(feed_articles))
 
-async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
-    logger = logging.getLogger("news_mcp.refresh")
-    store = SQLiteClusterStore(DB_PATH)
+            if not feed_articles:
+                self.logger.info("feed empty: feed_url=%s", feed_url)
+                feed_stats_list.append(stats)
+                continue
 
-    logger.info("refresh start topic=%s limit=%s", topic, limit)
-
-    # Get enabled feed URLs from store (seeds new ones as enabled by default).
-    configured_urls = _load_feed_urls()
-    enabled_urls = store.get_enabled_feed_urls(configured_urls)
-    logger.info("refresh enabled feeds=%d / configured=%d", len(enabled_urls), len(configured_urls))
-
-    # fetch_news_articles is now fully async (concurrent RSS fetching)
-    articles = await fetch_news_articles(limit, url_list=enabled_urls)
-    logger.info("refresh fetched articles=%s", len(articles))
-
-    # Drop legacy aggregate feed-state rows so the dashboard only reflects
-    # real per-feed poll status from this point forward.
-    with store._conn() as conn:
-        conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'")
-
-    # Track feed freshness per RSS URL so unchanged feeds can be skipped.
-    per_feed: dict[str, list[dict[str, Any]]] = defaultdict(list)
-    for article in articles:
-        feed_url = str(article.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL
-        per_feed[feed_url].append(article)
-
-    changed_articles: list[dict[str, Any]] = []
-    changed_feed_urls: list[str] = []
-    for feed_url, feed_articles in per_feed.items():
-        logger.info("refresh feed batch start feed_url=%s count=%s", feed_url, len(feed_articles))
-        material = "\n".join(
-            f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
-            for a in feed_articles
-        )
-        last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
-        feed_key = feed_url
-        prev_hash = store.get_feed_hash(feed_key)
-        if prev_hash == last_hash:
-            logger.info("refresh unchanged feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
-        else:
-            logger.info("refresh changed feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
-            changed_feed_urls.append(feed_url)
-            changed_articles.extend(feed_articles)
-        logger.info("refresh feed batch complete feed_url=%s changed_total=%s", feed_url, len(changed_articles))
-
-    if not changed_articles:
-        logger.info("refresh unchanged all feeds topic=%s", topic)
-        store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
-        prune_result = store.prune_if_due(
-            pruning_enabled=NEWS_PRUNING_ENABLED,
-            retention_days=NEWS_RETENTION_DAYS,
-            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
-        )
-        logger.info("refresh prune_result=%s", prune_result)
-        return
+            material = "\n".join(
+                f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
+                for a in feed_articles
+            )
+            content_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
+            prev_hash = self.store.get_feed_hash(feed_url)
 
-    articles = changed_articles
+            if prev_hash == content_hash:
+                stats.duplicate = len(feed_articles)
+                self.logger.info("feed unchanged: feed_url=%s items=%d", feed_url, len(feed_articles))
+                feed_stats_list.append(stats)
+                continue
 
-    # Pre-filter: drop articles whose RSS timestamp is older than retention.
-    # This prevents stale feed items from being re-ingested after pruning.
-    if NEWS_RETENTION_DAYS > 0:
-        retention_cutoff = datetime.now(timezone.utc) - timedelta(days=NEWS_RETENTION_DAYS)
-        fresh_articles = []
+            feed_map[feed_url] = feed_articles
+            self.logger.info(
+                "feed changed: feed_url=%s items=%d hash_prev=%s hash_now=%s",
+                feed_url, len(feed_articles),
+                (prev_hash or "-")[:12], content_hash[:12],
+            )
+            feed_stats_list.append(stats)
+
+        return feed_map, feed_stats_list
+
+    # ------------------------------------------------------------------ #
+    #  Phase 3: Retention filter
+    # ------------------------------------------------------------------ #
+
+    def _apply_retention(
+        self, articles: list[dict], feed_map: dict[str, list[dict]],
+    ) -> list[dict]:
+        """Drop articles older than NEWS_RETENTION_DAYS. Updates FeedStats."""
+        if NEWS_RETENTION_DAYS <= 0:
+            return articles
+        cutoff = datetime.now(timezone.utc) - timedelta(days=NEWS_RETENTION_DAYS)
+
+        # Build a lookup: article_url → feed_url for stats
+        article_feed: dict[str, str] = {}
+        for fu, arts in feed_map.items():
+            for a in arts:
+                article_feed[a.get("url", "")] = fu
+
+        fresh = []
+        dropped = 0
         for a in articles:
             ts_str = a.get("timestamp", "")
             if not ts_str:
-                fresh_articles.append(a)
+                fresh.append(a)
                 continue
             dt = _parse_ts(ts_str)
-            if dt is None:
-                fresh_articles.append(a)
-                continue
-            if dt >= retention_cutoff:
-                fresh_articles.append(a)
+            if dt is None or dt >= cutoff:
+                fresh.append(a)
             else:
-                logger.debug("drop stale article title=%s ts=%s", a.get("title", "")[:60], ts_str)
-        dropped = len(articles) - len(fresh_articles)
+                dropped += 1
+                fu = article_feed.get(a.get("url", ""), "")
+                if fu:
+                    # Find matching FeedStats and increment stale
+                    for fs in self.stats.feeds:
+                        if fs.feed_url == fu:
+                            fs.stale += 1
+                            break
         if dropped:
-            logger.info("refresh retention-filter dropped=%d remaining=%d retention_days=%.0f", dropped, len(fresh_articles), NEWS_RETENTION_DAYS)
-        articles = fresh_articles
-
-    if not articles:
-        logger.info("refresh no articles after retention filter topic=%s", topic)
-        store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
-        prune_result = store.prune_if_due(
-            pruning_enabled=NEWS_PRUNING_ENABLED,
-            retention_days=NEWS_RETENTION_DAYS,
-            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
+            self.logger.info("retention: dropped=%d remaining=%d retention_days=%.0f", dropped, len(fresh), NEWS_RETENTION_DAYS)
+        return fresh
+
+    # ------------------------------------------------------------------ #
+    #  Phase 4: Pre-seed existing clusters
+    # ------------------------------------------------------------------ #
+
+    def _preseed_clusters(self) -> list[dict]:
+        """Load recent clusters from DB for cross-cycle article merging."""
+        max_age = NEWS_CLUSTER_MAX_AGE_HOURS
+        if max_age == 0:
+            return []
+        lookback = max_age if max_age > 0 else 72
+        all_recent = self.store.get_latest_clusters_all_topics(ttl_hours=lookback, limit=500)
+        recent = [c for c in all_recent if _cluster_is_within_age_window(c, max_age)]
+        self.logger.info("pre-seeded: existing_clusters=%d max_age_h=%.1f", len(recent), max_age)
+        return recent
+
+    # ------------------------------------------------------------------ #
+    #  Phase 5: Clustering
+    # ------------------------------------------------------------------ #
+
+    async def _cluster(
+        self, articles: list[dict], existing_clusters: list[dict],
+    ) -> dict[str, list[dict]]:
+        """Run dedup_and_cluster_articles. Returns {topic: [clusters]}."""
+        self.logger.info("clustering: articles=%d existing_clusters=%d", len(articles), len(existing_clusters))
+        clustered = await asyncio.to_thread(
+            dedup_and_cluster_articles,
+            articles,
+            None,  # default similarity_threshold
+            existing_clusters=existing_clusters if existing_clusters else None,
+            max_age_hours=NEWS_CLUSTER_MAX_AGE_HOURS,
         )
-        logger.info("refresh prune_result=%s", prune_result)
-        return
+        self.logger.info("clustered: topics=%s", list(clustered.keys()))
+        return clustered
 
-    logger.info("refresh clustering start articles=%s topic=%s", len(articles), topic)
+    # ------------------------------------------------------------------ #
+    #  Phase 6: Enrich + store
+    # ------------------------------------------------------------------ #
 
-    # Pre-seed with recent clusters from the DB so new articles can merge
-    # into existing clusters across polling cycles.
-    max_age = NEWS_CLUSTER_MAX_AGE_HOURS
-    recent_clusters: list[dict] = []
-    if max_age != 0:
-        lookback = max_age if max_age > 0 else 72
-        all_recent = store.get_latest_clusters_all_topics(
-            ttl_hours=lookback,
-            limit=500,
-        )
-        recent_clusters = [c for c in all_recent if _cluster_age_ok(c, max_age)]
-        logger.info(
-            "refresh pre-seeded existing_clusters=%s max_age_h=%s",
-            len(recent_clusters), max_age,
-        )
+    async def _enrich_all(self, clustered_by_topic: dict[str, list[dict]]) -> None:
+        """Enrich every cluster that needs it and store immediately."""
+        semaphore = asyncio.Semaphore(llm_concurrency(NEWS_EXTRACT_PROVIDER))
+        rate = llm_rate_limit(NEWS_EXTRACT_PROVIDER)
 
-    # Clustering is sync but may do concurrent embedding fetches internally.
-    # Run off-thread so the event loop stays responsive for MCP tool calls.
-    clustered_by_topic = await asyncio.to_thread(
-        dedup_and_cluster_articles,
-        articles,
-        None,  # use default similarity_threshold
-        existing_clusters=recent_clusters if recent_clusters else None,
-        max_age_hours=max_age,
-    )
-    logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
-
-    # Build LLM concurrency semaphore from the extract provider's config.
-    max_llm_concurrent = llm_concurrency(NEWS_EXTRACT_PROVIDER)
-    llm_semaphore = asyncio.Semaphore(max_llm_concurrent)
-    logger.info("refresh llm semaphore limit=%s provider=%s", max_llm_concurrent, NEWS_EXTRACT_PROVIDER)
-
-    from news_mcp.config import llm_rate_limit as _rl
-    _rate = _rl(NEWS_EXTRACT_PROVIDER)
-    logger.info("refresh llm rate-limit=%s/s provider=%s", _rate, NEWS_EXTRACT_PROVIDER)
-
-    # Enrich each topic's clusters concurrently.
-    topic_tasks = []
-    for t, clusters in clustered_by_topic.items():
-        if topic and t != topic:
-            continue
-
-        # Determine how many clusters to LLM-enrich.
-        # ENRICHMENT_MAX_PER_REFRESH=0 means enrich every cluster (no cap).
-        enrich_limit = ENRICHMENT_MAX_PER_REFRESH or len(clusters)
-
-        topic_tasks.append(
-            _enrich_topic_clusters(
-                clusters=clusters,
-                topic=t,
-                semaphore=llm_semaphore,
-                store=store,
-                logger=logger,
-                enrich_limit=enrich_limit,
-            )
+        self.logger.info(
+            "enrich: semaphore_limit=%d rate_limit=%s/s provider=%s",
+            llm_concurrency(NEWS_EXTRACT_PROVIDER), rate, NEWS_EXTRACT_PROVIDER,
         )
 
-    # Run all topic enrichment phases concurrently
-    topic_results = await asyncio.gather(*topic_tasks, return_exceptions=False)
+        # Flatten all clusters into one list with their topics
+        all_targets: list[tuple[str, dict]] = []
+        for topic, clusters in clustered_by_topic.items():
+            for c in clusters:
+                all_targets.append((topic, c))
 
-    # Persist enriched clusters grouped by their final topic
-    for enriched in topic_results:
-        by_final_topic: Dict[str, list] = {}
-        for c2 in enriched:
+        if not all_targets:
+            return
+
+        # Enrich concurrently
+        tasks = [
+            self._enrich_one(c, topic, semaphore, rate)
+            for topic, c in all_targets
+        ]
+        results = await asyncio.gather(*tasks, return_exceptions=False)
+
+        # Store each cluster individually, grouped by final topic
+        by_final_topic: dict[str, list[dict]] = defaultdict(list)
+        for c2, was_new in results:
             final_topic = str(c2.get("topic") or "other").strip().lower()
-            if final_topic not in {x.lower() for x in DEFAULT_TOPICS}:
+            if final_topic not in {t.lower() for t in DEFAULT_TOPICS}:
                 final_topic = "other"
-            by_final_topic.setdefault(final_topic, []).append(c2)
+            by_final_topic[final_topic].append(c2)
+            self.stats.total_clusters += 1
+            if was_new:
+                self.stats.total_newly_enriched += 1
+            else:
+                self.stats.total_already_enriched += 1
+
         for final_topic, group in by_final_topic.items():
-            store.upsert_clusters(group, topic=final_topic)
-            logger.info("refresh stored topic=%s clusters=%s", final_topic, len(group))
-
-    # Retry previously failed enrichments
-    failed_clusters = store.get_failed_enrichment_clusters(max_retries=3)
-    if failed_clusters:
-        logger.info("retry enrich failed clusters count=%d", len(failed_clusters))
-        retry_tasks = [
-            _enrich_single_cluster(
-                c, str(c.get("topic") or "other"), True, llm_semaphore, store, logger,
-            )
-            for c in failed_clusters
+            self.store.upsert_clusters(group, topic=final_topic)
+            self.logger.info("stored: topic=%s clusters=%d", final_topic, len(group))
+
+    async def _enrich_one(
+        self,
+        cluster: dict,
+        topic: str,
+        semaphore: asyncio.Semaphore,
+        rate: float,
+    ) -> tuple[dict, bool]:
+        """Enrich a single cluster. Returns (cluster, was_newly_enriched).
+
+        If the cluster already has entities AND keywords, skip LLM entirely.
+        The data on the dict IS the cache — no timestamp or DB lookup needed.
+        """
+        c2 = enrich_cluster(cluster)
+        c2.setdefault("topic", topic)
+
+        llm_enabled = (not ENRICH_OTHER_TOPICS_ONLY) or (topic == "other")
+        cluster_id = c2.get("cluster_id")
+
+        if not llm_enabled or not cluster_id:
+            return c2, False
+
+        # Cache check: entities + keywords already present → skip
+        if (c2.get("entities") or []) and (c2.get("keywords") or []):
+            self.logger.debug("enrich skip (cached): cluster=%s topic=%s", cluster_id, topic)
+            return c2, False
+
+        # Actually call the LLM
+        last_err = ""
+        for attempt in range(1 + self.MAX_ENRICHMENT_RETRIES):
+            if attempt > 0:
+                backoff = 2 ** attempt
+                self.logger.info("retry: cluster=%s attempt=%d backoff=%.0fs", cluster_id, attempt, backoff)
+                await asyncio.sleep(backoff)
+            try:
+                async with semaphore:
+                    c2 = await classify_cluster_llm(dict(c2))
+                c2["enriched_at"] = datetime.now(timezone.utc).isoformat()
+                return c2, True
+            except Exception:
+                last_err = str(sys.exc_info()[1])[:200] if sys.exc_info()[1] else "unknown"
+                self.logger.warning(
+                    "enrich failed: cluster=%s attempt=%d err=%s",
+                    cluster_id, attempt, last_err,
+                )
+
+        # All retries exhausted
+        prev_count = c2.get("enrichment_retry_count", 0)
+        c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
+        c2["enrichment_retry_count"] = prev_count + 1
+        self.logger.error("enrich exhausted: cluster=%s after %d retries", cluster_id, self.MAX_ENRICHMENT_RETRIES)
+        self.stats.total_failed += 1
+        return c2, True  # was "newly" enriched (attempted), but failed
+
+    # ------------------------------------------------------------------ #
+    #  Phase 7: Retry failed enrichments
+    # ------------------------------------------------------------------ #
+
+    async def _retry_failed(self) -> None:
+        """Retry clusters whose previous enrichment failed."""
+        failed = self.store.get_failed_enrichment_clusters(max_retries=3)
+        if not failed:
+            return
+
+        self.logger.info("retry: failed_clusters=%d", len(failed))
+        semaphore = asyncio.Semaphore(llm_concurrency(NEWS_EXTRACT_PROVIDER))
+        rate = llm_rate_limit(NEWS_EXTRACT_PROVIDER)
+
+        tasks = [
+            self._enrich_one(c, str(c.get("topic") or "other"), semaphore, rate)
+            for c in failed
         ]
-        retry_results = await asyncio.gather(*retry_tasks, return_exceptions=False)
-        # Persist retried results
-        by_topic_retry: Dict[str, list] = {}
-        for c2 in retry_results:
-            # Clear stale failure marker on success
-            if not c2.get("enrichment_failed_at") or c2.get("entities"):
+        results = await asyncio.gather(*tasks, return_exceptions=False)
+
+        by_topic: dict[str, list[dict]] = defaultdict(list)
+        attempted = 0
+        now_success = 0
+        still_failed = 0
+        for c2, was_new in results:
+            if not was_new:
+                continue
+            attempted += 1
+            # Clear failure marker on success
+            if c2.get("enriched_at") and not c2.get("enrichment_failed_at"):
                 c2.pop("enrichment_failed_at", None)
                 c2.pop("enrichment_retry_count", None)
+                now_success += 1
+            else:
+                still_failed += 1
             t = str(c2.get("topic") or "other").strip().lower()
             if t not in {x.lower() for x in DEFAULT_TOPICS}:
                 t = "other"
-            by_topic_retry.setdefault(t, []).append(c2)
-        for t, group in by_topic_retry.items():
-            store.upsert_clusters(group, topic=t)
-            logger.info("retry stored topic=%s clusters=%s", t, len(group))
-
-    prune_result = store.prune_if_due(
-        pruning_enabled=NEWS_PRUNING_ENABLED,
-        retention_days=NEWS_RETENTION_DAYS,
-        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
-    )
-    for feed_url in changed_feed_urls:
-        feed_articles = per_feed[feed_url]
-        material = "\n".join(
-            f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
-            for a in feed_articles
+            by_topic[t].append(c2)
+
+        for t, group in by_topic.items():
+            self.store.upsert_clusters(group, topic=t)
+            self.logger.info("retry stored: topic=%s clusters=%d", t, len(group))
+
+        if attempted:
+            self.logger.info("retry done: attempted=%d recovered=%d still_failed=%d", attempted, now_success, still_failed)
+
+    # ------------------------------------------------------------------ #
+    #  Phase 8: Feed stats + prune
+    # ------------------------------------------------------------------ #
+
+    def _save_feed_stats(self, feed_stats: list[FeedStats]) -> None:
+        """Log per-feed statistics. ingested = fetched - duplicate - stale."""
+        for fs in feed_stats:
+            fs.ingested = max(0, fs.fetched - fs.duplicate - fs.stale)
+            self.logger.info(
+                "feed stats: feed_url=%s fetched=%d duplicate=%d stale=%d ingested=%d",
+                fs.feed_url, fs.fetched, fs.duplicate, fs.stale, fs.ingested,
+            )
+
+    def _prune_and_finalize(
+        self,
+        enabled_urls: list[str],
+        feed_map: dict[str, list[dict]],
+    ) -> None:
+        """Run pruning and update feed_state hashes + timestamps."""
+        prune_result = self.store.prune_if_due(
+            pruning_enabled=NEWS_PRUNING_ENABLED,
+            retention_days=NEWS_RETENTION_DAYS,
+            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
         )
-        last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
-        store.set_feed_state(feed_url, last_hash, len(feed_articles))
-    store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
-    logger.info("refresh prune_result=%s", prune_result)
+
+        # Update feed_state: hash + item_count for feeds that had changes
+        for feed_url, feed_articles in feed_map.items():
+            material = "\n".join(
+                f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
+                for a in feed_articles
+            )
+            content_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
+            self.store.set_feed_state(feed_url, content_hash, len(feed_articles))
+
+        # Drop legacy aggregate feed-state rows
+        with self.store._conn() as conn:
+            conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'")
+
+        self.store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
+        self.logger.info("prune: %s", prune_result)
+
+
+# --------------------------------------------------------------------------- #
+#  Compatibility wrapper (used by background loop + tests)
+# --------------------------------------------------------------------------- #
+
+async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
+    """Backward-compatible entry point. Delegates to ClusterPoller."""
+    store = SQLiteClusterStore(DB_PATH)
+    poller = ClusterPoller(store)
+    await poller.poll(topic_filter=topic)