| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- from __future__ import annotations
- import asyncio
- import hashlib
- import logging
- import sys
- from collections import defaultdict
- from datetime import datetime, timezone, timedelta
- from typing import Any, Dict
- from news_mcp.config import (
- DEFAULT_LOOKBACK_HOURS,
- DEFAULT_TOPICS,
- DB_PATH,
- ENRICH_OTHER_TOPICS_ONLY,
- ENRICHMENT_MAX_PER_REFRESH,
- NEWS_EXTRACT_PROVIDER,
- NEWS_FEED_URL,
- NEWS_FEED_URLS,
- NEWS_PRUNE_INTERVAL_HOURS,
- NEWS_PRUNING_ENABLED,
- NEWS_RETENTION_DAYS,
- NEWS_CLUSTER_MAX_AGE_HOURS,
- llm_concurrency,
- )
- from news_mcp.dedup.cluster import dedup_and_cluster_articles, _cluster_is_within_age_window, _parse_ts
- from news_mcp.enrichment.enrich import enrich_cluster
- from news_mcp.enrichment.llm_enrich import classify_cluster_llm
- from news_mcp.sources.news_feeds import fetch_news_articles
- from news_mcp.storage.sqlite_store import SQLiteClusterStore
- from news_mcp.trends_resolution import resolve_entity_via_trends
def _load_feed_urls() -> list[str]:
    """Parse the comma-separated ``NEWS_FEED_URLS`` setting into a list of URLs.

    Each entry is whitespace-stripped and blank entries are discarded; the
    configured order is kept (no sorting, no de-duplication). If nothing usable
    remains, the single ``NEWS_FEED_URL`` setting is returned as the only entry.
    """
    parsed: list[str] = []
    for raw in NEWS_FEED_URLS.split(","):
        candidate = raw.strip()
        if candidate:
            parsed.append(candidate)
    return parsed if parsed else [NEWS_FEED_URL]
MAX_ENRICHMENT_RETRIES = 3  # per-cluster retries before giving up for this cycle


