# poller.py — periodic news refresh: fetch feeds, dedupe/cluster, enrich, persist.
from __future__ import annotations

import hashlib
import logging
from typing import Any, Dict

from news_mcp.config import (
    DB_PATH,
    DEFAULT_LOOKBACK_HOURS,
    GROQ_ENRICH_OTHER_ONLY,
    GROQ_MAX_CLUSTERS_PER_REFRESH,
    NEWS_FEED_URL,
    NEWS_FEED_URLS,
    NEWS_PRUNE_INTERVAL_HOURS,
    NEWS_PRUNING_ENABLED,
    NEWS_RETENTION_DAYS,
)
from news_mcp.dedup.cluster import dedup_and_cluster_articles
from news_mcp.enrichment.enrich import enrich_cluster
from news_mcp.enrichment.llm_enrich import classify_cluster_llm
from news_mcp.sources.news_feeds import fetch_news_articles
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.trends_resolution import resolve_entity_via_trends
  18. async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
  19. logger = logging.getLogger("news_mcp.refresh")
  20. store = SQLiteClusterStore(DB_PATH)
  21. logger.info("refresh start topic=%s limit=%s", topic, limit)
  22. articles = fetch_news_articles(limit=limit)
  23. logger.info("refresh fetched articles=%s", len(articles))
  24. # Skip expensive work if the feed content (titles/urls/timestamps) didn't change.
  25. import hashlib
  26. rss_urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
  27. if not rss_urls:
  28. rss_urls = [NEWS_FEED_URL]
  29. feed_key = "newsfeeds:" + hashlib.sha1(",".join(rss_urls).encode("utf-8")).hexdigest()
  30. material = "\n".join(
  31. f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
  32. for a in articles
  33. )
  34. last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
  35. prev_hash = store.get_feed_hash(feed_key)
  36. if prev_hash == last_hash:
  37. logger.info("refresh unchanged feed_key=%s topic=%s", feed_key, topic)
  38. prune_result = store.prune_if_due(
  39. pruning_enabled=NEWS_PRUNING_ENABLED,
  40. retention_days=NEWS_RETENTION_DAYS,
  41. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  42. )
  43. logger.info("refresh prune_result=%s", prune_result)
  44. return
  45. logger.info("refresh changed feed_key=%s topic=%s", feed_key, topic)
  46. store.set_feed_hash(feed_key, last_hash)
  47. clustered_by_topic = dedup_and_cluster_articles(articles)
  48. logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
  49. for t, clusters in clustered_by_topic.items():
  50. if topic and t != topic:
  51. continue
  52. enriched = []
  53. # Always compute cheap enrichment first.
  54. for idx, c in enumerate(clusters[:GROQ_MAX_CLUSTERS_PER_REFRESH]):
  55. c2 = enrich_cluster(c)
  56. # Groq enrichment only when configured.
  57. if (not GROQ_ENRICH_OTHER_ONLY) or (t == "other"):
  58. # Cache Groq: if we already have entities/sentiment for this cluster, skip.
  59. existing = store.get_cluster_by_id(c2.get("cluster_id"))
  60. if existing and existing.get("entities"):
  61. c2 = dict(c2)
  62. # Keep existing enriched fields.
  63. c2["entities"] = existing.get("entities", [])
  64. # IMPORTANT: entityResolutions must stay consistent with entities.
  65. # Older rows may have entities but missing/malformed resolutions.
  66. existing_resolutions = existing.get("entityResolutions", None)
  67. if isinstance(existing_resolutions, list) and existing_resolutions:
  68. c2["entityResolutions"] = existing_resolutions
  69. else:
  70. # Recompute resolutions deterministically from the stored entities.
  71. c2["entityResolutions"] = [resolve_entity_via_trends(e) for e in c2["entities"]]
  72. if existing.get("sentiment"):
  73. c2["sentiment"] = existing.get("sentiment")
  74. if existing.get("sentimentScore") is not None:
  75. c2["sentimentScore"] = existing.get("sentimentScore")
  76. if existing.get("keywords"):
  77. c2["keywords"] = existing.get("keywords")
  78. else:
  79. c2 = await classify_cluster_llm(c2)
  80. enriched.append(c2)
  81. store.upsert_clusters(enriched, topic=t)
  82. logger.info("refresh stored topic=%s clusters=%s", t, len(enriched))
  83. prune_result = store.prune_if_due(
  84. pruning_enabled=NEWS_PRUNING_ENABLED,
  85. retention_days=NEWS_RETENTION_DAYS,
  86. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  87. )
  88. logger.info("refresh prune_result=%s", prune_result)