news_feeds.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. from __future__ import annotations
  2. import asyncio
  3. import hashlib
  4. import logging
  5. import re
  6. from typing import Any, Dict, List
  7. from urllib.error import URLError, HTTPError
  8. from urllib.request import Request, urlopen
  9. import feedparser
  10. import httpx
  11. from news_mcp.config import (
  12. NEWS_FEED_ITEMS_PER_POLL,
  13. NEWS_FEED_URL,
  14. NEWS_FEED_URLS,
  15. _NEEDLE_RSS_MAX_CONCURRENCY,
  16. )
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
# Timeout, in seconds, handed to httpx.Timeout when fetch_news_articles
# builds its AsyncClient.
FEED_FETCH_TIMEOUT_SECONDS = 20
  19. def _canonical_url(url: str) -> str:
  20. # Minimal canonicalization for v1.
  21. return url.strip()
  22. def _strip_html(text: str) -> str:
  23. """Remove obvious HTML so downstream summaries stay readable."""
  24. text = re.sub(r"<script.*?</script>", "", text, flags=re.I | re.S)
  25. text = re.sub(r"<style.*?</style>", "", text, flags=re.I | re.S)
  26. text = re.sub(r"<[^>]+>", " ", text)
  27. text = re.sub(r"\s+", " ", text)
  28. return text.strip()
  29. def _feed_urls() -> List[str]:
  30. urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
  31. if not urls:
  32. urls = [NEWS_FEED_URL]
  33. return urls
  34. def _parse_feed_from_bytes(data: bytes, feed_url: str):
  35. """Parse feed from raw bytes (sync, but fast — just XML parsing)."""
  36. return feedparser.parse(data)
  37. async def _fetch_feed_async(
  38. client: httpx.AsyncClient,
  39. semaphore: asyncio.Semaphore,
  40. feed_url: str,
  41. ) -> tuple[str, bytes | None]:
  42. """Fetch a single RSS feed concurrently. Returns (feed_url, raw_bytes)."""
  43. async with semaphore:
  44. try:
  45. resp = await client.get(feed_url, follow_redirects=True)
  46. resp.raise_for_status()
  47. return (feed_url, resp.content)
  48. except (httpx.HTTPStatusError, httpx.TimeoutException, httpx.ConnectError, OSError) as exc:
  49. logger.exception("news feed fetch failed feed_url=%s error=%s", feed_url, exc)
  50. return (feed_url, None)
  51. except Exception as exc:
  52. logger.exception("news feed fetch unexpected error feed_url=%s error=%s", feed_url, exc)
  53. return (feed_url, None)
  54. def _extract_articles_from_feed(
  55. feed_url: str,
  56. parsed,
  57. per_feed_limit: int,
  58. ) -> List[Dict[str, Any]]:
  59. """Extract article dicts from a parsed feedparser object (sync)."""
  60. articles: List[Dict[str, Any]] = []
  61. feed_name = getattr(parsed.feed, "title", None) or feed_url
  62. parsed_entries = len(getattr(parsed, "entries", []) or [])
  63. logger.info(
  64. "news feed parsed feed_url=%s feed_name=%s entries=%s",
  65. feed_url, feed_name, parsed_entries,
  66. )
  67. kept = 0
  68. for entry in parsed.entries[:per_feed_limit]:
  69. title = str(getattr(entry, "title", "")).strip()
  70. url = _canonical_url(str(getattr(entry, "link", "")).strip())
  71. timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
  72. summary = _strip_html(
  73. str(getattr(entry, "summary", "")) or str(getattr(entry, "description", ""))
  74. )
  75. if not title or not url:
  76. continue
  77. articles.append({
  78. "title": title,
  79. "url": url,
  80. "source": str(feed_name),
  81. "feed_url": feed_url,
  82. "timestamp": timestamp,
  83. "summary": summary,
  84. })
  85. kept += 1
  86. logger.info("news feed completed feed_url=%s kept=%s", feed_url, kept)
  87. return articles
  88. async def fetch_news_articles(
  89. limit: int = NEWS_FEED_ITEMS_PER_POLL,
  90. url_list: list[str] | None = None,
  91. ) -> list[dict[str, Any]]:
  92. """Fetch all RSS feeds concurrently, parse, and return articles.
  93. Args:
  94. limit: Maximum number of articles per feed.
  95. url_list: Optional explicit list of feed URLs to fetch.
  96. If None, the configured NEWS_FEED_URLS / NEWS_FEED_URL is used.
  97. """
  98. feed_urls = url_list if url_list is not None else _feed_urls()
  99. per_feed_limit = max(1, int(limit))
  100. logger.info(
  101. "news ingestion start feeds=%s limit=%s timeout_s=%s",
  102. len(feed_urls), per_feed_limit, FEED_FETCH_TIMEOUT_SECONDS,
  103. )
  104. semaphore = asyncio.Semaphore(_NEEDLE_RSS_MAX_CONCURRENCY)
  105. async with httpx.AsyncClient(
  106. timeout=httpx.Timeout(FEED_FETCH_TIMEOUT_SECONDS),
  107. headers={"User-Agent": "news-mcp/1.0"},
  108. ) as client:
  109. tasks = [
  110. _fetch_feed_async(client, semaphore, url)
  111. for url in feed_urls
  112. ]
  113. results = await asyncio.gather(*tasks, return_exceptions=False)
  114. articles: List[Dict[str, Any]] = []
  115. for feed_url, raw in results:
  116. if raw is None:
  117. continue
  118. # feedparser.parse is CPU-light but sync — parse inline (fast enough)
  119. parsed = feedparser.parse(raw)
  120. articles.extend(_extract_articles_from_feed(feed_url, parsed, per_feed_limit))
  121. logger.info("news ingestion complete total_kept=%s", len(articles))
  122. return articles
  123. def normalize_topic_from_title(title: str) -> str:
  124. t = title.lower()
  125. if any(k in t for k in ["btc", "bitcoin", "eth", "ethereum", "crypto"]):
  126. return "crypto"
  127. if any(k in t for k in ["rate", "rates", "inflation", "fed", "treasury", "euro"]):
  128. return "macro"
  129. if any(k in t for k in ["regulation", "sec", "ban", "law"]):
  130. return "regulation"
  131. if any(k in t for k in ["ai", "llm", "model", "openai", "anthropic"]):
  132. return "ai"
  133. return "other"
  134. def cluster_id_for_title(topic: str, title: str) -> str:
  135. import hashlib
  136. key = f"{topic}|{title.strip().lower()}"
  137. return hashlib.sha1(key.encode("utf-8")).hexdigest()