async def _enrich_single_cluster(
    c: dict,
    topic: str,
    llm_enabled: bool,
    semaphore: asyncio.Semaphore,
    store: SQLiteClusterStore,
    logger: logging.Logger,
) -> dict:
    """Enrich one cluster: heuristic pass plus an optional, concurrency-limited LLM pass.

    Args:
        c: Cluster payload to enrich.
        topic: Topic the cluster was grouped under; used as the default
            ``topic`` value and in log messages.
        llm_enabled: When False (or the cluster has no ``cluster_id``) only the
            heuristic ``enrich_cluster`` pass runs.
        semaphore: Bounds how many ``classify_cluster_llm`` calls run at once.
        store: Used to look up a previously enriched copy by ``cluster_id``.
        logger: Destination for progress and failure logging.

    Returns:
        The enriched cluster dict.

    Cache strategy: if the cluster already has entities and an ``enriched_at``
    timestamp (from a previous successful enrichment), the LLM call is skipped.
    Clusters only need to be enriched once — the enrichment output (entities,
    keywords, sentiment, topic) is derived from the cluster's article content,
    which doesn't change meaningfully between polls unless enough new articles
    arrive to form a different cluster.

    Failure handling: on LLM failure the call is retried up to
    ``MAX_ENRICHMENT_RETRIES`` times with exponential backoff (2s, 4s, 8s, ...).
    Backoff sleeps happen outside the semaphore, so a waiting retry does not
    hold a concurrency slot. If every attempt fails, the cluster is stamped with
    ``enrichment_failed_at`` and its ``enrichment_retry_count`` is incremented
    so a later polling cycle can re-attempt it. On success both markers are
    removed, so a recovered cluster is no longer treated as failed.
    """
    c2 = enrich_cluster(c)
    c2.setdefault("topic", topic)
    cluster_id = c2.get("cluster_id")
    if not (llm_enabled and cluster_id):
        return c2

    # --- Cache check: skip LLM if already enriched ---
    # The cluster payload may already carry enriched_at if it was loaded
    # from an existing DB cluster during cross-cycle seeding.
    if c2.get("enriched_at") and c2.get("entities"):
        logger.debug("enrich skip (cached) cluster=%s topic=%s", cluster_id, topic)
        return c2

    # --- Check DB: cluster may have been enriched in a previous cycle ---
    # cluster_id is derived from the article set, so this lookup only matches
    # when the exact same article set was clustered before — in which case the
    # stored enrichment is still valid.
    existing = store.get_cluster_by_id(cluster_id)
    if existing and existing.get("entities") and existing.get("enriched_at"):
        logger.info("enrich cache-hit cluster=%s topic=%s", cluster_id, topic)
        c2 = dict(c2)
        c2["entities"] = existing.get("entities", [])
        c2["enriched_at"] = existing.get("enriched_at")
        existing_resolutions = existing.get("entityResolutions", None)
        if isinstance(existing_resolutions, list) and existing_resolutions:
            c2["entityResolutions"] = existing_resolutions
        else:
            c2["entityResolutions"] = [resolve_entity_via_trends(e) for e in c2["entities"]]
        if existing.get("sentiment"):
            c2["sentiment"] = existing.get("sentiment")
        if existing.get("sentimentScore") is not None:
            c2["sentimentScore"] = existing.get("sentimentScore")
        if existing.get("keywords"):
            c2["keywords"] = existing.get("keywords")
        if existing.get("topic"):
            c2["topic"] = existing.get("topic")
        return c2

    # --- Actually call the LLM ---
    for attempt in range(1 + MAX_ENRICHMENT_RETRIES):
        if attempt > 0:
            backoff = 2 ** attempt
            logger.info(
                "retry cluster=%s topic=%s attempt=%d/%d backoff=%.0fs",
                cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, backoff,
            )
            await asyncio.sleep(backoff)
        try:
            async with semaphore:
                c2 = await classify_cluster_llm(dict(c2))
            c2["enriched_at"] = datetime.now(timezone.utc).isoformat()
            # A success supersedes any failure recorded in an earlier cycle;
            # leaving the markers would keep the cluster in the retry queue.
            c2.pop("enrichment_failed_at", None)
            c2.pop("enrichment_retry_count", None)
            break  # success
        except Exception as exc:  # any provider/parse error is treated as retryable
            # Some exceptions (e.g. a bare TimeoutError) have an empty message;
            # fall back to the type name so the log line stays informative.
            last_err = str(exc)[:200] or type(exc).__name__
            logger.warning(
                "LLM enrichment failed cluster=%s topic=%s attempt=%d/%d err=%s",
                cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, last_err,
            )
    else:
        # Loop completed without break = all retries exhausted
        prev_count = c2.get("enrichment_retry_count", 0)
        c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
        c2["enrichment_retry_count"] = prev_count + 1
        logger.error(
            "LLM enrichment exhausted cluster=%s topic=%s after %d retries",
            cluster_id, topic, MAX_ENRICHMENT_RETRIES,
        )
    return c2
async def _enrich_topic_clusters(
    clusters: list[dict],
    topic: str,
    semaphore: asyncio.Semaphore,
    store: SQLiteClusterStore,
    logger: logging.Logger,
    enrich_limit: int,
) -> list[dict]:
    """Enrich every cluster of a single topic.

    The raw clusters are upserted under ``topic`` before any enrichment starts,
    so a slow enrichment pass does not leave a first (bootstrap) run with
    nothing stored. The first ``enrich_limit`` clusters are then passed
    concurrently through ``_enrich_single_cluster``; LLM use is allowed unless
    ``ENRICH_OTHER_TOPICS_ONLY`` is set and the topic is not ``"other"``. The
    remaining clusters only receive the heuristic ``enrich_cluster`` pass.

    Returns:
        The concurrently enriched clusters (in input order), followed by the
        heuristic-only remainder.
    """
    use_llm = topic == "other" or not ENRICH_OTHER_TOPICS_ONLY

    store.upsert_clusters(clusters, topic=topic)
    logger.info("refresh stored raw topic=%s clusters=%s", topic, len(clusters))

    head = clusters[:enrich_limit]
    tail = clusters[enrich_limit:]

    results = list(
        await asyncio.gather(
            *(
                _enrich_single_cluster(cluster, topic, use_llm, semaphore, store, logger)
                for cluster in head
            ),
            return_exceptions=False,
        )
    )

    # Clusters past the cap still need the cheap heuristic (importance) pass.
    for leftover in tail:
        heuristic = enrich_cluster(leftover)
        heuristic.setdefault("topic", topic)
        results.append(heuristic)

    logger.info("refresh enriched topic=%s clusters=%s", topic, len(results))
    return results
