# poller.py
  1. from __future__ import annotations
  2. import asyncio
  3. import hashlib
  4. import logging
  5. import sys
  6. from collections import defaultdict
  7. from datetime import datetime, timezone, timedelta
  8. from typing import Any, Dict
  9. from news_mcp.config import (
  10. DEFAULT_LOOKBACK_HOURS,
  11. DEFAULT_TOPICS,
  12. DB_PATH,
  13. ENRICH_OTHER_TOPICS_ONLY,
  14. ENRICHMENT_MAX_PER_REFRESH,
  15. NEWS_EXTRACT_PROVIDER,
  16. NEWS_FEED_URL,
  17. NEWS_FEED_URLS,
  18. NEWS_PRUNE_INTERVAL_HOURS,
  19. NEWS_PRUNING_ENABLED,
  20. NEWS_RETENTION_DAYS,
  21. NEWS_CLUSTER_MAX_AGE_HOURS,
  22. llm_concurrency,
  23. )
  24. from news_mcp.dedup.cluster import dedup_and_cluster_articles, _cluster_is_within_age_window, _parse_ts
  25. from news_mcp.enrichment.enrich import enrich_cluster
  26. from news_mcp.enrichment.llm_enrich import classify_cluster_llm
  27. from news_mcp.sources.news_feeds import fetch_news_articles
  28. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  29. def _load_feed_urls() -> list[str]:
  30. """Return the configured feed URLs from environment (unsorted)."""
  31. urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
  32. if not urls:
  33. urls = [NEWS_FEED_URL]
  34. return urls
  35. MAX_ENRICHMENT_RETRIES = 3 # per-cluster retries before giving up for this cycle
  36. async def _enrich_single_cluster(
  37. c: dict,
  38. topic: str,
  39. llm_enabled: bool,
  40. semaphore: asyncio.Semaphore,
  41. store: SQLiteClusterStore,
  42. logger: logging.Logger,
  43. ) -> dict:
  44. """Enrich one cluster: heuristic + optional LLM extraction, concurrency-limited.
  45. Rule: if the cluster already has entities AND keywords (from a previous
  46. enrichment), skip the LLM call entirely. The data on the dict IS the
  47. cache — no need to look up enriched_at timestamps or query the DB by
  48. cluster_id. This works regardless of whether cluster_id changed due to
  49. article merging across polling cycles.
  50. On LLM failure the cluster is retried up to MAX_ENRICHMENT_RETRIES times
  51. with exponential backoff. If all retries are exhausted the cluster is
  52. marked with enrichment_failed_at and enrichment_retry_count so the next
  53. polling cycle can re-attempt it.
  54. """
  55. c2 = enrich_cluster(c)
  56. c2.setdefault("topic", topic)
  57. cluster_id = c2.get("cluster_id")
  58. if llm_enabled and cluster_id:
  59. # --- Cache check: if the cluster already has entities AND keywords,
  60. # it was enriched in a previous cycle. Skip LLM entirely.
  61. _existing_entities = c2.get("entities") or []
  62. _existing_keywords = c2.get("keywords") or []
  63. if _existing_entities and _existing_keywords:
  64. logger.debug("enrich skip (already enriched) cluster=%s topic=%s", cluster_id, topic)
  65. return c2
  66. # --- Actually call the LLM ---
  67. last_err = ""
  68. for attempt in range(1 + MAX_ENRICHMENT_RETRIES):
  69. if attempt > 0:
  70. backoff = 2 ** attempt
  71. logger.info(
  72. "retry cluster=%s topic=%s attempt=%d/%d backoff=%.0fs",
  73. cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, backoff,
  74. )
  75. await asyncio.sleep(backoff)
  76. try:
  77. async with semaphore:
  78. c2 = await classify_cluster_llm(dict(c2))
  79. c2["enriched_at"] = datetime.now(timezone.utc).isoformat()
  80. break # success
  81. except Exception:
  82. last_err = str(sys.exc_info()[1])[:200] if sys.exc_info()[1] else "unknown"
  83. logger.warning(
  84. "LLM enrichment failed cluster=%s topic=%s attempt=%d/%d err=%s",
  85. cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, last_err,
  86. )
  87. else:
  88. # Loop completed without break = all retries exhausted
  89. prev_count = c2.get("enrichment_retry_count", 0)
  90. c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
  91. c2["enrichment_retry_count"] = prev_count + 1
  92. logger.error(
  93. "LLM enrichment exhausted cluster=%s topic=%s after %d retries",
  94. cluster_id, topic, MAX_ENRICHMENT_RETRIES,
  95. )
  96. return c2
  97. async def _enrich_topic_clusters(
  98. clusters: list[dict],
  99. topic: str,
  100. semaphore: asyncio.Semaphore,
  101. store: SQLiteClusterStore,
  102. logger: logging.Logger,
  103. enrich_limit: int,
  104. ) -> list[dict]:
  105. """Enrich all clusters for a single topic concurrently."""
  106. llm_enabled = (not ENRICH_OTHER_TOPICS_ONLY) or (topic == "other")
  107. # Persist the raw clusters first so a slow enrichment pass does not
  108. # leave the first bootstrap run with nothing stored.
  109. store.upsert_clusters(clusters, topic=topic)
  110. logger.info("refresh stored raw topic=%s clusters=%s", topic, len(clusters))
  111. targets = clusters[:enrich_limit]
  112. tasks = [
  113. _enrich_single_cluster(c, topic, llm_enabled, semaphore, store, logger)
  114. for c in targets
  115. ]
  116. enriched = await asyncio.gather(*tasks, return_exceptions=False)
  117. # Any clusters beyond enrich_limit still need importance enrichment
  118. for c in clusters[enrich_limit:]:
  119. c2 = enrich_cluster(c)
  120. c2.setdefault("topic", topic)
  121. enriched.append(c2)
  122. logger.info("refresh enriched topic=%s clusters=%s", topic, len(enriched))
  123. return enriched
  124. def _cluster_age_ok(cluster: dict, max_age_hours: float) -> bool:
  125. """Deprecated alias — use _cluster_is_within_age_window from cluster.py."""
  126. return _cluster_is_within_age_window(cluster, max_age_hours=max_age_hours)
  127. async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
  128. logger = logging.getLogger("news_mcp.refresh")
  129. store = SQLiteClusterStore(DB_PATH)
  130. logger.info("refresh start topic=%s limit=%s", topic, limit)
  131. # Get enabled feed URLs from store (seeds new ones as enabled by default).
  132. configured_urls = _load_feed_urls()
  133. enabled_urls = store.get_enabled_feed_urls(configured_urls)
  134. logger.info("refresh enabled feeds=%d / configured=%d", len(enabled_urls), len(configured_urls))
  135. # fetch_news_articles is now fully async (concurrent RSS fetching)
  136. articles = await fetch_news_articles(limit, url_list=enabled_urls)
  137. logger.info("refresh fetched articles=%s", len(articles))
  138. # Drop legacy aggregate feed-state rows so the dashboard only reflects
  139. # real per-feed poll status from this point forward.
  140. with store._conn() as conn:
  141. conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'")
  142. # Track feed freshness per RSS URL so unchanged feeds can be skipped.
  143. per_feed: dict[str, list[dict[str, Any]]] = defaultdict(list)
  144. for article in articles:
  145. feed_url = str(article.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL
  146. per_feed[feed_url].append(article)
  147. changed_articles: list[dict[str, Any]] = []
  148. changed_feed_urls: list[str] = []
  149. for feed_url, feed_articles in per_feed.items():
  150. logger.info("refresh feed batch start feed_url=%s count=%s", feed_url, len(feed_articles))
  151. material = "\n".join(
  152. f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
  153. for a in feed_articles
  154. )
  155. last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
  156. feed_key = feed_url
  157. prev_hash = store.get_feed_hash(feed_key)
  158. if prev_hash == last_hash:
  159. logger.info("refresh unchanged feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
  160. else:
  161. logger.info("refresh changed feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
  162. changed_feed_urls.append(feed_url)
  163. changed_articles.extend(feed_articles)
  164. logger.info("refresh feed batch complete feed_url=%s changed_total=%s", feed_url, len(changed_articles))
  165. if not changed_articles:
  166. logger.info("refresh unchanged all feeds topic=%s", topic)
  167. store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
  168. prune_result = store.prune_if_due(
  169. pruning_enabled=NEWS_PRUNING_ENABLED,
  170. retention_days=NEWS_RETENTION_DAYS,
  171. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  172. )
  173. logger.info("refresh prune_result=%s", prune_result)
  174. return
  175. articles = changed_articles
  176. # Pre-filter: drop articles whose RSS timestamp is older than retention.
  177. # This prevents stale feed items from being re-ingested after pruning.
  178. if NEWS_RETENTION_DAYS > 0:
  179. retention_cutoff = datetime.now(timezone.utc) - timedelta(days=NEWS_RETENTION_DAYS)
  180. fresh_articles = []
  181. for a in articles:
  182. ts_str = a.get("timestamp", "")
  183. if not ts_str:
  184. fresh_articles.append(a)
  185. continue
  186. dt = _parse_ts(ts_str)
  187. if dt is None:
  188. fresh_articles.append(a)
  189. continue
  190. if dt >= retention_cutoff:
  191. fresh_articles.append(a)
  192. else:
  193. logger.debug("drop stale article title=%s ts=%s", a.get("title", "")[:60], ts_str)
  194. dropped = len(articles) - len(fresh_articles)
  195. if dropped:
  196. logger.info("refresh retention-filter dropped=%d remaining=%d retention_days=%.0f", dropped, len(fresh_articles), NEWS_RETENTION_DAYS)
  197. articles = fresh_articles
  198. if not articles:
  199. logger.info("refresh no articles after retention filter topic=%s", topic)
  200. store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
  201. prune_result = store.prune_if_due(
  202. pruning_enabled=NEWS_PRUNING_ENABLED,
  203. retention_days=NEWS_RETENTION_DAYS,
  204. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  205. )
  206. logger.info("refresh prune_result=%s", prune_result)
  207. return
  208. logger.info("refresh clustering start articles=%s topic=%s", len(articles), topic)
  209. # Pre-seed with recent clusters from the DB so new articles can merge
  210. # into existing clusters across polling cycles.
  211. max_age = NEWS_CLUSTER_MAX_AGE_HOURS
  212. recent_clusters: list[dict] = []
  213. if max_age != 0:
  214. lookback = max_age if max_age > 0 else 72
  215. all_recent = store.get_latest_clusters_all_topics(
  216. ttl_hours=lookback,
  217. limit=500,
  218. )
  219. recent_clusters = [c for c in all_recent if _cluster_age_ok(c, max_age)]
  220. logger.info(
  221. "refresh pre-seeded existing_clusters=%s max_age_h=%s",
  222. len(recent_clusters), max_age,
  223. )
  224. # Clustering is sync but may do concurrent embedding fetches internally.
  225. # Run off-thread so the event loop stays responsive for MCP tool calls.
  226. clustered_by_topic = await asyncio.to_thread(
  227. dedup_and_cluster_articles,
  228. articles,
  229. None, # use default similarity_threshold
  230. existing_clusters=recent_clusters if recent_clusters else None,
  231. max_age_hours=max_age,
  232. )
  233. logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
  234. # Build LLM concurrency semaphore from the extract provider's config.
  235. max_llm_concurrent = llm_concurrency(NEWS_EXTRACT_PROVIDER)
  236. llm_semaphore = asyncio.Semaphore(max_llm_concurrent)
  237. logger.info("refresh llm semaphore limit=%s provider=%s", max_llm_concurrent, NEWS_EXTRACT_PROVIDER)
  238. from news_mcp.config import llm_rate_limit as _rl
  239. _rate = _rl(NEWS_EXTRACT_PROVIDER)
  240. logger.info("refresh llm rate-limit=%s/s provider=%s", _rate, NEWS_EXTRACT_PROVIDER)
  241. # Enrich each topic's clusters concurrently.
  242. topic_tasks = []
  243. for t, clusters in clustered_by_topic.items():
  244. if topic and t != topic:
  245. continue
  246. # Determine how many clusters to LLM-enrich.
  247. # ENRICHMENT_MAX_PER_REFRESH=0 means enrich every cluster (no cap).
  248. enrich_limit = ENRICHMENT_MAX_PER_REFRESH or len(clusters)
  249. topic_tasks.append(
  250. _enrich_topic_clusters(
  251. clusters=clusters,
  252. topic=t,
  253. semaphore=llm_semaphore,
  254. store=store,
  255. logger=logger,
  256. enrich_limit=enrich_limit,
  257. )
  258. )
  259. # Run all topic enrichment phases concurrently
  260. topic_results = await asyncio.gather(*topic_tasks, return_exceptions=False)
  261. # Persist enriched clusters grouped by their final topic
  262. for enriched in topic_results:
  263. by_final_topic: Dict[str, list] = {}
  264. for c2 in enriched:
  265. final_topic = str(c2.get("topic") or "other").strip().lower()
  266. if final_topic not in {x.lower() for x in DEFAULT_TOPICS}:
  267. final_topic = "other"
  268. by_final_topic.setdefault(final_topic, []).append(c2)
  269. for final_topic, group in by_final_topic.items():
  270. store.upsert_clusters(group, topic=final_topic)
  271. logger.info("refresh stored topic=%s clusters=%s", final_topic, len(group))
  272. # Retry previously failed enrichments
  273. failed_clusters = store.get_failed_enrichment_clusters(max_retries=3)
  274. if failed_clusters:
  275. logger.info("retry enrich failed clusters count=%d", len(failed_clusters))
  276. retry_tasks = [
  277. _enrich_single_cluster(
  278. c, str(c.get("topic") or "other"), True, llm_semaphore, store, logger,
  279. )
  280. for c in failed_clusters
  281. ]
  282. retry_results = await asyncio.gather(*retry_tasks, return_exceptions=False)
  283. # Persist retried results
  284. by_topic_retry: Dict[str, list] = {}
  285. for c2 in retry_results:
  286. # Clear stale failure marker on success
  287. if not c2.get("enrichment_failed_at") or c2.get("entities"):
  288. c2.pop("enrichment_failed_at", None)
  289. c2.pop("enrichment_retry_count", None)
  290. t = str(c2.get("topic") or "other").strip().lower()
  291. if t not in {x.lower() for x in DEFAULT_TOPICS}:
  292. t = "other"
  293. by_topic_retry.setdefault(t, []).append(c2)
  294. for t, group in by_topic_retry.items():
  295. store.upsert_clusters(group, topic=t)
  296. logger.info("retry stored topic=%s clusters=%s", t, len(group))
  297. prune_result = store.prune_if_due(
  298. pruning_enabled=NEWS_PRUNING_ENABLED,
  299. retention_days=NEWS_RETENTION_DAYS,
  300. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  301. )
  302. for feed_url in changed_feed_urls:
  303. feed_articles = per_feed[feed_url]
  304. material = "\n".join(
  305. f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
  306. for a in feed_articles
  307. )
  308. last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
  309. store.set_feed_state(feed_url, last_hash, len(feed_articles))
  310. store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
  311. logger.info("refresh prune_result=%s", prune_result)