from __future__ import annotations import asyncio import hashlib import logging import sys from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timezone, timedelta from typing import Any, Dict from news_mcp.config import ( DEFAULT_TOPICS, DB_PATH, ENRICH_OTHER_TOPICS_ONLY, NEWS_EXTRACT_PROVIDER, NEWS_FEED_URL, NEWS_FEED_URLS, NEWS_PRUNE_INTERVAL_HOURS, NEWS_PRUNING_ENABLED, NEWS_RETENTION_DAYS, NEWS_CLUSTER_MAX_AGE_HOURS, llm_concurrency, llm_rate_limit, ) from news_mcp.dedup.cluster import dedup_and_cluster_articles, _cluster_is_within_age_window, _parse_ts from news_mcp.enrichment.enrich import enrich_cluster from news_mcp.enrichment.llm_enrich import classify_cluster_llm from news_mcp.sources.news_feeds import fetch_news_articles from news_mcp.storage.sqlite_store import SQLiteClusterStore # --------------------------------------------------------------------------- # # Per-feed + per-cycle statistics # --------------------------------------------------------------------------- # @dataclass class FeedStats: """Per-feed statistics for one polling cycle.""" feed_url: str fetched: int = 0 # total items fetched from the feed duplicate: int = 0 # unchanged hash → skipped entirely stale: int = 0 # older than retention window (dropped) ingested: int = 0 # passed dedup + retention, entered clustering enriched: int = 0 # newly LLM-enriched this cycle already_enriched: int = 0 # cache hit — already had entities+keywords failed: int = 0 # LLM enrichment failed after retries @dataclass class PollStats: """Aggregated statistics for one polling cycle.""" started_at: str = "" feeds: list[FeedStats] = field(default_factory=list) total_clusters: int = 0 total_newly_enriched: int = 0 total_already_enriched: int = 0 total_failed: int = 0 def summary(self) -> dict: return { "started_at": self.started_at, "feeds": [ { "feed_url": f.feed_url, "fetched": f.fetched, "duplicate": f.duplicate, "stale": f.stale, "ingested": f.ingested, } for f in self.feeds ], "total_clusters": self.total_clusters, "total_newly_enriched": self.total_newly_enriched, "total_already_enriched": self.total_already_enriched, "total_failed": self.total_failed, } # --------------------------------------------------------------------------- # # Poller # --------------------------------------------------------------------------- # class ClusterPoller: """One polling cycle: fetch → dedup → cluster → enrich-once → store.""" MAX_ENRICHMENT_RETRIES = 3 def __init__( self, store: SQLiteClusterStore, logger: logging.Logger | None = None, ): self.store = store self.logger = logger or logging.getLogger("news_mcp.refresh") self.stats = PollStats() # ------------------------------------------------------------------ # # Public entry point # ------------------------------------------------------------------ # async def poll(self, topic_filter: str | None = None) -> PollStats: """Run one full polling cycle. Returns statistics.""" self.stats = PollStats(started_at=datetime.now(timezone.utc).isoformat()) # 1. Load configured + enabled feed URLs configured_urls = self._load_feed_urls() enabled_urls = self.store.get_enabled_feed_urls(configured_urls) self.logger.info("poll start: enabled_feeds=%d configured=%d", len(enabled_urls), len(configured_urls)) # 2. Fetch articles from enabled feeds only, per-feed dedup feed_map, feed_stats = await self._fetch_feeds(enabled_urls) # Flatten all fresh articles (stats already tracked per-feed in feed_stats) all_fresh = [a for articles in feed_map.values() for a in articles] if not all_fresh: self.logger.info("poll: no fresh articles from any feed") self.stats.feeds = feed_stats self._save_feed_stats(feed_stats) self._prune_and_finalize(enabled_urls, feed_map) return self.stats # 3. Retention filter articles = self._apply_retention(all_fresh, feed_map) if not articles: self.logger.info("poll: all %d fresh articles dropped by retention", len(all_fresh)) self.stats.feeds = feed_stats self._save_feed_stats(feed_stats) self._prune_and_finalize(enabled_urls, feed_map) return self.stats # 4. Pre-seed existing clusters for cross-cycle merging existing_clusters = self._preseed_clusters() # 5. Cluster (sync, may do concurrent embeddings internally) clustered_by_topic = await self._cluster(articles, existing_clusters) # 6. Enrich every cluster that needs it, store immediately await self._enrich_all(clustered_by_topic) # 7. Retry previously failed enrichments await self._retry_failed() # 8. Persist feed stats + prune self.stats.feeds = feed_stats self._save_feed_stats(feed_stats) self._prune_and_finalize(enabled_urls, feed_map) self.logger.info( "poll complete: clusters=%d newly_enriched=%d already_enriched=%d failed=%d", self.stats.total_clusters, self.stats.total_newly_enriched, self.stats.total_already_enriched, self.stats.total_failed, ) return self.stats # ------------------------------------------------------------------ # # Phase 1: Load feed URLs # ------------------------------------------------------------------ # @staticmethod def _load_feed_urls() -> list[str]: urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()] if not urls: urls = [NEWS_FEED_URL] return urls # ------------------------------------------------------------------ # # Phase 2: Fetch + per-feed dedup # ------------------------------------------------------------------ # async def _fetch_feeds( self, feed_urls: list[str], ) -> tuple[dict[str, list[dict]], list[FeedStats]]: """Fetch all feeds concurrently. Returns {feed_url: fresh_articles} and per-feed stats. Unchanged feeds (same content hash) are dropped.""" articles = await fetch_news_articles(limit=9999, url_list=feed_urls) # limit=9999 effectively means no per-feed cap — fetches everything # the feed gives us. fetch_news_articles applies max(1, limit). # Group by feed URL per_feed: dict[str, list[dict]] = defaultdict(list) for a in articles: fu = str(a.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL per_feed[fu].append(a) # Per-feed content hash dedup feed_map: dict[str, list[dict]] = {} feed_stats_list: list[FeedStats] = [] for feed_url in feed_urls: feed_articles = per_feed.get(feed_url, []) stats = FeedStats(feed_url=feed_url, fetched=len(feed_articles)) if not feed_articles: self.logger.info("feed empty: feed_url=%s", feed_url) feed_stats_list.append(stats) continue material = "\n".join( f"{a.get('title','')}|{a.get('url','')}" for a in feed_articles ) content_hash = hashlib.sha1(material.encode("utf-8")).hexdigest() prev_hash = self.store.get_feed_hash(feed_url) if prev_hash == content_hash: stats.duplicate = len(feed_articles) self.logger.info("feed unchanged: feed_url=%s items=%d", feed_url, len(feed_articles)) feed_stats_list.append(stats) continue feed_map[feed_url] = feed_articles self.logger.info( "feed changed: feed_url=%s items=%d hash_prev=%s hash_now=%s", feed_url, len(feed_articles), (prev_hash or "-")[:12], content_hash[:12], ) feed_stats_list.append(stats) return feed_map, feed_stats_list # ------------------------------------------------------------------ # # Phase 3: Retention filter # ------------------------------------------------------------------ # def _apply_retention( self, articles: list[dict], feed_map: dict[str, list[dict]], ) -> list[dict]: """Drop articles older than NEWS_RETENTION_DAYS. Updates FeedStats.""" if NEWS_RETENTION_DAYS <= 0: return articles cutoff = datetime.now(timezone.utc) - timedelta(days=NEWS_RETENTION_DAYS) # Build a lookup: article_url → feed_url for stats article_feed: dict[str, str] = {} for fu, arts in feed_map.items(): for a in arts: article_feed[a.get("url", "")] = fu fresh = [] dropped = 0 for a in articles: ts_str = a.get("timestamp", "") if not ts_str: fresh.append(a) continue dt = _parse_ts(ts_str) if dt is None or dt >= cutoff: fresh.append(a) else: dropped += 1 fu = article_feed.get(a.get("url", ""), "") if fu: # Find matching FeedStats and increment stale for fs in self.stats.feeds: if fs.feed_url == fu: fs.stale += 1 break if dropped: self.logger.info("retention: dropped=%d remaining=%d retention_days=%.0f", dropped, len(fresh), NEWS_RETENTION_DAYS) return fresh # ------------------------------------------------------------------ # # Phase 4: Pre-seed existing clusters # ------------------------------------------------------------------ # def _preseed_clusters(self) -> list[dict]: """Load recent clusters from DB for cross-cycle article merging.""" max_age = NEWS_CLUSTER_MAX_AGE_HOURS if max_age == 0: return [] lookback = max_age if max_age > 0 else 72 all_recent = self.store.get_latest_clusters_all_topics(ttl_hours=lookback, limit=500) recent = [c for c in all_recent if _cluster_is_within_age_window(c, max_age_hours=max_age)] self.logger.info("pre-seeded: existing_clusters=%d max_age_h=%.1f", len(recent), max_age) return recent # ------------------------------------------------------------------ # # Phase 5: Clustering # ------------------------------------------------------------------ # async def _cluster( self, articles: list[dict], existing_clusters: list[dict], ) -> dict[str, list[dict]]: """Run dedup_and_cluster_articles. Returns {topic: [clusters]}.""" self.logger.info("clustering: articles=%d existing_clusters=%d", len(articles), len(existing_clusters)) clustered = await asyncio.to_thread( dedup_and_cluster_articles, articles, None, # default similarity_threshold existing_clusters=existing_clusters if existing_clusters else None, max_age_hours=NEWS_CLUSTER_MAX_AGE_HOURS, ) self.logger.info("clustered: topics=%s", list(clustered.keys())) return clustered # ------------------------------------------------------------------ # # Phase 6: Enrich + store # ------------------------------------------------------------------ # async def _enrich_all(self, clustered_by_topic: dict[str, list[dict]]) -> None: """Enrich every cluster that needs it and store immediately.""" semaphore = asyncio.Semaphore(llm_concurrency(NEWS_EXTRACT_PROVIDER)) rate = llm_rate_limit(NEWS_EXTRACT_PROVIDER) self.logger.info( "enrich: semaphore_limit=%d rate_limit=%s/s provider=%s", llm_concurrency(NEWS_EXTRACT_PROVIDER), rate, NEWS_EXTRACT_PROVIDER, ) # Flatten all clusters into one list with their topics all_targets: list[tuple[str, dict]] = [] for topic, clusters in clustered_by_topic.items(): for c in clusters: all_targets.append((topic, c)) if not all_targets: return # Enrich concurrently tasks = [ self._enrich_one(c, topic, semaphore, rate) for topic, c in all_targets ] results = await asyncio.gather(*tasks, return_exceptions=False) # Store each cluster individually, grouped by final topic by_final_topic: dict[str, list[dict]] = defaultdict(list) for c2, was_new in results: final_topic = str(c2.get("topic") or "other").strip().lower() if final_topic not in {t.lower() for t in DEFAULT_TOPICS}: final_topic = "other" by_final_topic[final_topic].append(c2) self.stats.total_clusters += 1 if was_new: self.stats.total_newly_enriched += 1 else: self.stats.total_already_enriched += 1 for final_topic, group in by_final_topic.items(): self.store.upsert_clusters(group, topic=final_topic) self.logger.info("stored: topic=%s clusters=%d", final_topic, len(group)) async def _enrich_one( self, cluster: dict, topic: str, semaphore: asyncio.Semaphore, rate: float, ) -> tuple[dict, bool]: """Enrich a single cluster. Returns (cluster, was_newly_enriched). If the cluster already has entities AND keywords, skip LLM entirely. The data on the dict IS the cache — no timestamp or DB lookup needed. """ c2 = enrich_cluster(cluster) c2.setdefault("topic", topic) llm_enabled = (not ENRICH_OTHER_TOPICS_ONLY) or (topic == "other") cluster_id = c2.get("cluster_id") if not llm_enabled or not cluster_id: return c2, False # Cache check: entities + keywords already present → skip if (c2.get("entities") or []) and (c2.get("keywords") or []): self.logger.debug("enrich skip (cached): cluster=%s topic=%s", cluster_id, topic) return c2, False # Actually call the LLM last_err = "" for attempt in range(1 + self.MAX_ENRICHMENT_RETRIES): if attempt > 0: backoff = 2 ** attempt self.logger.info("retry: cluster=%s attempt=%d backoff=%.0fs", cluster_id, attempt, backoff) await asyncio.sleep(backoff) try: async with semaphore: c2 = await classify_cluster_llm(dict(c2)) c2["enriched_at"] = datetime.now(timezone.utc).isoformat() return c2, True except Exception: last_err = str(sys.exc_info()[1])[:200] if sys.exc_info()[1] else "unknown" self.logger.warning( "enrich failed: cluster=%s attempt=%d err=%s", cluster_id, attempt, last_err, ) # All retries exhausted prev_count = c2.get("enrichment_retry_count", 0) c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat() c2["enrichment_retry_count"] = prev_count + 1 self.logger.error("enrich exhausted: cluster=%s after %d retries", cluster_id, self.MAX_ENRICHMENT_RETRIES) self.stats.total_failed += 1 return c2, True # was "newly" enriched (attempted), but failed # ------------------------------------------------------------------ # # Phase 7: Retry failed enrichments # ------------------------------------------------------------------ # async def _retry_failed(self) -> None: """Retry clusters whose previous enrichment failed.""" failed = self.store.get_failed_enrichment_clusters(max_retries=3) if not failed: return self.logger.info("retry: failed_clusters=%d", len(failed)) semaphore = asyncio.Semaphore(llm_concurrency(NEWS_EXTRACT_PROVIDER)) rate = llm_rate_limit(NEWS_EXTRACT_PROVIDER) tasks = [ self._enrich_one(c, str(c.get("topic") or "other"), semaphore, rate) for c in failed ] results = await asyncio.gather(*tasks, return_exceptions=False) by_topic: dict[str, list[dict]] = defaultdict(list) attempted = 0 now_success = 0 still_failed = 0 for c2, was_new in results: if not was_new: continue attempted += 1 # Clear failure marker on success if c2.get("enriched_at") and not c2.get("enrichment_failed_at"): c2.pop("enrichment_failed_at", None) c2.pop("enrichment_retry_count", None) now_success += 1 else: still_failed += 1 t = str(c2.get("topic") or "other").strip().lower() if t not in {x.lower() for x in DEFAULT_TOPICS}: t = "other" by_topic[t].append(c2) for t, group in by_topic.items(): self.store.upsert_clusters(group, topic=t) self.logger.info("retry stored: topic=%s clusters=%d", t, len(group)) if attempted: self.logger.info("retry done: attempted=%d recovered=%d still_failed=%d", attempted, now_success, still_failed) # ------------------------------------------------------------------ # # Phase 8: Feed stats + prune # ------------------------------------------------------------------ # def _save_feed_stats(self, feed_stats: list[FeedStats]) -> None: """Log per-feed statistics. ingested = fetched - duplicate - stale.""" for fs in feed_stats: fs.ingested = max(0, fs.fetched - fs.duplicate - fs.stale) self.logger.info( "feed stats: feed_url=%s fetched=%d duplicate=%d stale=%d ingested=%d", fs.feed_url, fs.fetched, fs.duplicate, fs.stale, fs.ingested, ) def _prune_and_finalize( self, enabled_urls: list[str], feed_map: dict[str, list[dict]], ) -> None: """Run pruning and update feed_state hashes + timestamps.""" prune_result = self.store.prune_if_due( pruning_enabled=NEWS_PRUNING_ENABLED, retention_days=NEWS_RETENTION_DAYS, interval_hours=NEWS_PRUNE_INTERVAL_HOURS, ) # Update feed_state: hash + item_count for feeds that had changes for feed_url, feed_articles in feed_map.items(): material = "\n".join( f"{a.get('title','')}|{a.get('url','')}" for a in feed_articles ) content_hash = hashlib.sha1(material.encode("utf-8")).hexdigest() self.store.set_feed_state(feed_url, content_hash, len(feed_articles)) # Drop legacy aggregate feed-state rows with self.store._conn() as conn: conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'") self.store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat()) self.logger.info("prune: %s", prune_result) # --------------------------------------------------------------------------- # # Compatibility wrapper (used by background loop + tests) # --------------------------------------------------------------------------- # async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None: """Backward-compatible entry point. Delegates to ClusterPoller.""" store = SQLiteClusterStore(DB_PATH) poller = ClusterPoller(store) await poller.poll(topic_filter=topic)