def _cluster_age_ok(cluster: dict, max_age_hours: float) -> bool:
    """Report whether *cluster* lies inside the ``max_age_hours`` age window.

    Deprecated compatibility alias: new code should call
    ``_cluster_is_within_age_window`` from ``news_mcp.dedup.cluster`` directly.
    """
    verdict = _cluster_is_within_age_window(cluster, max_age_hours=max_age_hours)
    return verdict
# Pre-seed lookback (hours) used when NEWS_CLUSTER_MAX_AGE_HOURS is negative.
_NEGATIVE_MAX_AGE_PRESEED_LOOKBACK_HOURS = 72
# Upper bound on DB clusters loaded to pre-seed clustering.
_PRESEED_CLUSTER_LIMIT = 500
# Passed as ``max_retries`` to ``store.get_failed_enrichment_clusters``.
_FAILED_ENRICHMENT_MAX_RETRIES = 3


def _refresh_feed_hash(feed_articles: list[dict[str, Any]]) -> str:
    """Fingerprint a feed's articles (one ``title|url|timestamp`` line each) for change detection."""
    material = "\n".join(
        f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
        for a in feed_articles
    )
    # SHA-1 is only a change-detection fingerprint here, not a security measure.
    return hashlib.sha1(material.encode("utf-8")).hexdigest()


def _normalize_final_topic(raw_topic: Any, allowed_topics: set[str]) -> str:
    """Strip and lower-case a topic label, mapping empty or unknown labels to ``"other"``."""
    candidate = str(raw_topic or "other").strip().lower()
    return candidate if candidate in allowed_topics else "other"


def _upsert_by_final_topic(
    store: SQLiteClusterStore,
    clusters: list[dict],
    allowed_topics: set[str],
    logger: logging.Logger,
    log_template: str,
) -> None:
    """Upsert *clusters* grouped by normalised topic, logging one ``log_template`` line per group."""
    grouped: dict[str, list[dict]] = {}
    for cluster in clusters:
        key = _normalize_final_topic(cluster.get("topic"), allowed_topics)
        grouped.setdefault(key, []).append(cluster)
    for final_topic, group in grouped.items():
        store.upsert_clusters(group, topic=final_topic)
        logger.info(log_template, final_topic, len(group))


def _drop_articles_past_retention(
    articles: list[dict[str, Any]],
    retention_days: float,
    logger: logging.Logger,
) -> list[dict[str, Any]]:
    """Keep articles dated within *retention_days*; undated or unparseable ones are kept too."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
    fresh: list[dict[str, Any]] = []
    for article in articles:
        ts_str = article.get("timestamp", "")
        published = _parse_ts(ts_str) if ts_str else None
        if published is None:
            fresh.append(article)
            continue
        if published.tzinfo is None:
            # NOTE(review): assumes naive timestamps from _parse_ts are UTC — confirm.
            # Comparing a naive value with the aware cutoff would raise TypeError.
            published = published.replace(tzinfo=timezone.utc)
        if published >= cutoff:
            fresh.append(article)
        else:
            logger.debug(
                "drop stale article title=%s ts=%s",
                str(article.get("title") or "")[:60], ts_str,
            )
    return fresh


def _enrichment_failed_since(cluster: dict, since: datetime) -> bool:
    """Return True if the cluster's ``enrichment_failed_at`` is an ISO timestamp at or after *since*."""
    raw = cluster.get("enrichment_failed_at")
    if not raw:
        return False
    try:
        failed_at = datetime.fromisoformat(str(raw))
    except ValueError:
        # Unrecognised format: do not exclude the cluster from retrying.
        return False
    if failed_at.tzinfo is None:
        failed_at = failed_at.replace(tzinfo=timezone.utc)
    return failed_at >= since


def _stamp_and_prune(store: SQLiteClusterStore, logger: logging.Logger) -> None:
    """Record ``last_refresh_at`` and run the store's prune step if it is due."""
    store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
    prune_result = store.prune_if_due(
        pruning_enabled=NEWS_PRUNING_ENABLED,
        retention_days=NEWS_RETENTION_DAYS,
        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
    )
    logger.info("refresh prune_result=%s", prune_result)


