news_feeds.py

from __future__ import annotations

import hashlib
import logging
import re
from typing import Any, Dict, List

import feedparser

from news_mcp.config import NEWS_FEED_URL, NEWS_FEED_URLS

logger = logging.getLogger(__name__)


def _canonical_url(url: str) -> str:
    # Minimal canonicalization for v1.
    return url.strip()
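
# Illustrative behavior of _canonical_url (hypothetical URL): only the
# surrounding whitespace goes away, so query params and fragments survive
# this v1 pass.
#
#     _canonical_url(" https://example.com/a?utm_source=x ")
#     # -> "https://example.com/a?utm_source=x"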


def _strip_html(text: str) -> str:
    """Remove obvious HTML so downstream summaries stay readable."""
    text = re.sub(r"<script.*?</script>", "", text, flags=re.I | re.S)
    text = re.sub(r"<style.*?</style>", "", text, flags=re.I | re.S)
    text = re.sub(r"<[^>]+>", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()
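
# Quick sanity check (illustrative input; the tag handling above is
# intentionally minimal and does not cover HTML comments or entities):
#
#     _strip_html("<p>Breaking: <b>rates</b> hold <script>x()</script>steady</p>")
#     # -> "Breaking: rates hold steady"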


def _feed_urls() -> List[str]:
    urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
    if not urls:
        urls = [NEWS_FEED_URL]
    return urls
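
# Illustrative behavior (hypothetical config values; the real ones come
# from news_mcp.config):
#
#     NEWS_FEED_URLS = "https://a.example/rss, https://b.example/rss"
#       -> _feed_urls() returns ["https://a.example/rss", "https://b.example/rss"]
#     NEWS_FEED_URLS = ""
#       -> _feed_urls() falls back to [NEWS_FEED_URL]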


def fetch_news_articles(limit: int = 50) -> List[Dict[str, Any]]:
    feed_urls = _feed_urls()
    articles: List[Dict[str, Any]] = []
    logger.info("news ingestion start feeds=%s limit=%s", len(feed_urls), limit)
    # Pull evenly from every feed; keep the total at or below `limit`.
    per_feed_limit = max(1, limit // max(1, len(feed_urls)))
    for feed_url in feed_urls:
        feed = feedparser.parse(feed_url)
        feed_name = getattr(feed.feed, "title", None) or feed_url
        parsed_entries = len(getattr(feed, "entries", []) or [])
        logger.info("news feed parsed feed_url=%s feed_name=%s entries=%s", feed_url, feed_name, parsed_entries)
        kept_before = len(articles)
        for entry in feed.entries[:per_feed_limit]:
            title = str(getattr(entry, "title", "")).strip()
            url = _canonical_url(str(getattr(entry, "link", "")).strip())
            timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
            summary = _strip_html(str(getattr(entry, "summary", "")) or str(getattr(entry, "description", "")))
            # Entries without a title or link are not useful downstream.
            if not title or not url:
                continue
            articles.append(
                {
                    "title": title,
                    "url": url,
                    "source": str(feed_name),
                    "feed_url": feed_url,
                    "timestamp": timestamp,
                    "summary": summary,
                }
            )
            if len(articles) >= limit:
                logger.info("news ingestion limit reached feed_url=%s total_kept=%s", feed_url, len(articles))
                return articles
        logger.info(
            "news feed completed feed_url=%s kept=%s",
            feed_url,
            len(articles) - kept_before,
        )
    logger.info("news ingestion complete total_kept=%s", len(articles))
    return articles
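
# Illustrative call (article fields as built above; the actual feeds depend
# on the configured URLs):
#
#     articles = fetch_news_articles(limit=20)
#     for a in articles:
#         print(a["source"], a["title"], a["url"])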


def normalize_topic_from_title(title: str) -> str:
    # Naive v1 keyword buckets via substring matching; short keys such as
    # "ai" can over-match (e.g. "daily"), which is acceptable for now.
    t = title.lower()
    if any(k in t for k in ["btc", "bitcoin", "eth", "ethereum", "crypto"]):
        return "crypto"
    if any(k in t for k in ["rate", "rates", "inflation", "fed", "treasury", "euro"]):
        return "macro"
    if any(k in t for k in ["regulation", "sec", "ban", "law"]):
        return "regulation"
    if any(k in t for k in ["ai", "llm", "model", "openai", "anthropic"]):
        return "ai"
    return "other"


def cluster_id_for_title(topic: str, title: str) -> str:
    # Deterministic id: the same (topic, normalized title) pair always hashes
    # to the same cluster, so repeat ingestions dedupe naturally.
    key = f"{topic}|{title.strip().lower()}"
    return hashlib.sha1(key.encode("utf-8")).hexdigest()
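
# Illustrative dedupe check (hypothetical title): case and surrounding
# whitespace are normalized away before hashing.
#
#     a = cluster_id_for_title("crypto", "Bitcoin rallies past $100k")
#     b = cluster_id_for_title("crypto", "  bitcoin RALLIES past $100k ")
#     assert a == b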