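"""Refresh pipeline for news clusters.

Fetches articles from the configured feeds, skips work when the feed
content is unchanged, dedups and clusters the articles, enriches each
cluster (with an LLM pass where enabled), stores the results, and prunes
old rows on a schedule.
"""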
from __future__ import annotations

import asyncio
import hashlib
import logging
from datetime import datetime, timezone

from news_mcp.config import (
    DB_PATH,
    DEFAULT_LOOKBACK_HOURS,
    ENRICH_OTHER_TOPICS_ONLY,
    ENRICHMENT_MAX_PER_REFRESH,
    NEWS_FEED_URL,
    NEWS_FEED_URLS,
    NEWS_PRUNE_INTERVAL_HOURS,
    NEWS_PRUNING_ENABLED,
    NEWS_RETENTION_DAYS,
)
from news_mcp.dedup.cluster import dedup_and_cluster_articles
from news_mcp.enrichment.enrich import enrich_cluster
from news_mcp.enrichment.llm_enrich import classify_cluster_llm
from news_mcp.sources.news_feeds import fetch_news_articles
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.trends_resolution import resolve_entity_via_trends


async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
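    """Fetch articles, cluster them, enrich clusters, and store the results.

    Args:
        topic: If given, only clusters for this topic are enriched and stored.
        limit: Maximum number of articles to fetch from the feeds.
    """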
    logger = logging.getLogger("news_mcp.refresh")
    store = SQLiteClusterStore(DB_PATH)
    logger.info("refresh start topic=%s limit=%s", topic, limit)
    # fetch_news_articles is blocking, so run it in a worker thread.
    articles = await asyncio.to_thread(fetch_news_articles, limit)
    logger.info("refresh fetched articles=%s", len(articles))

    # Skip expensive work if the feed content (titles/urls/timestamps) didn't change.
    rss_urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
    if not rss_urls:
        rss_urls = [NEWS_FEED_URL]
    feed_key = "newsfeeds:" + hashlib.sha1(",".join(rss_urls).encode("utf-8")).hexdigest()
    material = "\n".join(
        f"{a.get('title', '')}|{a.get('url', '')}|{a.get('timestamp', '')}"
        for a in articles
    )
    current_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
    prev_hash = store.get_feed_hash(feed_key)
    if prev_hash == current_hash:
        logger.info("refresh unchanged feed_key=%s topic=%s", feed_key, topic)
        store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
        # Still give pruning a chance to run even when nothing changed.
        prune_result = store.prune_if_due(
            pruning_enabled=NEWS_PRUNING_ENABLED,
            retention_days=NEWS_RETENTION_DAYS,
            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
        )
        logger.info("refresh prune_result=%s", prune_result)
        return
    logger.info("refresh changed feed_key=%s topic=%s", feed_key, topic)
    store.set_feed_hash(feed_key, current_hash)

    clustered_by_topic = dedup_and_cluster_articles(articles)
    logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
    for t, clusters in clustered_by_topic.items():
        if topic and t != topic:
            continue
        enriched = []
        # How many clusters to LLM-enrich per refresh.
        # ENRICHMENT_MAX_PER_REFRESH=0 means enrich every cluster (no cap).
        enrich_limit = ENRICHMENT_MAX_PER_REFRESH or len(clusters)
        # Whether the LLM pipeline applies to this topic.
        llm_enabled_for_topic = (not ENRICH_OTHER_TOPICS_ONLY) or (t == "other")
        for idx, c in enumerate(clusters):
            c2 = enrich_cluster(c)
            # Cap LLM calls at enrich_limit; clusters past the cap are still
            # stored with basic enrichment only.
            if llm_enabled_for_topic and idx < enrich_limit:
                # Cache: if we already have entities/sentiment for this cluster,
                # skip the LLM call.
                existing = store.get_cluster_by_id(c2.get("cluster_id"))
                if existing and existing.get("entities"):
                    c2 = dict(c2)
                    # Keep existing enriched fields.
                    c2["entities"] = existing.get("entities", [])
                    # IMPORTANT: entityResolutions must stay consistent with entities.
                    # Older rows may have entities but missing/malformed resolutions.
                    existing_resolutions = existing.get("entityResolutions")
                    if isinstance(existing_resolutions, list) and existing_resolutions:
                        c2["entityResolutions"] = existing_resolutions
                    else:
                        # Recompute resolutions deterministically from the stored entities.
                        c2["entityResolutions"] = [
                            resolve_entity_via_trends(e) for e in c2["entities"]
                        ]
                    if existing.get("sentiment"):
                        c2["sentiment"] = existing.get("sentiment")
                    if existing.get("sentimentScore") is not None:
                        c2["sentimentScore"] = existing.get("sentimentScore")
                    if existing.get("keywords"):
                        c2["keywords"] = existing.get("keywords")
                else:
                    try:
                        c2 = await classify_cluster_llm(c2)
                    except Exception:
                        logger.exception(
                            "LLM enrichment failed for cluster %s (topic %s)",
                            c2.get("cluster_id"),
                            t,
                        )
                        # Mark so we can retry on the next refresh.
                        c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
            enriched.append(c2)
        store.upsert_clusters(enriched, topic=t)
        logger.info("refresh stored topic=%s clusters=%s", t, len(enriched))

    prune_result = store.prune_if_due(
        pruning_enabled=NEWS_PRUNING_ENABLED,
        retention_days=NEWS_RETENTION_DAYS,
        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
    )
    store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
    logger.info("refresh prune_result=%s", prune_result)