poller.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from __future__ import annotations
  2. from typing import Any, Dict
  3. from news_mcp.config import CLUSTERS_TTL_HOURS, DB_PATH, RSS_FEED_URL, RSS_FEED_URLS
  4. from news_mcp.dedup.cluster import dedup_and_cluster_articles
  5. from news_mcp.enrichment.enrich import enrich_cluster
  6. from news_mcp.enrichment.groq_enrich import classify_cluster_groq
  7. from news_mcp.sources.rss_breakingthenews import fetch_breakingthenews_articles
  8. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  9. from news_mcp.config import GROQ_ENRICH_OTHER_ONLY, GROQ_MAX_CLUSTERS_PER_REFRESH
  10. async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
  11. store = SQLiteClusterStore(DB_PATH)
  12. articles = fetch_breakingthenews_articles(limit=limit)
  13. # Skip expensive work if the feed content (titles/urls/timestamps) didn't change.
  14. import hashlib
  15. rss_urls = [u.strip() for u in RSS_FEED_URLS.split(",") if u.strip()]
  16. if not rss_urls:
  17. rss_urls = [RSS_FEED_URL]
  18. feed_key = "breakingthenews:" + hashlib.sha1(",".join(rss_urls).encode("utf-8")).hexdigest()
  19. material = "\n".join(
  20. f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
  21. for a in articles
  22. )
  23. last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
  24. prev_hash = store.get_feed_hash(feed_key)
  25. if prev_hash == last_hash:
  26. return
  27. store.set_feed_hash(feed_key, last_hash)
  28. clustered_by_topic = dedup_and_cluster_articles(articles)
  29. for t, clusters in clustered_by_topic.items():
  30. if topic and t != topic:
  31. continue
  32. enriched = []
  33. # Always compute cheap enrichment first.
  34. for idx, c in enumerate(clusters[:GROQ_MAX_CLUSTERS_PER_REFRESH]):
  35. c2 = enrich_cluster(c)
  36. # Groq enrichment only when configured.
  37. if (not GROQ_ENRICH_OTHER_ONLY) or (t == "other"):
  38. # Cache Groq: if we already have entities/sentiment for this cluster, skip.
  39. existing = store.get_cluster_by_id(c2.get("cluster_id"))
  40. if existing and existing.get("entities"):
  41. c2 = dict(c2)
  42. # Keep existing enriched fields.
  43. c2["entities"] = existing.get("entities", [])
  44. if existing.get("sentiment"):
  45. c2["sentiment"] = existing.get("sentiment")
  46. if existing.get("sentimentScore") is not None:
  47. c2["sentimentScore"] = existing.get("sentimentScore")
  48. if existing.get("keywords"):
  49. c2["keywords"] = existing.get("keywords")
  50. else:
  51. c2 = await classify_cluster_groq(c2)
  52. enriched.append(c2)
  53. store.upsert_clusters(enriched, topic=t)