from __future__ import annotations

import asyncio
import hashlib
import logging
from datetime import datetime, timezone
from typing import Any, Dict

from news_mcp.config import (
    DB_PATH,
    DEFAULT_LOOKBACK_HOURS,
    DEFAULT_TOPICS,
    ENRICH_OTHER_TOPICS_ONLY,
    ENRICHMENT_MAX_PER_REFRESH,
    NEWS_FEED_URL,
    NEWS_FEED_URLS,
    NEWS_PRUNE_INTERVAL_HOURS,
    NEWS_PRUNING_ENABLED,
    NEWS_RETENTION_DAYS,
)
from news_mcp.dedup.cluster import dedup_and_cluster_articles
from news_mcp.enrichment.enrich import enrich_cluster
from news_mcp.enrichment.llm_enrich import classify_cluster_llm
from news_mcp.trends_resolution import resolve_entity_via_trends
from news_mcp.sources.news_feeds import fetch_news_articles
from news_mcp.storage.sqlite_store import SQLiteClusterStore


def _feed_base_key() -> str:
    """Return the feed-hash key derived from the configured feed URLs.

    URLs come from the comma-separated ``NEWS_FEED_URLS``; when that yields
    nothing, ``NEWS_FEED_URL`` is used instead.
    """
    rss_urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
    if not rss_urls:
        rss_urls = [NEWS_FEED_URL]
    return "newsfeeds:" + hashlib.sha1(",".join(rss_urls).encode("utf-8")).hexdigest()


def _articles_fingerprint(articles: list) -> str:
    """Return a SHA-1 hex digest over each article's title, url and timestamp."""
    material = "\n".join(
        f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}" for a in articles
    )
    return hashlib.sha1(material.encode("utf-8")).hexdigest()


def _prune_if_due(store: SQLiteClusterStore) -> Any:
    """Run the store's pruning pass with the configured retention settings."""
    return store.prune_if_due(
        pruning_enabled=NEWS_PRUNING_ENABLED,
        retention_days=NEWS_RETENTION_DAYS,
        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
    )


