| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563 |
- from __future__ import annotations
- import asyncio
- import hashlib
- import logging
- import sys
- from collections import defaultdict
- from dataclasses import dataclass, field
- from datetime import datetime, timezone, timedelta
- from typing import Any, Dict
- from news_mcp.config import (
- DEFAULT_TOPICS,
- DB_PATH,
- ENRICH_OTHER_TOPICS_ONLY,
- NEWS_EXTRACT_PROVIDER,
- NEWS_FEED_URL,
- NEWS_FEED_URLS,
- NEWS_PRUNE_INTERVAL_HOURS,
- NEWS_PRUNING_ENABLED,
- NEWS_RETENTION_DAYS,
- NEWS_CLUSTER_MAX_AGE_HOURS,
- llm_concurrency,
- llm_rate_limit,
- )
- from news_mcp.dedup.cluster import dedup_and_cluster_articles, _cluster_is_within_age_window, _parse_ts
- from news_mcp.enrichment.enrich import enrich_cluster
- from news_mcp.enrichment.llm_enrich import classify_cluster_llm
- from news_mcp.sources.news_feeds import fetch_news_articles
- from news_mcp.storage.sqlite_store import SQLiteClusterStore
# --------------------------------------------------------------------------- #
# Per-feed + per-cycle statistics
# --------------------------------------------------------------------------- #
@dataclass
class FeedStats:
    """Counters describing what happened to one feed during a polling cycle."""

    # URL of the feed these counters belong to.
    feed_url: str
    # Items returned by the feed this cycle.
    fetched: int = 0
    # Items skipped wholesale because the feed's content hash was unchanged.
    duplicate: int = 0
    # Items dropped for being older than the retention window.
    stale: int = 0
    # Items whose URL was already recorded in the seen_articles table
    # (content-changed items are counted here too, although they are re-clustered).
    seen: int = 0
    # fetched - duplicate - stale - seen, floored at 0 (computed by the poller).
    ingested: int = 0
    # Enrichment counters: newly LLM-enriched this cycle, cache hits (already
    # had entities + keywords), and LLM failures after all retries.
    # NOTE(review): ClusterPoller does not update these three per feed in the
    # code shown here — confirm whether anything else does.
    enriched: int = 0
    already_enriched: int = 0
    failed: int = 0
@dataclass
class PollStats:
    """Totals for a single polling cycle plus the per-feed breakdown."""

    # ISO-8601 timestamp of when the cycle began ("" until set).
    started_at: str = ""
    # One FeedStats entry per polled feed.
    feeds: list[FeedStats] = field(default_factory=list)
    total_clusters: int = 0
    total_newly_enriched: int = 0
    total_already_enriched: int = 0
    total_failed: int = 0

    def summary(self) -> dict:
        """Return a plain-dict snapshot of this cycle.

        Per-feed entries carry only the ingest counters (``feed_url``,
        ``fetched``, ``duplicate``, ``stale``, ``seen``, ``ingested``); the
        per-feed enrichment counters are deliberately left out.
        """
        feed_keys = ("feed_url", "fetched", "duplicate", "stale", "seen", "ingested")
        snapshot: dict = {"started_at": self.started_at}
        snapshot["feeds"] = [
            {key: getattr(feed, key) for key in feed_keys} for feed in self.feeds
        ]
        for total_name in (
            "total_clusters",
            "total_newly_enriched",
            "total_already_enriched",
            "total_failed",
        ):
            snapshot[total_name] = getattr(self, total_name)
        return snapshot
