poller.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. from __future__ import annotations
  2. import asyncio
  3. import logging
  4. from collections import defaultdict
  5. from datetime import datetime, timezone
  6. from typing import Any, Dict
  7. from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH, NEWS_FEED_URL, NEWS_FEED_URLS
  8. from news_mcp.dedup.cluster import dedup_and_cluster_articles
  9. from news_mcp.enrichment.enrich import enrich_cluster
  10. from news_mcp.enrichment.llm_enrich import classify_cluster_llm
  11. from news_mcp.trends_resolution import resolve_entity_via_trends
  12. from news_mcp.sources.news_feeds import fetch_news_articles
  13. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  14. from news_mcp.config import (
  15. ENRICH_OTHER_TOPICS_ONLY,
  16. ENRICHMENT_MAX_PER_REFRESH,
  17. NEWS_PRUNE_INTERVAL_HOURS,
  18. NEWS_PRUNING_ENABLED,
  19. NEWS_RETENTION_DAYS,
  20. )
  21. async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
  22. logger = logging.getLogger("news_mcp.refresh")
  23. store = SQLiteClusterStore(DB_PATH)
  24. logger.info("refresh start topic=%s limit=%s", topic, limit)
  25. articles = await asyncio.to_thread(fetch_news_articles, limit)
  26. logger.info("refresh fetched articles=%s", len(articles))
  27. # Drop legacy aggregate feed-state rows so the dashboard only reflects
  28. # real per-feed poll status from this point forward.
  29. with store._conn() as conn:
  30. conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'")
  31. # Track feed freshness per RSS URL so unchanged feeds can be skipped.
  32. import hashlib
  33. per_feed: dict[str, list[dict[str, Any]]] = defaultdict(list)
  34. for article in articles:
  35. feed_url = str(article.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL
  36. per_feed[feed_url].append(article)
  37. changed_articles: list[dict[str, Any]] = []
  38. changed_feed_urls: list[str] = []
  39. for feed_url, feed_articles in per_feed.items():
  40. material = "\n".join(
  41. f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
  42. for a in feed_articles
  43. )
  44. last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
  45. feed_key = feed_url
  46. prev_hash = store.get_feed_hash(feed_key)
  47. if prev_hash == last_hash:
  48. logger.info("refresh unchanged feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
  49. else:
  50. logger.info("refresh changed feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
  51. changed_feed_urls.append(feed_url)
  52. changed_articles.extend(feed_articles)
  53. if not changed_articles:
  54. logger.info("refresh unchanged all feeds topic=%s", topic)
  55. store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
  56. prune_result = store.prune_if_due(
  57. pruning_enabled=NEWS_PRUNING_ENABLED,
  58. retention_days=NEWS_RETENTION_DAYS,
  59. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  60. )
  61. logger.info("refresh prune_result=%s", prune_result)
  62. return
  63. articles = changed_articles
  64. clustered_by_topic = dedup_and_cluster_articles(articles)
  65. logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
  66. for t, clusters in clustered_by_topic.items():
  67. if topic and t != topic:
  68. continue
  69. enriched = []
  70. # Determine how many clusters to LLM-enrich.
  71. # ENRICHMENT_MAX_PER_REFRESH=0 means enrich every cluster (no cap).
  72. enrich_limit = ENRICHMENT_MAX_PER_REFRESH or len(clusters)
  73. # Track whether the LLM pipeline is available for this topic.
  74. _llm_enabled_for_topic = (
  75. (not ENRICH_OTHER_TOPICS_ONLY) or (t == "other")
  76. )
  77. for idx, c in enumerate(clusters[:enrich_limit]):
  78. c2 = enrich_cluster(c)
  79. # Seed the heuristic topic on the payload so classify_cluster_llm
  80. # has a sane fallback if the LLM omits or hallucinates one.
  81. c2.setdefault("topic", t)
  82. if _llm_enabled_for_topic:
  83. # Cache: if we already have entities/sentiment for this cluster, skip LLM call.
  84. existing = store.get_cluster_by_id(c2.get("cluster_id"))
  85. if existing and existing.get("entities"):
  86. c2 = dict(c2)
  87. # Keep existing enriched fields.
  88. c2["entities"] = existing.get("entities", [])
  89. # IMPORTANT: entityResolutions must stay consistent with entities.
  90. # Older rows may have entities but missing/malformed resolutions.
  91. existing_resolutions = existing.get("entityResolutions", None)
  92. if isinstance(existing_resolutions, list) and existing_resolutions:
  93. c2["entityResolutions"] = existing_resolutions
  94. else:
  95. # Recompute resolutions deterministically from the stored entities.
  96. c2["entityResolutions"] = [resolve_entity_via_trends(e) for e in c2["entities"]]
  97. if existing.get("sentiment"):
  98. c2["sentiment"] = existing.get("sentiment")
  99. if existing.get("sentimentScore") is not None:
  100. c2["sentimentScore"] = existing.get("sentimentScore")
  101. if existing.get("keywords"):
  102. c2["keywords"] = existing.get("keywords")
  103. # Preserve a previously-classified topic so we don't drift back
  104. # to the heuristic on cache hits.
  105. if existing.get("topic"):
  106. c2["topic"] = existing.get("topic")
  107. else:
  108. try:
  109. c2 = await classify_cluster_llm(c2)
  110. except Exception:
  111. logger.exception("LLM enrichment failed for cluster %s (topic %s)", c2.get("cluster_id"), t)
  112. # Mark so we can retry on next refresh.
  113. c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
  114. enriched.append(c2)
  115. # Persist clusters under their *post-enrichment* topic so the SQL row
  116. # column matches what the LLM (or the validated heuristic fallback)
  117. # actually decided. Previously, every cluster from this bucket was
  118. # forced into the heuristic topic `t`, which caused a ~97% mismatch
  119. # between row-column topic and payload topic.
  120. by_final_topic: Dict[str, list] = {}
  121. for c2 in enriched:
  122. final_topic = str(c2.get("topic") or t or "other").strip().lower()
  123. if final_topic not in {x.lower() for x in DEFAULT_TOPICS}:
  124. final_topic = "other"
  125. by_final_topic.setdefault(final_topic, []).append(c2)
  126. for final_topic, group in by_final_topic.items():
  127. store.upsert_clusters(group, topic=final_topic)
  128. logger.info("refresh stored topic=%s clusters=%s (heuristic_topic=%s)", final_topic, len(group), t)
  129. prune_result = store.prune_if_due(
  130. pruning_enabled=NEWS_PRUNING_ENABLED,
  131. retention_days=NEWS_RETENTION_DAYS,
  132. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  133. )
  134. for feed_url in changed_feed_urls:
  135. feed_articles = per_feed[feed_url]
  136. material = "\n".join(
  137. f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
  138. for a in feed_articles
  139. )
  140. last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
  141. store.set_feed_state(feed_url, last_hash, len(feed_articles))
  142. store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
  143. logger.info("refresh prune_result=%s", prune_result)