async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
    """Run one polling cycle: fetch, cluster, enrich, store, then stamp and prune.

    Args:
        topic: When given, only clusters whose clustered topic equals this value
            are enriched and stored in this cycle; other topics' clusters are
            discarded.
        limit: Passed as the first argument to ``fetch_news_articles``
            (presumably a per-fetch article cap — confirm).

    Steps:
        1. Fetch articles from the enabled feeds, group them per feed URL, and
           skip feeds whose content hash matches the stored one.
        2. Drop articles older than ``NEWS_RETENTION_DAYS``.
        3. Cluster off-thread, pre-seeded with recent DB clusters so new
           articles can merge into existing clusters across cycles.
        4. Enrich each topic's clusters concurrently and store them under their
           normalised final topic.
        5. Retry enrichments that failed in *earlier* cycles.
        6. Persist the changed feeds' hashes, stamp ``last_refresh_at``, and
           prune if due.
    """
    logger = logging.getLogger("news_mcp.refresh")
    store = SQLiteClusterStore(DB_PATH)
    logger.info("refresh start topic=%s limit=%s", topic, limit)

    # Get enabled feed URLs from store (seeds new ones as enabled by default).
    configured_urls = _load_feed_urls()
    enabled_urls = store.get_enabled_feed_urls(configured_urls)
    logger.info("refresh enabled feeds=%d / configured=%d", len(enabled_urls), len(configured_urls))

    # fetch_news_articles is async (concurrent RSS fetching).
    articles = await fetch_news_articles(limit, url_list=enabled_urls)
    logger.info("refresh fetched articles=%s", len(articles))

    # Drop legacy aggregate feed-state rows so the dashboard only reflects
    # real per-feed poll status from this point forward.
    # NOTE(review): reaches into the store's private _conn() helper.
    with store._conn() as conn:
        conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'")

    # Track feed freshness per RSS URL so unchanged feeds can be skipped.
    per_feed: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for article in articles:
        feed_url = str(article.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL
        per_feed[feed_url].append(article)

    changed_articles: list[dict[str, Any]] = []
    # feed_url -> (content hash, article count), persisted once processing succeeds.
    changed_feeds: dict[str, tuple[str, int]] = {}
    for feed_url, feed_articles in per_feed.items():
        logger.info("refresh feed batch start feed_url=%s count=%s", feed_url, len(feed_articles))
        feed_hash = _refresh_feed_hash(feed_articles)
        if store.get_feed_hash(feed_url) == feed_hash:
            logger.info("refresh unchanged feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
        else:
            logger.info("refresh changed feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
            changed_feeds[feed_url] = (feed_hash, len(feed_articles))
            changed_articles.extend(feed_articles)
        logger.info("refresh feed batch complete feed_url=%s changed_total=%s", feed_url, len(changed_articles))

    if not changed_articles:
        logger.info("refresh unchanged all feeds topic=%s", topic)
        _stamp_and_prune(store, logger)
        return

    articles = changed_articles

    # Pre-filter: drop articles whose RSS timestamp is older than retention.
    # This prevents stale feed items from being re-ingested after pruning.
    if NEWS_RETENTION_DAYS > 0:
        fresh_articles = _drop_articles_past_retention(articles, NEWS_RETENTION_DAYS, logger)
        dropped = len(articles) - len(fresh_articles)
        if dropped:
            logger.info(
                "refresh retention-filter dropped=%d remaining=%d retention_days=%.0f",
                dropped, len(fresh_articles), NEWS_RETENTION_DAYS,
            )
        articles = fresh_articles
        if not articles:
            logger.info("refresh no articles after retention filter topic=%s", topic)
            # These feeds' current contents are entirely past retention and can
            # only get older, so record their hashes now; otherwise every cycle
            # would treat them as changed and re-filter the same items.
            for feed_url, (feed_hash, count) in changed_feeds.items():
                store.set_feed_state(feed_url, feed_hash, count)
            _stamp_and_prune(store, logger)
            return

    logger.info("refresh clustering start articles=%s topic=%s", len(articles), topic)

    # Pre-seed with recent clusters from the DB so new articles can merge
    # into existing clusters across polling cycles. max_age == 0 disables it.
    max_age = NEWS_CLUSTER_MAX_AGE_HOURS
    recent_clusters: list[dict] = []
    if max_age != 0:
        lookback = max_age if max_age > 0 else _NEGATIVE_MAX_AGE_PRESEED_LOOKBACK_HOURS
        all_recent = store.get_latest_clusters_all_topics(
            ttl_hours=lookback,
            limit=_PRESEED_CLUSTER_LIMIT,
        )
        recent_clusters = [
            c for c in all_recent
            if _cluster_is_within_age_window(c, max_age_hours=max_age)
        ]
    logger.info(
        "refresh pre-seeded existing_clusters=%s max_age_h=%s",
        len(recent_clusters), max_age,
    )

    # Clustering is sync but may do concurrent embedding fetches internally.
    # Run off-thread so the event loop stays responsive for MCP tool calls.
    clustered_by_topic = await asyncio.to_thread(
        dedup_and_cluster_articles,
        articles,
        None,  # use default similarity_threshold
        existing_clusters=recent_clusters or None,
        max_age_hours=max_age,
    )
    logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))

    # Build LLM concurrency semaphore from the extract provider's config.
    max_llm_concurrent = llm_concurrency(NEWS_EXTRACT_PROVIDER)
    llm_semaphore = asyncio.Semaphore(max_llm_concurrent)
    logger.info("refresh llm semaphore limit=%s provider=%s", max_llm_concurrent, NEWS_EXTRACT_PROVIDER)
    from news_mcp.config import llm_rate_limit

    rate = llm_rate_limit(NEWS_EXTRACT_PROVIDER)
    logger.info("refresh llm rate-limit=%s/s provider=%s", rate, NEWS_EXTRACT_PROVIDER)

    allowed_topics = {name.lower() for name in DEFAULT_TOPICS}
    # Failures stamped after this instant happened in this cycle's main pass.
    enrichment_started = datetime.now(timezone.utc)

    # ENRICHMENT_MAX_PER_REFRESH=0 means enrich every cluster (no cap).
    # NOTE(review): with a topic filter, other topics' clusters are discarded
    # here while the feeds' hashes are still persisted below, so those articles
    # are not re-clustered until the feed content changes — confirm intended.
    topic_tasks = [
        _enrich_topic_clusters(
            clusters=clusters,
            topic=t,
            semaphore=llm_semaphore,
            store=store,
            logger=logger,
            enrich_limit=ENRICHMENT_MAX_PER_REFRESH or len(clusters),
        )
        for t, clusters in clustered_by_topic.items()
        if not topic or t == topic
    ]
    topic_results = await asyncio.gather(*topic_tasks, return_exceptions=False)

    # Persist enriched clusters grouped by their final (possibly LLM-assigned) topic.
    for enriched in topic_results:
        _upsert_by_final_topic(
            store, enriched, allowed_topics, logger, "refresh stored topic=%s clusters=%s",
        )

    # Retry enrichments that failed in earlier cycles. Clusters that just
    # exhausted their retries above are skipped: re-running them now would
    # repeat the whole backoff sequence and bump their retry count twice in a
    # single cycle; the next polling cycle picks them up instead.
    failed_clusters = [
        c for c in store.get_failed_enrichment_clusters(max_retries=_FAILED_ENRICHMENT_MAX_RETRIES)
        if not _enrichment_failed_since(c, enrichment_started)
    ]
    if failed_clusters:
        logger.info("retry enrich failed clusters count=%d", len(failed_clusters))
        # Snapshot before enriching, in case enrichment mutates the input dicts.
        previous_failures = [c.get("enrichment_failed_at") for c in failed_clusters]
        retry_results = await asyncio.gather(
            *(
                _enrich_single_cluster(
                    c, str(c.get("topic") or "other"), True, llm_semaphore, store, logger,
                )
                for c in failed_clusters
            ),
            return_exceptions=False,
        )
        for previous_failed_at, c2 in zip(previous_failures, retry_results):
            # A fresh failure stamp means this retry failed too; otherwise the
            # cluster was enriched (or served from cache), so clear the marker.
            current_failed_at = c2.get("enrichment_failed_at")
            if not current_failed_at or current_failed_at == previous_failed_at:
                c2.pop("enrichment_failed_at", None)
                c2.pop("enrichment_retry_count", None)
        _upsert_by_final_topic(
            store, retry_results, allowed_topics, logger, "retry stored topic=%s clusters=%s",
        )

    # Only now that the changed feeds were fully processed, remember their hashes.
    for feed_url, (feed_hash, count) in changed_feeds.items():
        store.set_feed_state(feed_url, feed_hash, count)
    _stamp_and_prune(store, logger)
|