# --------------------------------------------------------------------------- #
# Poller
# --------------------------------------------------------------------------- #
class ClusterPoller:
    """One polling cycle: fetch → dedup → cluster → enrich-once → store.

    Each call to :meth:`poll` replaces :attr:`stats` with a fresh
    :class:`PollStats` and fills it in as the cycle progresses.
    """

    # LLM attempts per cluster inside one _enrich_one() call are
    # 1 + MAX_ENRICHMENT_RETRIES, with exponential backoff (2s, 4s, 8s).
    MAX_ENRICHMENT_RETRIES = 3
    # Passed as ``max_retries`` to store.get_failed_enrichment_clusters() when
    # selecting previously failed clusters to retry. Presumably an upper bound
    # on ``enrichment_retry_count`` — confirm against the store.
    MAX_FAILED_RETRY_CYCLES = 3

    def __init__(
        self,
        store: SQLiteClusterStore,
        logger: logging.Logger | None = None,
    ):
        """Bind the poller to ``store``; ``logger`` defaults to "news_mcp.refresh"."""
        self.store = store
        self.logger = logger or logging.getLogger("news_mcp.refresh")
        self.stats = PollStats()

    # ------------------------------------------------------------------ #
    # Public entry point
    # ------------------------------------------------------------------ #
    async def poll(self, topic_filter: str | None = None) -> PollStats:
        """Run one full polling cycle and return its statistics.

        Args:
            topic_filter: Currently unused; accepted so existing callers such
                as ``refresh_clusters(topic=...)`` keep working.

        Returns:
            The cycle's :class:`PollStats` (the same object as ``self.stats``).
        """
        self.stats = PollStats(started_at=datetime.now(timezone.utc).isoformat())

        # 1. Load configured + enabled feed URLs
        configured_urls = self._load_feed_urls()
        enabled_urls = self.store.get_enabled_feed_urls(configured_urls)
        self.logger.info(
            "poll start: enabled_feeds=%d configured=%d",
            len(enabled_urls), len(configured_urls),
        )

        # 2. Fetch articles from enabled feeds only, per-feed dedup
        feed_map, feed_stats = await self._fetch_feeds(enabled_urls)
        # Attach the per-feed stats before retention runs: _apply_retention()
        # charges stale articles to self.stats.feeds, so it must not be empty.
        self.stats.feeds = feed_stats

        all_fresh = [a for feed_articles in feed_map.values() for a in feed_articles]
        if not all_fresh:
            self.logger.info("poll: no fresh articles from any feed")
            return self._finish(feed_stats, enabled_urls, feed_map)

        # 3. Retention filter
        articles = self._apply_retention(all_fresh, feed_map)
        if not articles:
            self.logger.info("poll: all %d fresh articles dropped by retention", len(all_fresh))
            return self._finish(feed_stats, enabled_urls, feed_map)

        # 3b. Seen-articles filter. Three outcomes per article:
        #   new            → never seen, full clustering + enrichment
        #   seen_unchanged → same URL, same content hash → skip entirely
        #   seen_changed   → same URL, different content → re-cluster to update
        #                    the existing cluster (triggers re-enrichment)
        new_articles, seen_unchanged, seen_changed = self.store.filter_already_seen(articles)
        if seen_unchanged or seen_changed:
            stats_by_url = self._stats_by_url(feed_stats)
            # Content-changed articles still count as "seen" (known URL) even
            # though they are re-clustered below.
            for seen_group in (seen_unchanged, seen_changed):
                for a in seen_group:
                    fs = stats_by_url.get(self._article_feed_url(a))
                    if fs is not None:
                        fs.seen += 1
            self.logger.info(
                "seen_articles: total=%d new=%d unchanged=%d changed=%d",
                len(articles), len(new_articles), len(seen_unchanged), len(seen_changed),
            )

        # Merge changed articles with new ones for clustering
        articles = new_articles + seen_changed
        if not articles:
            self.logger.info("poll: all articles already seen (nothing new to cluster)")
            return self._finish(feed_stats, enabled_urls, feed_map)

        # 3c. Force re-enrichment of clusters whose articles changed content
        if seen_changed:
            changed_cluster_ids = self._invalidate_changed_clusters(seen_changed)
            if changed_cluster_ids:
                self.logger.info(
                    "content_changed: clusters=%d will re-enrich", len(changed_cluster_ids),
                )

        # 4. Pre-seed existing clusters for cross-cycle merging
        existing_clusters = self._preseed_clusters()
        # 5. Cluster (runs in a worker thread)
        clustered_by_topic = await self._cluster(articles, existing_clusters)
        # 6. Enrich every cluster that needs it, then store
        await self._enrich_all(clustered_by_topic)
        # 7. Retry previously failed enrichments
        await self._retry_failed()

        # 8. Persist feed stats + prune
        self._finish(feed_stats, enabled_urls, feed_map)
        self.logger.info(
            "poll complete: clusters=%d newly_enriched=%d already_enriched=%d failed=%d",
            self.stats.total_clusters,
            self.stats.total_newly_enriched,
            self.stats.total_already_enriched,
            self.stats.total_failed,
        )
        return self.stats

    # ------------------------------------------------------------------ #
    # Shared helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _article_feed_url(article: dict) -> str:
        """Return the feed URL an article is attributed to.

        A missing or blank ``feed_url`` falls back to ``NEWS_FEED_URL``; this
        is the single grouping rule used by every phase of the poller.
        """
        return str(article.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL

    @staticmethod
    def _stats_by_url(feed_stats: list[FeedStats]) -> dict[str, FeedStats]:
        """Index FeedStats by URL, keeping the first entry for a repeated URL."""
        index: dict[str, FeedStats] = {}
        for fs in feed_stats:
            index.setdefault(fs.feed_url, fs)
        return index

    @staticmethod
    def _feed_content_hash(feed_articles: list[dict]) -> str:
        """SHA-1 hex digest over the feed's "title|url" lines (change detection)."""
        material = "\n".join(
            f"{a.get('title', '')}|{a.get('url', '')}" for a in feed_articles
        )
        return hashlib.sha1(material.encode("utf-8")).hexdigest()

    @staticmethod
    def _normalize_topic(raw_topic: Any) -> str:
        """Lower-case a topic; empty or non-DEFAULT_TOPICS values map to "other"."""
        topic = str(raw_topic or "other").strip().lower()
        known_topics = {t.lower() for t in DEFAULT_TOPICS}
        return topic if topic in known_topics else "other"

    def _finish(
        self,
        feed_stats: list[FeedStats],
        enabled_urls: list[str],
        feed_map: dict[str, list[dict]],
    ) -> PollStats:
        """Attach + log feed stats, prune, persist feed hashes; return stats."""
        self.stats.feeds = feed_stats
        self._save_feed_stats(feed_stats)
        self._prune_and_finalize(enabled_urls, feed_map)
        return self.stats

    def _invalidate_changed_clusters(self, seen_changed: list[dict]) -> set[str]:
        """Remove ``enriched_at`` from clusters that hold content-changed articles.

        ``enriched_at`` lives inside the cluster's JSON payload (not in a
        separate SQL column), so the payload is rewritten. A cluster dict
        without ``enriched_at`` fails the cache check in :meth:`_enrich_one`
        and is sent to the LLM again.

        Returns:
            The ids of the clusters whose payload was rewritten.
        """
        # Local imports, as in the original code path (json is not imported at
        # module level; article_identity is only needed here).
        import json
        from news_mcp.article_identity import article_key

        changed_ids: set[str] = set()
        changed_keys = {article_key(a) for a in seen_changed}
        with self.store._conn() as conn:
            for key in changed_keys:
                row = conn.execute(
                    "SELECT sa.cluster_id, c.payload FROM seen_articles sa "
                    "JOIN clusters c ON c.cluster_id = sa.cluster_id "
                    "WHERE sa.article_key=?",
                    (key,),
                ).fetchone()
                if not row:
                    continue
                cluster_id, raw_payload = row[0], row[1]
                changed_ids.add(cluster_id)
                payload = json.loads(raw_payload)
                payload.pop("enriched_at", None)
                conn.execute(
                    "UPDATE clusters SET payload=? WHERE cluster_id=?",
                    (json.dumps(payload, ensure_ascii=False), cluster_id),
                )
        return changed_ids

    # ------------------------------------------------------------------ #
    # Phase 1: Load feed URLs
    # ------------------------------------------------------------------ #
    @staticmethod
    def _load_feed_urls() -> list[str]:
        """Split NEWS_FEED_URLS on commas; fall back to [NEWS_FEED_URL] if empty."""
        urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
        if not urls:
            urls = [NEWS_FEED_URL]
        return urls

    # ------------------------------------------------------------------ #
    # Phase 2: Fetch + per-feed dedup
    # ------------------------------------------------------------------ #
    async def _fetch_feeds(
        self, feed_urls: list[str],
    ) -> tuple[dict[str, list[dict]], list[FeedStats]]:
        """Fetch all feeds and drop the ones whose content is unchanged.

        Returns:
            ``({feed_url: articles}, [FeedStats, ...])``. The mapping contains
            only feeds whose content hash differs from the stored one; the
            stats list has one entry per URL in ``feed_urls``, in order.
        """
        # limit=9999 effectively means no per-feed cap — fetch everything the
        # feed gives us (fetch_news_articles applies max(1, limit)).
        articles = await fetch_news_articles(limit=9999, url_list=feed_urls)

        per_feed: dict[str, list[dict]] = defaultdict(list)
        for a in articles:
            per_feed[self._article_feed_url(a)].append(a)

        feed_map: dict[str, list[dict]] = {}
        feed_stats_list: list[FeedStats] = []
        for feed_url in feed_urls:
            feed_articles = per_feed.get(feed_url, [])
            stats = FeedStats(feed_url=feed_url, fetched=len(feed_articles))
            feed_stats_list.append(stats)

            if not feed_articles:
                self.logger.info("feed empty: feed_url=%s", feed_url)
                continue

            content_hash = self._feed_content_hash(feed_articles)
            prev_hash = self.store.get_feed_hash(feed_url)
            if prev_hash == content_hash:
                stats.duplicate = len(feed_articles)
                self.logger.info(
                    "feed unchanged: feed_url=%s items=%d", feed_url, len(feed_articles),
                )
                continue

            feed_map[feed_url] = feed_articles
            self.logger.info(
                "feed changed: feed_url=%s items=%d hash_prev=%s hash_now=%s",
                feed_url, len(feed_articles),
                (prev_hash or "-")[:12], content_hash[:12],
            )
        return feed_map, feed_stats_list

    # ------------------------------------------------------------------ #
    # Phase 3: Retention filter
    # ------------------------------------------------------------------ #
    def _apply_retention(
        self, articles: list[dict], feed_map: dict[str, list[dict]],
    ) -> list[dict]:
        """Drop articles older than NEWS_RETENTION_DAYS and count them as stale.

        Articles without a timestamp, or whose timestamp ``_parse_ts`` cannot
        parse, are kept. A non-positive NEWS_RETENTION_DAYS disables the
        filter. Stale counts go to the matching entry of ``self.stats.feeds``,
        which :meth:`poll` populates before calling this method.
        """
        if NEWS_RETENTION_DAYS <= 0:
            return articles
        cutoff = datetime.now(timezone.utc) - timedelta(days=NEWS_RETENTION_DAYS)

        # Attribute articles to feeds by object identity: poll() flattens
        # feed_map into ``articles``, so each dict is held by one feed entry.
        # (Keying by article URL would misattribute URLs shared across feeds.)
        feed_of = {id(a): fu for fu, feed_articles in feed_map.items() for a in feed_articles}
        stats_by_url = self._stats_by_url(self.stats.feeds)

        fresh: list[dict] = []
        dropped = 0
        for a in articles:
            ts_str = a.get("timestamp", "")
            dt = _parse_ts(ts_str) if ts_str else None
            if dt is None or dt >= cutoff:
                fresh.append(a)
                continue
            dropped += 1
            fu = feed_of.get(id(a))
            fs = stats_by_url.get(fu) if fu else None
            if fs is not None:
                fs.stale += 1

        if dropped:
            self.logger.info(
                "retention: dropped=%d remaining=%d retention_days=%.0f",
                dropped, len(fresh), NEWS_RETENTION_DAYS,
            )
        return fresh

    # ------------------------------------------------------------------ #
    # Phase 4: Pre-seed existing clusters
    # ------------------------------------------------------------------ #
    def _preseed_clusters(self) -> list[dict]:
        """Load recent clusters from the DB for cross-cycle article merging.

        NEWS_CLUSTER_MAX_AGE_HOURS == 0 disables pre-seeding. A negative value
        uses a 72h DB lookback and passes the negative age to the window check.
        At most 500 clusters are loaded.
        """
        max_age = NEWS_CLUSTER_MAX_AGE_HOURS
        if max_age == 0:
            return []
        lookback = max_age if max_age > 0 else 72
        all_recent = self.store.get_latest_clusters_all_topics(ttl_hours=lookback, limit=500)
        recent = [c for c in all_recent if _cluster_is_within_age_window(c, max_age_hours=max_age)]
        self.logger.info("pre-seeded: existing_clusters=%d max_age_h=%.1f", len(recent), max_age)
        return recent

    # ------------------------------------------------------------------ #
    # Phase 5: Clustering
    # ------------------------------------------------------------------ #
    async def _cluster(
        self, articles: list[dict], existing_clusters: list[dict],
    ) -> dict[str, list[dict]]:
        """Run dedup_and_cluster_articles in a worker thread; return {topic: [clusters]}."""
        self.logger.info(
            "clustering: articles=%d existing_clusters=%d", len(articles), len(existing_clusters),
        )
        clustered = await asyncio.to_thread(
            dedup_and_cluster_articles,
            articles,
            None,  # default similarity_threshold
            existing_clusters=existing_clusters if existing_clusters else None,
            max_age_hours=NEWS_CLUSTER_MAX_AGE_HOURS,
        )
        self.logger.info("clustered: topics=%s", list(clustered.keys()))
        return clustered

    # ------------------------------------------------------------------ #
    # Phase 6: Enrich + store
    # ------------------------------------------------------------------ #
    async def _enrich_all(self, clustered_by_topic: dict[str, list[dict]]) -> None:
        """Enrich all clusters concurrently, then upsert them grouped by final topic.

        Storage happens only after every enrichment task has finished. Because
        ``return_exceptions=False``, an exception escaping _enrich_one()
        propagates out of this method before anything is stored.
        """
        concurrency = llm_concurrency(NEWS_EXTRACT_PROVIDER)
        semaphore = asyncio.Semaphore(concurrency)
        rate = llm_rate_limit(NEWS_EXTRACT_PROVIDER)
        self.logger.info(
            "enrich: semaphore_limit=%d rate_limit=%s/s provider=%s",
            concurrency, rate, NEWS_EXTRACT_PROVIDER,
        )

        targets = [
            (topic, c) for topic, clusters in clustered_by_topic.items() for c in clusters
        ]
        if not targets:
            return

        results = await asyncio.gather(
            *(self._enrich_one(c, topic, semaphore, rate) for topic, c in targets),
            return_exceptions=False,
        )

        by_final_topic: dict[str, list[dict]] = defaultdict(list)
        for c2, attempted in results:
            by_final_topic[self._normalize_topic(c2.get("topic"))].append(c2)
            self.stats.total_clusters += 1
            # Clusters whose LLM attempts all failed also land here
            # (attempted=True); _enrich_one() additionally counts them in
            # total_failed.
            if attempted:
                self.stats.total_newly_enriched += 1
            else:
                self.stats.total_already_enriched += 1

        for final_topic, group in by_final_topic.items():
            self.store.upsert_clusters(group, topic=final_topic)
            self.logger.info("stored: topic=%s clusters=%d", final_topic, len(group))

    async def _enrich_one(
        self,
        cluster: dict,
        topic: str,
        semaphore: asyncio.Semaphore,
        rate: float,
    ) -> tuple[dict, bool]:
        """Enrich one cluster, calling the LLM only when needed.

        Returns:
            ``(cluster, attempted)`` — ``attempted`` is True when the LLM was
            called this time (successfully or not) and False when skipped.

        The LLM is skipped when LLM enrichment is disabled for ``topic``
        (ENRICH_OTHER_TOPICS_ONLY and topic != "other"), when the cluster has
        no ``cluster_id``, or on a cache hit: ``enriched_at`` plus non-empty
        ``entities`` and ``keywords`` are present. The payload itself is the
        cache; clearing ``enriched_at`` forces a refresh.

        On success ``enriched_at`` is stamped and the failure markers
        ``enrichment_failed_at`` / ``enrichment_retry_count`` are cleared. After
        ``1 + MAX_ENRICHMENT_RETRIES`` failed attempts the markers are set /
        incremented instead and ``stats.total_failed`` is bumped.
        """
        c2 = enrich_cluster(cluster)
        c2.setdefault("topic", topic)
        cluster_id = c2.get("cluster_id")
        llm_enabled = (not ENRICH_OTHER_TOPICS_ONLY) or (topic == "other")
        if not llm_enabled or not cluster_id:
            return c2, False

        if c2.get("enriched_at") and c2.get("entities") and c2.get("keywords"):
            self.logger.debug("enrich skip (cached): cluster=%s topic=%s", cluster_id, topic)
            return c2, False

        # NOTE(review): ``rate`` is not applied in this method; only the
        # semaphore bounds LLM concurrency. Confirm whether classify_cluster_llm
        # rate-limits internally.
        last_err = "unknown"
        for attempt in range(1 + self.MAX_ENRICHMENT_RETRIES):
            if attempt > 0:
                backoff = 2 ** attempt
                self.logger.info(
                    "retry: cluster=%s attempt=%d backoff=%.0fs", cluster_id, attempt, backoff,
                )
                # Sleep outside the semaphore so a backing-off task holds no slot.
                await asyncio.sleep(backoff)
            try:
                async with semaphore:
                    enriched = await classify_cluster_llm(dict(c2))
                enriched["enriched_at"] = datetime.now(timezone.utc).isoformat()
                # A success supersedes earlier failures; without this the
                # markers copied from the stored cluster would survive and the
                # cluster would keep looking "failed".
                enriched.pop("enrichment_failed_at", None)
                enriched.pop("enrichment_retry_count", None)
                return enriched, True
            except Exception as exc:  # LLM client errors are not typed here; retry all
                last_err = str(exc)[:200] or type(exc).__name__
                self.logger.warning(
                    "enrich failed: cluster=%s attempt=%d err=%s",
                    cluster_id, attempt, last_err,
                )

        # All attempts exhausted: record the failure on the cluster itself.
        c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
        c2["enrichment_retry_count"] = int(c2.get("enrichment_retry_count") or 0) + 1
        self.logger.error(
            "enrich exhausted: cluster=%s after %d retries last_err=%s",
            cluster_id, self.MAX_ENRICHMENT_RETRIES, last_err,
        )
        self.stats.total_failed += 1
        return c2, True  # attempted (and failed)

    # ------------------------------------------------------------------ #
    # Phase 7: Retry failed enrichments
    # ------------------------------------------------------------------ #
    async def _retry_failed(self) -> None:
        """Re-attempt LLM enrichment for clusters whose earlier enrichment failed.

        Only clusters for which the LLM was actually called are written back.
        A cluster counts as recovered when it now has ``enriched_at`` and no
        ``enrichment_failed_at`` (which _enrich_one() clears on success).
        """
        failed = self.store.get_failed_enrichment_clusters(
            max_retries=self.MAX_FAILED_RETRY_CYCLES,
        )
        if not failed:
            return
        self.logger.info("retry: failed_clusters=%d", len(failed))

        semaphore = asyncio.Semaphore(llm_concurrency(NEWS_EXTRACT_PROVIDER))
        rate = llm_rate_limit(NEWS_EXTRACT_PROVIDER)
        results = await asyncio.gather(
            *(
                self._enrich_one(c, str(c.get("topic") or "other"), semaphore, rate)
                for c in failed
            ),
            return_exceptions=False,
        )

        by_topic: dict[str, list[dict]] = defaultdict(list)
        recovered = 0
        still_failed = 0
        for c2, attempted in results:
            if not attempted:
                continue
            if c2.get("enriched_at") and not c2.get("enrichment_failed_at"):
                recovered += 1
            else:
                still_failed += 1
            by_topic[self._normalize_topic(c2.get("topic"))].append(c2)

        for t, group in by_topic.items():
            self.store.upsert_clusters(group, topic=t)
            self.logger.info("retry stored: topic=%s clusters=%d", t, len(group))

        attempted_total = recovered + still_failed
        if attempted_total:
            self.logger.info(
                "retry done: attempted=%d recovered=%d still_failed=%d",
                attempted_total, recovered, still_failed,
            )

    # ------------------------------------------------------------------ #
    # Phase 8: Feed stats + prune
    # ------------------------------------------------------------------ #
    def _save_feed_stats(self, feed_stats: list[FeedStats]) -> None:
        """Compute ``ingested`` (fetched - duplicate - stale - seen, floored at 0) and log each feed."""
        for fs in feed_stats:
            fs.ingested = max(0, fs.fetched - fs.duplicate - fs.stale - fs.seen)
            self.logger.info(
                "feed stats: feed_url=%s fetched=%d duplicate=%d stale=%d seen=%d ingested=%d",
                fs.feed_url, fs.fetched, fs.duplicate, fs.stale, fs.seen, fs.ingested,
            )

    def _prune_and_finalize(
        self,
        enabled_urls: list[str],
        feed_map: dict[str, list[dict]],
    ) -> None:
        """Run pruning, record feed hashes, drop legacy rows, stamp last_refresh_at.

        ``enabled_urls`` is currently unused; only feeds present in
        ``feed_map`` (i.e. whose content changed) get a new hash.
        """
        prune_result = self.store.prune_if_due(
            pruning_enabled=NEWS_PRUNING_ENABLED,
            retention_days=NEWS_RETENTION_DAYS,
            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
        )

        # Record hash + item count; _fetch_feeds compares this hash (via
        # store.get_feed_hash) to skip an unchanged feed next cycle.
        for feed_url, feed_articles in feed_map.items():
            self.store.set_feed_state(
                feed_url, self._feed_content_hash(feed_articles), len(feed_articles),
            )

        # Drop legacy aggregate feed-state rows
        with self.store._conn() as conn:
            conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'")

        self.store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
        self.logger.info("prune: %s", prune_result)
# --------------------------------------------------------------------------- #
# Compatibility wrapper (used by background loop + tests)
# --------------------------------------------------------------------------- #
async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
    """Run a single poll cycle against the database at ``DB_PATH``.

    Kept for backward compatibility; all work is delegated to
    :class:`ClusterPoller`. ``topic`` is forwarded as ``topic_filter``;
    ``limit`` is accepted but ignored.
    """
    poller = ClusterPoller(SQLiteClusterStore(DB_PATH))
    await poller.poll(topic_filter=topic)
|