def _reuse_cached_enrichment(cluster: Dict[str, Any], existing: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *cluster* overlaid with enrichment fields from *existing*.

    ``entities`` is always taken from the stored row. ``sentiment``,
    ``sentimentScore``, ``keywords`` and ``topic`` are copied only when the
    stored row has a usable value, so fresh heuristic values survive otherwise.
    """
    merged = dict(cluster)
    merged["entities"] = existing.get("entities", [])

    # entityResolutions must stay consistent with entities: older rows may have
    # entities but missing/malformed resolutions, so recompute them from the
    # stored entities in that case.
    cached_resolutions = existing.get("entityResolutions")
    if isinstance(cached_resolutions, list) and cached_resolutions:
        merged["entityResolutions"] = cached_resolutions
    else:
        merged["entityResolutions"] = [
            resolve_entity_via_trends(e) for e in merged["entities"]
        ]

    if existing.get("sentiment"):
        merged["sentiment"] = existing.get("sentiment")
    if existing.get("sentimentScore") is not None:
        merged["sentimentScore"] = existing.get("sentimentScore")
    if existing.get("keywords"):
        merged["keywords"] = existing.get("keywords")
    # Preserve a previously-classified topic so a cache hit does not drift
    # back to the heuristic topic.
    if existing.get("topic"):
        merged["topic"] = existing.get("topic")
    return merged


async def _enrich_one(
    store: SQLiteClusterStore,
    logger: logging.Logger,
    cluster: Dict[str, Any],
    heuristic_topic: str,
    use_llm: bool,
) -> Dict[str, Any]:
    """Enrich one cluster, reusing stored enrichment or calling the LLM.

    The cluster always goes through ``enrich_cluster``. When *use_llm* is true,
    a stored row with entities is reused instead of calling the LLM; otherwise
    ``classify_cluster_llm`` is awaited. An LLM failure is logged and the
    heuristic payload is returned with ``enrichment_failed_at`` set.
    """
    enriched = enrich_cluster(cluster)
    # Seed the heuristic topic so classify_cluster_llm has a sane fallback if
    # the LLM omits or hallucinates one.
    enriched.setdefault("topic", heuristic_topic)
    if not use_llm:
        return enriched

    existing = store.get_cluster_by_id(enriched.get("cluster_id"))
    if existing and existing.get("entities"):
        return _reuse_cached_enrichment(enriched, existing)

    try:
        enriched = await classify_cluster_llm(enriched)
    except Exception:
        # Best-effort: one failing cluster must not abort the whole refresh.
        logger.exception(
            "LLM enrichment failed for cluster %s (topic %s)",
            enriched.get("cluster_id"),
            heuristic_topic,
        )
        # Timestamp the failure on the payload; the row still has no entities,
        # so a later pass that reaches this cluster will try the LLM again.
        enriched["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
    return enriched


def _group_by_final_topic(
    enriched: list,
    heuristic_topic: str,
    valid_topics: set,
) -> Dict[str, list]:
    """Group clusters by their post-enrichment topic.

    The topic is normalised (stripped, lower-cased); anything not in
    *valid_topics* is filed under ``"other"``.
    """
    groups: Dict[str, list] = {}
    for cluster in enriched:
        final_topic = str(cluster.get("topic") or heuristic_topic or "other").strip().lower()
        if final_topic not in valid_topics:
            final_topic = "other"
        groups.setdefault(final_topic, []).append(cluster)
    return groups


async def _cluster_enrich_and_store(
    store: SQLiteClusterStore,
    logger: logging.Logger,
    articles: list,
    topic: str | None,
) -> None:
    """Cluster *articles*, enrich each bucket and upsert the results.

    Only the bucket matching *topic* is processed when a filter is given.
    """
    clustered_by_topic = dedup_and_cluster_articles(articles)
    logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))

    # Built once per refresh rather than once per cluster.
    valid_topics = {name.lower() for name in DEFAULT_TOPICS}

    for heuristic_topic, clusters in clustered_by_topic.items():
        if topic and heuristic_topic != topic:
            continue

        # ENRICHMENT_MAX_PER_REFRESH=0 means no cap.
        # NOTE(review): the cap bounds every cluster in the bucket, not just
        # LLM calls -- clusters past the cap are neither enriched nor stored
        # in this pass. Confirm that is intended.
        enrich_limit = ENRICHMENT_MAX_PER_REFRESH or len(clusters)
        use_llm = (not ENRICH_OTHER_TOPICS_ONLY) or heuristic_topic == "other"

        enriched = []
        for cluster in clusters[:enrich_limit]:
            enriched.append(
                await _enrich_one(store, logger, cluster, heuristic_topic, use_llm)
            )

        # Persist under the *post-enrichment* topic so the SQL row column
        # matches the topic carried in the payload.
        groups = _group_by_final_topic(enriched, heuristic_topic, valid_topics)
        for final_topic, group in groups.items():
            store.upsert_clusters(group, topic=final_topic)
            logger.info(
                "refresh stored topic=%s clusters=%s (heuristic_topic=%s)",
                final_topic,
                len(group),
                heuristic_topic,
            )


async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
    """Fetch the news feeds and refresh the stored clusters.

    Articles are fetched in a worker thread and fingerprinted. If the
    fingerprint matches the one stored for this feed configuration, only the
    ``last_refresh_at`` marker and the pruning pass run. Otherwise articles are
    clustered, enriched (optionally via the LLM) and upserted per final topic.

    Args:
        topic: When set, only the cluster bucket with this heuristic topic is
            processed. The feed fingerprint is then tracked under a
            topic-scoped key, so a filtered run cannot mark the feed as
            processed for the other topics.
        limit: Passed through to ``fetch_news_articles``.

    Raises:
        Any exception from fetching, clustering or storage propagates. In that
        case the feed fingerprint is rolled back so the next refresh retries
        instead of treating the feed as unchanged. LLM failures are logged per
        cluster and do not propagate.
    """
    logger = logging.getLogger("news_mcp.refresh")
    store = SQLiteClusterStore(DB_PATH)
    logger.info("refresh start topic=%s limit=%s", topic, limit)

    articles = await asyncio.to_thread(fetch_news_articles, limit)
    logger.info("refresh fetched articles=%s", len(articles))

    # Skip expensive work if the feed content (titles/urls/timestamps) didn't change.
    base_key = _feed_base_key()
    feed_key = f"{base_key}:topic={topic}" if topic else base_key
    last_hash = _articles_fingerprint(articles)
    prev_hash = store.get_feed_hash(feed_key)

    # A matching unfiltered hash means every topic was already processed for
    # this content, so it also satisfies a topic-filtered refresh.
    unchanged = prev_hash == last_hash or bool(
        topic and store.get_feed_hash(base_key) == last_hash
    )
    if unchanged:
        logger.info("refresh unchanged feed_key=%s topic=%s", feed_key, topic)
        store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
        prune_result = _prune_if_due(store)
        logger.info("refresh prune_result=%s", prune_result)
        return

    logger.info("refresh changed feed_key=%s topic=%s", feed_key, topic)
    # Claim the hash before the first await below so an overlapping refresh on
    # the same event loop sees it; undo the claim if processing fails.
    store.set_feed_hash(feed_key, last_hash)
    try:
        await _cluster_enrich_and_store(store, logger, articles, topic)
    except BaseException:
        try:
            # "" never equals a SHA-1 hex digest, so it forces a retry when
            # there was no previous hash.
            # NOTE(review): assumes the store accepts an empty string -- confirm.
            store.set_feed_hash(feed_key, prev_hash or "")
        except Exception:
            logger.exception("refresh could not roll back feed hash feed_key=%s", feed_key)
        raise

    prune_result = _prune_if_due(store)
    store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
    logger.info("refresh prune_result=%s", prune_result)