from __future__ import annotations import asyncio import hashlib import logging import sys from collections import defaultdict from datetime import datetime, timezone, timedelta from typing import Any, Dict from news_mcp.config import ( DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH, ENRICH_OTHER_TOPICS_ONLY, ENRICHMENT_MAX_PER_REFRESH, NEWS_EXTRACT_PROVIDER, NEWS_FEED_URL, NEWS_FEED_URLS, NEWS_PRUNE_INTERVAL_HOURS, NEWS_PRUNING_ENABLED, NEWS_RETENTION_DAYS, NEWS_CLUSTER_MAX_AGE_HOURS, llm_concurrency, ) from news_mcp.dedup.cluster import dedup_and_cluster_articles, _cluster_is_within_age_window, _parse_ts from news_mcp.enrichment.enrich import enrich_cluster from news_mcp.enrichment.llm_enrich import classify_cluster_llm from news_mcp.sources.news_feeds import fetch_news_articles from news_mcp.storage.sqlite_store import SQLiteClusterStore def _load_feed_urls() -> list[str]: """Return the configured feed URLs from environment (unsorted).""" urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()] if not urls: urls = [NEWS_FEED_URL] return urls MAX_ENRICHMENT_RETRIES = 3 # per-cluster retries before giving up for this cycle async def _enrich_single_cluster( c: dict, topic: str, llm_enabled: bool, semaphore: asyncio.Semaphore, store: SQLiteClusterStore, logger: logging.Logger, ) -> dict: """Enrich one cluster: heuristic + optional LLM extraction, concurrency-limited. Rule: if the cluster already has entities AND keywords (from a previous enrichment), skip the LLM call entirely. The data on the dict IS the cache — no need to look up enriched_at timestamps or query the DB by cluster_id. This works regardless of whether cluster_id changed due to article merging across polling cycles. On LLM failure the cluster is retried up to MAX_ENRICHMENT_RETRIES times with exponential backoff. If all retries are exhausted the cluster is marked with enrichment_failed_at and enrichment_retry_count so the next polling cycle can re-attempt it. """ c2 = enrich_cluster(c) c2.setdefault("topic", topic) cluster_id = c2.get("cluster_id") if llm_enabled and cluster_id: # --- Cache check: if the cluster already has entities AND keywords, # it was enriched in a previous cycle. Skip LLM entirely. _existing_entities = c2.get("entities") or [] _existing_keywords = c2.get("keywords") or [] if _existing_entities and _existing_keywords: logger.debug("enrich skip (already enriched) cluster=%s topic=%s", cluster_id, topic) return c2 # --- Actually call the LLM --- last_err = "" for attempt in range(1 + MAX_ENRICHMENT_RETRIES): if attempt > 0: backoff = 2 ** attempt logger.info( "retry cluster=%s topic=%s attempt=%d/%d backoff=%.0fs", cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, backoff, ) await asyncio.sleep(backoff) try: async with semaphore: c2 = await classify_cluster_llm(dict(c2)) c2["enriched_at"] = datetime.now(timezone.utc).isoformat() break # success except Exception: last_err = str(sys.exc_info()[1])[:200] if sys.exc_info()[1] else "unknown" logger.warning( "LLM enrichment failed cluster=%s topic=%s attempt=%d/%d err=%s", cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, last_err, ) else: # Loop completed without break = all retries exhausted prev_count = c2.get("enrichment_retry_count", 0) c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat() c2["enrichment_retry_count"] = prev_count + 1 logger.error( "LLM enrichment exhausted cluster=%s topic=%s after %d retries", cluster_id, topic, MAX_ENRICHMENT_RETRIES, ) return c2 async def _enrich_topic_clusters( clusters: list[dict], topic: str, semaphore: asyncio.Semaphore, store: SQLiteClusterStore, logger: logging.Logger, enrich_limit: int, ) -> list[dict]: """Enrich all clusters for a single topic concurrently.""" llm_enabled = (not ENRICH_OTHER_TOPICS_ONLY) or (topic == "other") # Persist the raw clusters first so a slow enrichment pass does not # leave the first bootstrap run with nothing stored. store.upsert_clusters(clusters, topic=topic) logger.info("refresh stored raw topic=%s clusters=%s", topic, len(clusters)) targets = clusters[:enrich_limit] tasks = [ _enrich_single_cluster(c, topic, llm_enabled, semaphore, store, logger) for c in targets ] enriched = await asyncio.gather(*tasks, return_exceptions=False) # Any clusters beyond enrich_limit still need importance enrichment for c in clusters[enrich_limit:]: c2 = enrich_cluster(c) c2.setdefault("topic", topic) enriched.append(c2) logger.info("refresh enriched topic=%s clusters=%s", topic, len(enriched)) return enriched def _cluster_age_ok(cluster: dict, max_age_hours: float) -> bool: """Deprecated alias — use _cluster_is_within_age_window from cluster.py.""" return _cluster_is_within_age_window(cluster, max_age_hours=max_age_hours) async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None: logger = logging.getLogger("news_mcp.refresh") store = SQLiteClusterStore(DB_PATH) logger.info("refresh start topic=%s limit=%s", topic, limit) # Get enabled feed URLs from store (seeds new ones as enabled by default). configured_urls = _load_feed_urls() enabled_urls = store.get_enabled_feed_urls(configured_urls) logger.info("refresh enabled feeds=%d / configured=%d", len(enabled_urls), len(configured_urls)) # fetch_news_articles is now fully async (concurrent RSS fetching) articles = await fetch_news_articles(limit, url_list=enabled_urls) logger.info("refresh fetched articles=%s", len(articles)) # Drop legacy aggregate feed-state rows so the dashboard only reflects # real per-feed poll status from this point forward. with store._conn() as conn: conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'") # Track feed freshness per RSS URL so unchanged feeds can be skipped. per_feed: dict[str, list[dict[str, Any]]] = defaultdict(list) for article in articles: feed_url = str(article.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL per_feed[feed_url].append(article) changed_articles: list[dict[str, Any]] = [] changed_feed_urls: list[str] = [] for feed_url, feed_articles in per_feed.items(): logger.info("refresh feed batch start feed_url=%s count=%s", feed_url, len(feed_articles)) material = "\n".join( f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}" for a in feed_articles ) last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest() feed_key = feed_url prev_hash = store.get_feed_hash(feed_key) if prev_hash == last_hash: logger.info("refresh unchanged feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic) else: logger.info("refresh changed feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic) changed_feed_urls.append(feed_url) changed_articles.extend(feed_articles) logger.info("refresh feed batch complete feed_url=%s changed_total=%s", feed_url, len(changed_articles)) if not changed_articles: logger.info("refresh unchanged all feeds topic=%s", topic) store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat()) prune_result = store.prune_if_due( pruning_enabled=NEWS_PRUNING_ENABLED, retention_days=NEWS_RETENTION_DAYS, interval_hours=NEWS_PRUNE_INTERVAL_HOURS, ) logger.info("refresh prune_result=%s", prune_result) return articles = changed_articles # Pre-filter: drop articles whose RSS timestamp is older than retention. # This prevents stale feed items from being re-ingested after pruning. if NEWS_RETENTION_DAYS > 0: retention_cutoff = datetime.now(timezone.utc) - timedelta(days=NEWS_RETENTION_DAYS) fresh_articles = [] for a in articles: ts_str = a.get("timestamp", "") if not ts_str: fresh_articles.append(a) continue dt = _parse_ts(ts_str) if dt is None: fresh_articles.append(a) continue if dt >= retention_cutoff: fresh_articles.append(a) else: logger.debug("drop stale article title=%s ts=%s", a.get("title", "")[:60], ts_str) dropped = len(articles) - len(fresh_articles) if dropped: logger.info("refresh retention-filter dropped=%d remaining=%d retention_days=%.0f", dropped, len(fresh_articles), NEWS_RETENTION_DAYS) articles = fresh_articles if not articles: logger.info("refresh no articles after retention filter topic=%s", topic) store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat()) prune_result = store.prune_if_due( pruning_enabled=NEWS_PRUNING_ENABLED, retention_days=NEWS_RETENTION_DAYS, interval_hours=NEWS_PRUNE_INTERVAL_HOURS, ) logger.info("refresh prune_result=%s", prune_result) return logger.info("refresh clustering start articles=%s topic=%s", len(articles), topic) # Pre-seed with recent clusters from the DB so new articles can merge # into existing clusters across polling cycles. max_age = NEWS_CLUSTER_MAX_AGE_HOURS recent_clusters: list[dict] = [] if max_age != 0: lookback = max_age if max_age > 0 else 72 all_recent = store.get_latest_clusters_all_topics( ttl_hours=lookback, limit=500, ) recent_clusters = [c for c in all_recent if _cluster_age_ok(c, max_age)] logger.info( "refresh pre-seeded existing_clusters=%s max_age_h=%s", len(recent_clusters), max_age, ) # Clustering is sync but may do concurrent embedding fetches internally. # Run off-thread so the event loop stays responsive for MCP tool calls. clustered_by_topic = await asyncio.to_thread( dedup_and_cluster_articles, articles, None, # use default similarity_threshold existing_clusters=recent_clusters if recent_clusters else None, max_age_hours=max_age, ) logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys())) # Build LLM concurrency semaphore from the extract provider's config. max_llm_concurrent = llm_concurrency(NEWS_EXTRACT_PROVIDER) llm_semaphore = asyncio.Semaphore(max_llm_concurrent) logger.info("refresh llm semaphore limit=%s provider=%s", max_llm_concurrent, NEWS_EXTRACT_PROVIDER) from news_mcp.config import llm_rate_limit as _rl _rate = _rl(NEWS_EXTRACT_PROVIDER) logger.info("refresh llm rate-limit=%s/s provider=%s", _rate, NEWS_EXTRACT_PROVIDER) # Enrich each topic's clusters concurrently. topic_tasks = [] for t, clusters in clustered_by_topic.items(): if topic and t != topic: continue # Determine how many clusters to LLM-enrich. # ENRICHMENT_MAX_PER_REFRESH=0 means enrich every cluster (no cap). enrich_limit = ENRICHMENT_MAX_PER_REFRESH or len(clusters) topic_tasks.append( _enrich_topic_clusters( clusters=clusters, topic=t, semaphore=llm_semaphore, store=store, logger=logger, enrich_limit=enrich_limit, ) ) # Run all topic enrichment phases concurrently topic_results = await asyncio.gather(*topic_tasks, return_exceptions=False) # Persist enriched clusters grouped by their final topic for enriched in topic_results: by_final_topic: Dict[str, list] = {} for c2 in enriched: final_topic = str(c2.get("topic") or "other").strip().lower() if final_topic not in {x.lower() for x in DEFAULT_TOPICS}: final_topic = "other" by_final_topic.setdefault(final_topic, []).append(c2) for final_topic, group in by_final_topic.items(): store.upsert_clusters(group, topic=final_topic) logger.info("refresh stored topic=%s clusters=%s", final_topic, len(group)) # Retry previously failed enrichments failed_clusters = store.get_failed_enrichment_clusters(max_retries=3) if failed_clusters: logger.info("retry enrich failed clusters count=%d", len(failed_clusters)) retry_tasks = [ _enrich_single_cluster( c, str(c.get("topic") or "other"), True, llm_semaphore, store, logger, ) for c in failed_clusters ] retry_results = await asyncio.gather(*retry_tasks, return_exceptions=False) # Persist retried results by_topic_retry: Dict[str, list] = {} for c2 in retry_results: # Clear stale failure marker on success if not c2.get("enrichment_failed_at") or c2.get("entities"): c2.pop("enrichment_failed_at", None) c2.pop("enrichment_retry_count", None) t = str(c2.get("topic") or "other").strip().lower() if t not in {x.lower() for x in DEFAULT_TOPICS}: t = "other" by_topic_retry.setdefault(t, []).append(c2) for t, group in by_topic_retry.items(): store.upsert_clusters(group, topic=t) logger.info("retry stored topic=%s clusters=%s", t, len(group)) prune_result = store.prune_if_due( pruning_enabled=NEWS_PRUNING_ENABLED, retention_days=NEWS_RETENTION_DAYS, interval_hours=NEWS_PRUNE_INTERVAL_HOURS, ) for feed_url in changed_feed_urls: feed_articles = per_feed[feed_url] material = "\n".join( f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}" for a in feed_articles ) last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest() store.set_feed_state(feed_url, last_hash, len(feed_articles)) store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat()) logger.info("refresh prune_result=%s", prune_result)