poller.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. from __future__ import annotations
  2. import asyncio
  3. import hashlib
  4. import logging
  5. from collections import defaultdict
  6. from datetime import datetime, timezone
  7. from typing import Any, Dict
  8. from news_mcp.config import (
  9. DEFAULT_LOOKBACK_HOURS,
  10. DEFAULT_TOPICS,
  11. DB_PATH,
  12. ENRICH_OTHER_TOPICS_ONLY,
  13. ENRICHMENT_MAX_PER_REFRESH,
  14. NEWS_EXTRACT_PROVIDER,
  15. NEWS_FEED_URL,
  16. NEWS_FEED_URLS,
  17. NEWS_PRUNE_INTERVAL_HOURS,
  18. NEWS_PRUNING_ENABLED,
  19. NEWS_RETENTION_DAYS,
  20. llm_concurrency,
  21. )
  22. from news_mcp.dedup.cluster import dedup_and_cluster_articles
  23. from news_mcp.enrichment.enrich import enrich_cluster
  24. from news_mcp.enrichment.llm_enrich import classify_cluster_llm
  25. from news_mcp.sources.news_feeds import fetch_news_articles
  26. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  27. from news_mcp.trends_resolution import resolve_entity_via_trends
  28. async def _enrich_single_cluster(
  29. c: dict,
  30. topic: str,
  31. llm_enabled: bool,
  32. semaphore: asyncio.Semaphore,
  33. store: SQLiteClusterStore,
  34. logger: logging.Logger,
  35. ) -> dict:
  36. """Enrich one cluster: heuristic + optional LLM extraction, concurrency-limited."""
  37. c2 = enrich_cluster(c)
  38. c2.setdefault("topic", topic)
  39. cluster_id = c2.get("cluster_id")
  40. if llm_enabled and cluster_id:
  41. # Cache: if we already have entities/sentiment for this cluster, skip LLM call.
  42. existing = store.get_cluster_by_id(cluster_id)
  43. if existing and existing.get("entities"):
  44. c2 = dict(c2)
  45. c2["entities"] = existing.get("entities", [])
  46. existing_resolutions = existing.get("entityResolutions", None)
  47. if isinstance(existing_resolutions, list) and existing_resolutions:
  48. c2["entityResolutions"] = existing_resolutions
  49. else:
  50. c2["entityResolutions"] = [resolve_entity_via_trends(e) for e in c2["entities"]]
  51. if existing.get("sentiment"):
  52. c2["sentiment"] = existing.get("sentiment")
  53. if existing.get("sentimentScore") is not None:
  54. c2["sentimentScore"] = existing.get("sentimentScore")
  55. if existing.get("keywords"):
  56. c2["keywords"] = existing.get("keywords")
  57. if existing.get("topic"):
  58. c2["topic"] = existing.get("topic")
  59. else:
  60. # Acquire semaphore before making outbound LLM call
  61. async with semaphore:
  62. try:
  63. c2 = await classify_cluster_llm(c2)
  64. except Exception:
  65. logger.exception(
  66. "LLM enrichment failed for cluster %s (topic %s)",
  67. c2.get("cluster_id"), topic,
  68. )
  69. c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
  70. return c2
  71. async def _enrich_topic_clusters(
  72. clusters: list[dict],
  73. topic: str,
  74. semaphore: asyncio.Semaphore,
  75. store: SQLiteClusterStore,
  76. logger: logging.Logger,
  77. enrich_limit: int,
  78. ) -> list[dict]:
  79. """Enrich all clusters for a single topic concurrently."""
  80. llm_enabled = (not ENRICH_OTHER_TOPICS_ONLY) or (topic == "other")
  81. # Persist the raw clusters first so a slow enrichment pass does not
  82. # leave the first bootstrap run with nothing stored.
  83. store.upsert_clusters(clusters, topic=topic)
  84. logger.info("refresh stored raw topic=%s clusters=%s", topic, len(clusters))
  85. targets = clusters[:enrich_limit]
  86. tasks = [
  87. _enrich_single_cluster(c, topic, llm_enabled, semaphore, store, logger)
  88. for c in targets
  89. ]
  90. enriched = await asyncio.gather(*tasks, return_exceptions=False)
  91. # Any clusters beyond enrich_limit still need importance enrichment
  92. for c in clusters[enrich_limit:]:
  93. c2 = enrich_cluster(c)
  94. c2.setdefault("topic", topic)
  95. enriched.append(c2)
  96. logger.info("refresh enriched topic=%s clusters=%s", topic, len(enriched))
  97. return enriched
  98. async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
  99. logger = logging.getLogger("news_mcp.refresh")
  100. store = SQLiteClusterStore(DB_PATH)
  101. logger.info("refresh start topic=%s limit=%s", topic, limit)
  102. # fetch_news_articles is now fully async (concurrent RSS fetching)
  103. articles = await fetch_news_articles(limit)
  104. logger.info("refresh fetched articles=%s", len(articles))
  105. # Drop legacy aggregate feed-state rows so the dashboard only reflects
  106. # real per-feed poll status from this point forward.
  107. with store._conn() as conn:
  108. conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'")
  109. # Track feed freshness per RSS URL so unchanged feeds can be skipped.
  110. per_feed: dict[str, list[dict[str, Any]]] = defaultdict(list)
  111. for article in articles:
  112. feed_url = str(article.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL
  113. per_feed[feed_url].append(article)
  114. changed_articles: list[dict[str, Any]] = []
  115. changed_feed_urls: list[str] = []
  116. for feed_url, feed_articles in per_feed.items():
  117. logger.info("refresh feed batch start feed_url=%s count=%s", feed_url, len(feed_articles))
  118. material = "\n".join(
  119. f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
  120. for a in feed_articles
  121. )
  122. last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
  123. feed_key = feed_url
  124. prev_hash = store.get_feed_hash(feed_key)
  125. if prev_hash == last_hash:
  126. logger.info("refresh unchanged feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
  127. else:
  128. logger.info("refresh changed feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
  129. changed_feed_urls.append(feed_url)
  130. changed_articles.extend(feed_articles)
  131. logger.info("refresh feed batch complete feed_url=%s changed_total=%s", feed_url, len(changed_articles))
  132. if not changed_articles:
  133. logger.info("refresh unchanged all feeds topic=%s", topic)
  134. store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
  135. prune_result = store.prune_if_due(
  136. pruning_enabled=NEWS_PRUNING_ENABLED,
  137. retention_days=NEWS_RETENTION_DAYS,
  138. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  139. )
  140. logger.info("refresh prune_result=%s", prune_result)
  141. return
  142. articles = changed_articles
  143. logger.info("refresh clustering start articles=%s topic=%s", len(articles), topic)
  144. # Clustering is sync but may do concurrent embedding fetches internally.
  145. # Run off-thread so the event loop stays responsive for MCP tool calls.
  146. clustered_by_topic = await asyncio.to_thread(dedup_and_cluster_articles, articles)
  147. logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
  148. # Build LLM concurrency semaphore from the extract provider's config.
  149. max_llm_concurrent = llm_concurrency(NEWS_EXTRACT_PROVIDER)
  150. llm_semaphore = asyncio.Semaphore(max_llm_concurrent)
  151. logger.info("refresh llm semaphore limit=%s provider=%s", max_llm_concurrent, NEWS_EXTRACT_PROVIDER)
  152. # Enrich each topic's clusters concurrently.
  153. topic_tasks = []
  154. for t, clusters in clustered_by_topic.items():
  155. if topic and t != topic:
  156. continue
  157. # Determine how many clusters to LLM-enrich.
  158. # ENRICHMENT_MAX_PER_REFRESH=0 means enrich every cluster (no cap).
  159. enrich_limit = ENRICHMENT_MAX_PER_REFRESH or len(clusters)
  160. topic_tasks.append(
  161. _enrich_topic_clusters(
  162. clusters=clusters,
  163. topic=t,
  164. semaphore=llm_semaphore,
  165. store=store,
  166. logger=logger,
  167. enrich_limit=enrich_limit,
  168. )
  169. )
  170. # Run all topic enrichment phases concurrently
  171. topic_results = await asyncio.gather(*topic_tasks, return_exceptions=False)
  172. # Persist enriched clusters grouped by their final topic
  173. for enriched in topic_results:
  174. by_final_topic: Dict[str, list] = {}
  175. for c2 in enriched:
  176. final_topic = str(c2.get("topic") or "other").strip().lower()
  177. if final_topic not in {x.lower() for x in DEFAULT_TOPICS}:
  178. final_topic = "other"
  179. by_final_topic.setdefault(final_topic, []).append(c2)
  180. for final_topic, group in by_final_topic.items():
  181. store.upsert_clusters(group, topic=final_topic)
  182. logger.info("refresh stored topic=%s clusters=%s", final_topic, len(group))
  183. prune_result = store.prune_if_due(
  184. pruning_enabled=NEWS_PRUNING_ENABLED,
  185. retention_days=NEWS_RETENTION_DAYS,
  186. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  187. )
  188. for feed_url in changed_feed_urls:
  189. feed_articles = per_feed[feed_url]
  190. material = "\n".join(
  191. f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
  192. for a in feed_articles
  193. )
  194. last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
  195. store.set_feed_state(feed_url, last_hash, len(feed_articles))
  196. store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
  197. logger.info("refresh prune_result=%s", prune_result)