"""Refresh pipeline: fetch, dedup/cluster, enrich, and store news clusters."""
from __future__ import annotations

import hashlib
import logging

from news_mcp.config import (
    DB_PATH,
    GROQ_ENRICH_OTHER_ONLY,
    GROQ_MAX_CLUSTERS_PER_REFRESH,
    NEWS_FEED_URL,
    NEWS_FEED_URLS,
    NEWS_PRUNE_INTERVAL_HOURS,
    NEWS_PRUNING_ENABLED,
    NEWS_RETENTION_DAYS,
)
from news_mcp.dedup.cluster import dedup_and_cluster_articles
from news_mcp.enrichment.enrich import enrich_cluster
from news_mcp.enrichment.llm_enrich import classify_cluster_llm
from news_mcp.sources.news_feeds import fetch_news_articles
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.trends_resolution import resolve_entity_via_trends


async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
    logger = logging.getLogger("news_mcp.refresh")
    store = SQLiteClusterStore(DB_PATH)
    logger.info("refresh start topic=%s limit=%s", topic, limit)

    articles = fetch_news_articles(limit=limit)
    logger.info("refresh fetched articles=%s", len(articles))

    # Skip expensive work if the feed content (titles/urls/timestamps) didn't change.
    rss_urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
    if not rss_urls:
        rss_urls = [NEWS_FEED_URL]
    feed_key = "newsfeeds:" + hashlib.sha1(",".join(rss_urls).encode("utf-8")).hexdigest()
    material = "\n".join(
        f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}" for a in articles
    )
    curr_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
    prev_hash = store.get_feed_hash(feed_key)
    if prev_hash == curr_hash:
        logger.info("refresh unchanged feed_key=%s topic=%s", feed_key, topic)
    else:
        logger.info("refresh changed feed_key=%s topic=%s", feed_key, topic)
        store.set_feed_hash(feed_key, curr_hash)

        clustered_by_topic = dedup_and_cluster_articles(articles)
        logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
        for t, clusters in clustered_by_topic.items():
            if topic and t != topic:
                continue
            enriched = []
            # Always compute cheap enrichment first; the per-refresh cap also
            # bounds how many clusters per topic are enriched and stored.
            for c in clusters[:GROQ_MAX_CLUSTERS_PER_REFRESH]:
                c2 = enrich_cluster(c)
                # Groq enrichment only when configured.
                if (not GROQ_ENRICH_OTHER_ONLY) or (t == "other"):
                    # Cache Groq: if we already have entities/sentiment for this
                    # cluster, reuse them and skip the LLM call.
                    existing = store.get_cluster_by_id(c2.get("cluster_id"))
                    if existing and existing.get("entities"):
                        c2 = dict(c2)  # Copy so we can overlay the cached fields.
                        c2["entities"] = existing.get("entities", [])
                        # IMPORTANT: entityResolutions must stay consistent with entities.
                        # Older rows may have entities but missing/malformed resolutions.
                        existing_resolutions = existing.get("entityResolutions")
                        if isinstance(existing_resolutions, list) and existing_resolutions:
                            c2["entityResolutions"] = existing_resolutions
                        else:
                            # Recompute resolutions deterministically from the stored entities.
                            c2["entityResolutions"] = [
                                resolve_entity_via_trends(e) for e in c2["entities"]
                            ]
                        if existing.get("sentiment"):
                            c2["sentiment"] = existing.get("sentiment")
                        if existing.get("sentimentScore") is not None:
                            c2["sentimentScore"] = existing.get("sentimentScore")
                        if existing.get("keywords"):
                            c2["keywords"] = existing.get("keywords")
                    else:
                        c2 = await classify_cluster_llm(c2)
                enriched.append(c2)
            store.upsert_clusters(enriched, topic=t)
            logger.info("refresh stored topic=%s clusters=%s", t, len(enriched))

    # Pruning runs on every refresh, even when the feed was unchanged; the store
    # decides internally whether the interval has elapsed.
    prune_result = store.prune_if_due(
        pruning_enabled=NEWS_PRUNING_ENABLED,
        retention_days=NEWS_RETENTION_DAYS,
        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
    )
    logger.info("refresh prune_result=%s", prune_result)
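

# A minimal sketch of how this refresh might be invoked directly, e.g. for local
# testing. It assumes the module can be run as a script and that the config
# values above point at a usable database and feed list; any scheduler-driven
# entrypoint lives elsewhere and is not shown here.
if __name__ == "__main__":
    import asyncio

    logging.basicConfig(level=logging.INFO)
    # Refresh every topic with the default article limit; pass e.g.
    # topic="other" to restrict the run to a single topic bucket.
    asyncio.run(refresh_clusters())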