from __future__ import annotations import asyncio import hashlib import logging import sys from collections import defaultdict from datetime import datetime, timezone, timedelta from typing import Any, Dict from news_mcp.config import ( DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH, ENRICH_OTHER_TOPICS_ONLY, ENRICHMENT_MAX_PER_REFRESH, NEWS_EXTRACT_PROVIDER, NEWS_FEED_URL, NEWS_FEED_URLS, NEWS_PRUNE_INTERVAL_HOURS, NEWS_PRUNING_ENABLED, NEWS_RETENTION_DAYS, NEWS_CLUSTER_MAX_AGE_HOURS, llm_concurrency, ) from news_mcp.dedup.cluster import dedup_and_cluster_articles from news_mcp.enrichment.enrich import enrich_cluster from news_mcp.enrichment.llm_enrich import classify_cluster_llm from news_mcp.sources.news_feeds import fetch_news_articles from news_mcp.storage.sqlite_store import SQLiteClusterStore from news_mcp.trends_resolution import resolve_entity_via_trends def _load_feed_urls() -> list[str]: """Return the configured feed URLs from environment (unsorted).""" urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()] if not urls: urls = [NEWS_FEED_URL] return urls MAX_ENRICHMENT_RETRIES = 3 # per-cluster retries before giving up for this cycle async def _enrich_single_cluster( c: dict, topic: str, llm_enabled: bool, semaphore: asyncio.Semaphore, store: SQLiteClusterStore, logger: logging.Logger, ) -> dict: """Enrich one cluster: heuristic + optional LLM extraction, concurrency-limited. On LLM failure the cluster is retried up to MAX_ENRICHMENT_RETRIES times with exponential backoff. If all retries are exhausted the cluster is marked with enrichment_failed_at and enrichment_retry_count so the next polling cycle can re-attempt it. """ c2 = enrich_cluster(c) c2.setdefault("topic", topic) cluster_id = c2.get("cluster_id") if llm_enabled and cluster_id: # Cache: if we already have entities/sentiment for this cluster, skip LLM call. existing = store.get_cluster_by_id(cluster_id) if existing and existing.get("entities"): c2 = dict(c2) c2["entities"] = existing.get("entities", []) existing_resolutions = existing.get("entityResolutions", None) if isinstance(existing_resolutions, list) and existing_resolutions: c2["entityResolutions"] = existing_resolutions else: c2["entityResolutions"] = [resolve_entity_via_trends(e) for e in c2["entities"]] if existing.get("sentiment"): c2["sentiment"] = existing.get("sentiment") if existing.get("sentimentScore") is not None: c2["sentimentScore"] = existing.get("sentimentScore") if existing.get("keywords"): c2["keywords"] = existing.get("keywords") if existing.get("topic"): c2["topic"] = existing.get("topic") else: # Retry loop with exponential backoff | semaphore held per-attempt last_err = "" for attempt in range(1 + MAX_ENRICHMENT_RETRIES): if attempt > 0: backoff = 2 ** attempt logger.info( "retry cluster=%s topic=%s attempt=%d/%d backoff=%.0fs", cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, backoff, ) await asyncio.sleep(backoff) try: async with semaphore: c2 = await classify_cluster_llm(dict(c2)) break # success except Exception: last_err = str(sys.exc_info()[1])[:200] if sys.exc_info()[1] else "unknown" logger.warning( "LLM enrichment failed cluster=%s topic=%s attempt=%d/%d err=%s", cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, last_err, ) else: # Loop completed without break = all retries exhausted prev_count = c2.get("enrichment_retry_count", 0) c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat() c2["enrichment_retry_count"] = prev_count + 1 logger.error( "LLM enrichment exhausted cluster=%s topic=%s after %d retries", cluster_id, topic, MAX_ENRICHMENT_RETRIES, ) return c2 async def _enrich_topic_clusters( clusters: list[dict], topic: str, semaphore: asyncio.Semaphore, store: SQLiteClusterStore, logger: logging.Logger, enrich_limit: int, ) -> list[dict]: """Enrich all clusters for a single topic concurrently.""" llm_enabled = (not ENRICH_OTHER_TOPICS_ONLY) or (topic == "other") # Persist the raw clusters first so a slow enrichment pass does not # leave the first bootstrap run with nothing stored. store.upsert_clusters(clusters, topic=topic) logger.info("refresh stored raw topic=%s clusters=%s", topic, len(clusters)) targets = clusters[:enrich_limit] tasks = [ _enrich_single_cluster(c, topic, llm_enabled, semaphore, store, logger) for c in targets ] enriched = await asyncio.gather(*tasks, return_exceptions=False) # Any clusters beyond enrich_limit still need importance enrichment for c in clusters[enrich_limit:]: c2 = enrich_cluster(c) c2.setdefault("topic", topic) enriched.append(c2) logger.info("refresh enriched topic=%s clusters=%s", topic, len(enriched)) return enriched def _cluster_age_ok(cluster: dict, max_age_hours: float) -> bool: """Check whether a cluster's last_updated is within the merge window.""" if max_age_hours <= 0: return True ts_str = cluster.get("last_updated") or cluster.get("timestamp") or "" if not ts_str: return True try: s = str(ts_str).replace("Z", "+00:00") dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) except Exception: try: from email.utils import parsedate_to_datetime dt = parsedate_to_datetime(str(ts_str)) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) except Exception: return True cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours) return dt >= cutoff async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None: logger = logging.getLogger("news_mcp.refresh") store = SQLiteClusterStore(DB_PATH) logger.info("refresh start topic=%s limit=%s", topic, limit) # Get enabled feed URLs from store (seeds new ones as enabled by default). configured_urls = _load_feed_urls() enabled_urls = store.get_enabled_feed_urls(configured_urls) logger.info("refresh enabled feeds=%d / configured=%d", len(enabled_urls), len(configured_urls)) # fetch_news_articles is now fully async (concurrent RSS fetching) articles = await fetch_news_articles(limit, url_list=enabled_urls) logger.info("refresh fetched articles=%s", len(articles)) # Drop legacy aggregate feed-state rows so the dashboard only reflects # real per-feed poll status from this point forward. with store._conn() as conn: conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'") # Track feed freshness per RSS URL so unchanged feeds can be skipped. per_feed: dict[str, list[dict[str, Any]]] = defaultdict(list) for article in articles: feed_url = str(article.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL per_feed[feed_url].append(article) changed_articles: list[dict[str, Any]] = [] changed_feed_urls: list[str] = [] for feed_url, feed_articles in per_feed.items(): logger.info("refresh feed batch start feed_url=%s count=%s", feed_url, len(feed_articles)) material = "\n".join( f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}" for a in feed_articles ) last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest() feed_key = feed_url prev_hash = store.get_feed_hash(feed_key) if prev_hash == last_hash: logger.info("refresh unchanged feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic) else: logger.info("refresh changed feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic) changed_feed_urls.append(feed_url) changed_articles.extend(feed_articles) logger.info("refresh feed batch complete feed_url=%s changed_total=%s", feed_url, len(changed_articles)) if not changed_articles: logger.info("refresh unchanged all feeds topic=%s", topic) store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat()) prune_result = store.prune_if_due( pruning_enabled=NEWS_PRUNING_ENABLED, retention_days=NEWS_RETENTION_DAYS, interval_hours=NEWS_PRUNE_INTERVAL_HOURS, ) logger.info("refresh prune_result=%s", prune_result) return articles = changed_articles logger.info("refresh clustering start articles=%s topic=%s", len(articles), topic) # Pre-seed with recent clusters from the DB so new articles can merge # into existing clusters across polling cycles. max_age = NEWS_CLUSTER_MAX_AGE_HOURS recent_clusters: list[dict] = [] if max_age != 0: lookback = max_age if max_age > 0 else 72 all_recent = store.get_latest_clusters_all_topics( ttl_hours=lookback, limit=500, ) recent_clusters = [c for c in all_recent if _cluster_age_ok(c, max_age)] logger.info( "refresh pre-seeded existing_clusters=%s max_age_h=%s", len(recent_clusters), max_age, ) # Clustering is sync but may do concurrent embedding fetches internally. # Run off-thread so the event loop stays responsive for MCP tool calls. clustered_by_topic = await asyncio.to_thread( dedup_and_cluster_articles, articles, None, # use default similarity_threshold existing_clusters=recent_clusters if recent_clusters else None, max_age_hours=max_age, ) logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys())) # Build LLM concurrency semaphore from the extract provider's config. max_llm_concurrent = llm_concurrency(NEWS_EXTRACT_PROVIDER) llm_semaphore = asyncio.Semaphore(max_llm_concurrent) logger.info("refresh llm semaphore limit=%s provider=%s", max_llm_concurrent, NEWS_EXTRACT_PROVIDER) # Enrich each topic's clusters concurrently. topic_tasks = [] for t, clusters in clustered_by_topic.items(): if topic and t != topic: continue # Determine how many clusters to LLM-enrich. # ENRICHMENT_MAX_PER_REFRESH=0 means enrich every cluster (no cap). enrich_limit = ENRICHMENT_MAX_PER_REFRESH or len(clusters) topic_tasks.append( _enrich_topic_clusters( clusters=clusters, topic=t, semaphore=llm_semaphore, store=store, logger=logger, enrich_limit=enrich_limit, ) ) # Run all topic enrichment phases concurrently topic_results = await asyncio.gather(*topic_tasks, return_exceptions=False) # Persist enriched clusters grouped by their final topic for enriched in topic_results: by_final_topic: Dict[str, list] = {} for c2 in enriched: final_topic = str(c2.get("topic") or "other").strip().lower() if final_topic not in {x.lower() for x in DEFAULT_TOPICS}: final_topic = "other" by_final_topic.setdefault(final_topic, []).append(c2) for final_topic, group in by_final_topic.items(): store.upsert_clusters(group, topic=final_topic) logger.info("refresh stored topic=%s clusters=%s", final_topic, len(group)) # Retry previously failed enrichments failed_clusters = store.get_failed_enrichment_clusters(max_retries=3) if failed_clusters: logger.info("retry enrich failed clusters count=%d", len(failed_clusters)) retry_tasks = [ _enrich_single_cluster( c, str(c.get("topic") or "other"), True, llm_semaphore, store, logger ) for c in failed_clusters ] retry_results = await asyncio.gather(*retry_tasks, return_exceptions=False) # Persist retried results by_topic_retry: Dict[str, list] = {} for c2 in retry_results: # Clear stale failure marker on success if not c2.get("enrichment_failed_at") or c2.get("entities"): c2.pop("enrichment_failed_at", None) c2.pop("enrichment_retry_count", None) t = str(c2.get("topic") or "other").strip().lower() if t not in {x.lower() for x in DEFAULT_TOPICS}: t = "other" by_topic_retry.setdefault(t, []).append(c2) for t, group in by_topic_retry.items(): store.upsert_clusters(group, topic=t) logger.info("retry stored topic=%s clusters=%s", t, len(group)) prune_result = store.prune_if_due( pruning_enabled=NEWS_PRUNING_ENABLED, retention_days=NEWS_RETENTION_DAYS, interval_hours=NEWS_PRUNE_INTERVAL_HOURS, ) for feed_url in changed_feed_urls: feed_articles = per_feed[feed_url] material = "\n".join( f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}" for a in feed_articles ) last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest() store.set_feed_state(feed_url, last_hash, len(feed_articles)) store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat()) logger.info("refresh prune_result=%s", prune_result)