# poller.py

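"""Feed poller: fetch fresh articles, skip work when the feed content is
unchanged (via a content hash), then dedup/cluster, enrich, and persist the
results per topic."""
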
from __future__ import annotations

import hashlib
import logging

from news_mcp.config import (
    DB_PATH,
    GROQ_ENRICH_OTHER_ONLY,
    GROQ_MAX_CLUSTERS_PER_REFRESH,
    NEWS_FEED_URL,
    NEWS_FEED_URLS,
)
from news_mcp.dedup.cluster import dedup_and_cluster_articles
from news_mcp.enrichment.enrich import enrich_cluster
from news_mcp.enrichment.groq_enrich import classify_cluster_groq
from news_mcp.sources.news_feeds import fetch_news_articles
from news_mcp.storage.sqlite_store import SQLiteClusterStore


async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
    """Fetch fresh articles and rebuild stored clusters for ``topic`` (or all
    topics), returning early when the feed content is unchanged."""
    logger = logging.getLogger("news_mcp.refresh")
    store = SQLiteClusterStore(DB_PATH)
    logger.info("refresh start topic=%s limit=%s", topic, limit)

    articles = fetch_news_articles(limit=limit)
    logger.info("refresh fetched articles=%s", len(articles))
    # Skip the expensive clustering/enrichment work when the feed content
    # (titles/urls/timestamps) didn't change since the last refresh.
    rss_urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
    if not rss_urls:
        rss_urls = [NEWS_FEED_URL]
    feed_key = "newsfeeds:" + hashlib.sha1(
        ",".join(rss_urls).encode("utf-8")
    ).hexdigest()
    material = "\n".join(
        f"{a.get('title', '')}|{a.get('url', '')}|{a.get('timestamp', '')}"
        for a in articles
    )
    new_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
    prev_hash = store.get_feed_hash(feed_key)
    if prev_hash == new_hash:
        logger.info("refresh unchanged feed_key=%s topic=%s", feed_key, topic)
        return
    logger.info("refresh changed feed_key=%s topic=%s", feed_key, topic)
    store.set_feed_hash(feed_key, new_hash)
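
    # The feed changed: recluster everything, then enrich and persist per topic.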
    clustered_by_topic = dedup_and_cluster_articles(articles)
    logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
    for t, clusters in clustered_by_topic.items():
        if topic and t != topic:
            continue
        enriched = []
        for idx, c in enumerate(clusters):
            # Always compute the cheap enrichment first.
            c2 = enrich_cluster(c)
            # Groq enrichment only when configured, capped per refresh so a
            # large feed can't trigger an unbounded number of LLM calls.
            if idx < GROQ_MAX_CLUSTERS_PER_REFRESH and (
                not GROQ_ENRICH_OTHER_ONLY or t == "other"
            ):
                # Reuse cached Groq output: if entities were already stored
                # for this cluster, keep the existing enriched fields instead
                # of calling Groq again.
                existing = store.get_cluster_by_id(c2.get("cluster_id"))
                if existing and existing.get("entities"):
                    c2 = dict(c2)  # copy before mutating
                    c2["entities"] = existing.get("entities", [])
                    if existing.get("sentiment"):
                        c2["sentiment"] = existing.get("sentiment")
                    if existing.get("sentimentScore") is not None:
                        c2["sentimentScore"] = existing.get("sentimentScore")
                    if existing.get("keywords"):
                        c2["keywords"] = existing.get("keywords")
                else:
                    c2 = await classify_cluster_groq(c2)
            enriched.append(c2)
        store.upsert_clusters(enriched, topic=t)
        logger.info("refresh stored topic=%s clusters=%s", t, len(enriched))
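

# Usage sketch (not part of the original module): refresh_clusters is a
# coroutine, so a one-shot manual run might look like this, assuming the
# news_mcp package and its config are importable.
if __name__ == "__main__":
    import asyncio

    logging.basicConfig(level=logging.INFO)
    asyncio.run(refresh_clusters())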