news_feeds.py

from __future__ import annotations

import hashlib
import logging
import re
from typing import Any, Dict, List
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

import feedparser

from news_mcp.config import NEWS_FEED_URL, NEWS_FEED_URLS

logger = logging.getLogger(__name__)

FEED_FETCH_TIMEOUT_SECONDS = 15

def _canonical_url(url: str) -> str:
    # Minimal canonicalization for v1.
    return url.strip()


def _strip_html(text: str) -> str:
    """Remove obvious HTML so downstream summaries stay readable."""
    text = re.sub(r"<script.*?</script>", "", text, flags=re.I | re.S)
    text = re.sub(r"<style.*?</style>", "", text, flags=re.I | re.S)
    text = re.sub(r"<[^>]+>", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()

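
# A quick sanity check of the stripping behavior (hypothetical input):
#   _strip_html("<p>Fed holds <b>rates</b> steady<script>x()</script></p>")
#   -> "Fed holds rates steady"
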
def _feed_urls() -> List[str]:
    """Parse the comma-separated NEWS_FEED_URLS, falling back to NEWS_FEED_URL."""
    urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
    if not urls:
        urls = [NEWS_FEED_URL]
    return urls

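
# For example, with a hypothetical config value
#   NEWS_FEED_URLS = "https://a.example/rss, https://b.example/rss,"
# _feed_urls() returns ["https://a.example/rss", "https://b.example/rss"];
# stray commas and surrounding whitespace are dropped.
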
def _fetch_feed(feed_url: str) -> feedparser.FeedParserDict:
    """Fetch the bytes ourselves so the timeout is explicit, then let feedparser parse them."""
    req = Request(feed_url, headers={"User-Agent": "news-mcp/1.0"})
    with urlopen(req, timeout=FEED_FETCH_TIMEOUT_SECONDS) as resp:
        return feedparser.parse(resp.read())

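
# feedparser.parse() can fetch URLs on its own, but it takes no per-call
# timeout, which is why the fetch above goes through urlopen. The result is a
# FeedParserDict; the fields used below look roughly like:
#   feed.feed.title    -> "Example Feed"  (may be absent)
#   feed.entries[0]    -> has .title, .link, .published / .updated, .summary
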
def fetch_news_articles(limit: int = 50) -> List[Dict[str, Any]]:
    feed_urls = _feed_urls()
    articles: List[Dict[str, Any]] = []
    logger.info(
        "news ingestion start feeds=%s limit=%s timeout_s=%s",
        len(feed_urls),
        limit,
        FEED_FETCH_TIMEOUT_SECONDS,
    )
    # Pull evenly from each feed; the hard check below keeps the total at or below `limit`.
    per_feed_limit = max(1, limit // max(1, len(feed_urls)))
    for feed_url in feed_urls:
        try:
            feed = _fetch_feed(feed_url)
            feed_name = getattr(feed.feed, "title", None) or feed_url
            parsed_entries = len(getattr(feed, "entries", []) or [])
            logger.info(
                "news feed parsed feed_url=%s feed_name=%s entries=%s",
                feed_url,
                feed_name,
                parsed_entries,
            )
        except (HTTPError, URLError, TimeoutError, OSError) as exc:
            logger.exception("news feed fetch failed feed_url=%s error=%s", feed_url, exc)
            continue
        except Exception as exc:
            logger.exception("news feed parse failed feed_url=%s error=%s", feed_url, exc)
            continue
        kept_before = len(articles)
        for entry in feed.entries[:per_feed_limit]:
            title = str(getattr(entry, "title", "")).strip()
            url = _canonical_url(str(getattr(entry, "link", "")).strip())
            timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
            summary = _strip_html(str(getattr(entry, "summary", "")) or str(getattr(entry, "description", "")))
            # Skip entries missing the fields we key on downstream.
            if not title or not url:
                continue
            articles.append(
                {
                    "title": title,
                    "url": url,
                    "source": str(feed_name),
                    "feed_url": feed_url,
                    "timestamp": timestamp,
                    "summary": summary,
                }
            )
            if len(articles) >= limit:
                logger.info(
                    "news ingestion limit reached feed_url=%s total_kept=%s",
                    feed_url,
                    len(articles),
                )
                return articles
        logger.info(
            "news feed completed feed_url=%s kept=%s",
            feed_url,
            len(articles) - kept_before,
        )
    logger.info("news ingestion complete total_kept=%s", len(articles))
    return articles

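
# A minimal usage sketch (assumes the configured feeds are reachable):
#   articles = fetch_news_articles(limit=10)
#   for a in articles:
#       print(a["timestamp"], a["source"], "-", a["title"])
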
def normalize_topic_from_title(title: str) -> str:
    """Bucket a headline into a coarse topic by keyword matching on whole words."""
    # Match whole words, not substrings, so "bank" does not hit "ban" and
    # "said" does not hit "ai". The trade-off: variants like "cryptocurrency"
    # need their own keyword to match.
    words = set(re.findall(r"[a-z0-9]+", title.lower()))
    if words & {"btc", "bitcoin", "eth", "ethereum", "crypto"}:
        return "crypto"
    if words & {"rate", "rates", "inflation", "fed", "treasury", "euro"}:
        return "macro"
    if words & {"regulation", "sec", "ban", "law"}:
        return "regulation"
    if words & {"ai", "llm", "model", "openai", "anthropic"}:
        return "ai"
    return "other"

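
# Examples of the bucketing (hypothetical headlines):
#   normalize_topic_from_title("Bitcoin rallies 5%")     -> "crypto"
#   normalize_topic_from_title("Fed signals rate pause") -> "macro"
#   normalize_topic_from_title("Bank earnings beat")     -> "other"  (no whole-word hit)
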
def cluster_id_for_title(topic: str, title: str) -> str:
    # SHA-1 is used as a stable bucketing key here, not as a security boundary.
    key = f"{topic}|{title.strip().lower()}"
    return hashlib.sha1(key.encode("utf-8")).hexdigest()

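
if __name__ == "__main__":
    # Ad-hoc smoke test, not part of the library surface; assumes
    # news_mcp.config is importable and the configured feeds respond.
    logging.basicConfig(level=logging.INFO)
    for article in fetch_news_articles(limit=5):
        topic = normalize_topic_from_title(article["title"])
        print(topic, cluster_id_for_title(topic, article["title"])[:8], article["title"])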