news_feeds.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. from __future__ import annotations
  2. import asyncio
  3. import hashlib
  4. import logging
  5. import re
  6. from typing import Any, Dict, List
  7. from urllib.error import URLError, HTTPError
  8. from urllib.request import Request, urlopen
  9. import feedparser
  10. import httpx
  11. from news_mcp.config import (
  12. NEWS_FEED_ITEMS_PER_POLL,
  13. NEWS_FEED_URL,
  14. NEWS_FEED_URLS,
  15. _NEEDLE_RSS_MAX_CONCURRENCY,
  16. )
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Timeout in seconds handed to httpx.Timeout for feed requests.
FEED_FETCH_TIMEOUT_SECONDS = 20
  19. def _canonical_url(url: str) -> str:
  20. # Minimal canonicalization for v1.
  21. return url.strip()
  22. def _strip_html(text: str) -> str:
  23. """Remove obvious HTML so downstream summaries stay readable."""
  24. text = re.sub(r"<script.*?</script>", "", text, flags=re.I | re.S)
  25. text = re.sub(r"<style.*?</style>", "", text, flags=re.I | re.S)
  26. text = re.sub(r"<[^>]+>", " ", text)
  27. text = re.sub(r"\s+", " ", text)
  28. return text.strip()
  29. def _feed_urls() -> List[str]:
  30. urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
  31. if not urls:
  32. urls = [NEWS_FEED_URL]
  33. return urls
  34. def _parse_feed_from_bytes(data: bytes, feed_url: str):
  35. """Parse feed from raw bytes (sync, but fast — just XML parsing)."""
  36. return feedparser.parse(data)
  37. async def _fetch_feed_async(
  38. client: httpx.AsyncClient,
  39. semaphore: asyncio.Semaphore,
  40. feed_url: str,
  41. ) -> tuple[str, bytes | None]:
  42. """Fetch a single RSS feed concurrently. Returns (feed_url, raw_bytes)."""
  43. async with semaphore:
  44. try:
  45. resp = await client.get(feed_url, follow_redirects=True)
  46. resp.raise_for_status()
  47. return (feed_url, resp.content)
  48. except (httpx.HTTPStatusError, httpx.TimeoutException, httpx.ConnectError, OSError) as exc:
  49. logger.exception("news feed fetch failed feed_url=%s error=%s", feed_url, exc)
  50. return (feed_url, None)
  51. except Exception as exc:
  52. logger.exception("news feed fetch unexpected error feed_url=%s error=%s", feed_url, exc)
  53. return (feed_url, None)
  54. def _extract_articles_from_feed(
  55. feed_url: str,
  56. parsed,
  57. per_feed_limit: int,
  58. ) -> List[Dict[str, Any]]:
  59. """Extract article dicts from a parsed feedparser object (sync)."""
  60. articles: List[Dict[str, Any]] = []
  61. feed_name = getattr(parsed.feed, "title", None) or feed_url
  62. parsed_entries = len(getattr(parsed, "entries", []) or [])
  63. logger.info(
  64. "news feed parsed feed_url=%s feed_name=%s entries=%s",
  65. feed_url, feed_name, parsed_entries,
  66. )
  67. kept = 0
  68. for entry in parsed.entries[:per_feed_limit]:
  69. title = str(getattr(entry, "title", "")).strip()
  70. url = _canonical_url(str(getattr(entry, "link", "")).strip())
  71. timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
  72. summary = _strip_html(
  73. str(getattr(entry, "summary", "")) or str(getattr(entry, "description", ""))
  74. )
  75. if not title or not url:
  76. continue
  77. articles.append({
  78. "title": title,
  79. "url": url,
  80. "source": str(feed_name),
  81. "feed_url": feed_url,
  82. "timestamp": timestamp,
  83. "summary": summary,
  84. })
  85. kept += 1
  86. logger.info("news feed completed feed_url=%s kept=%s", feed_url, kept)
  87. return articles
  88. async def fetch_news_articles(limit: int = NEWS_FEED_ITEMS_PER_POLL) -> List[Dict[str, Any]]:
  89. """Fetch all RSS feeds concurrently, parse, and return articles."""
  90. feed_urls = _feed_urls()
  91. per_feed_limit = max(1, int(limit))
  92. logger.info(
  93. "news ingestion start feeds=%s limit=%s timeout_s=%s",
  94. len(feed_urls), per_feed_limit, FEED_FETCH_TIMEOUT_SECONDS,
  95. )
  96. semaphore = asyncio.Semaphore(_NEEDLE_RSS_MAX_CONCURRENCY)
  97. async with httpx.AsyncClient(
  98. timeout=httpx.Timeout(FEED_FETCH_TIMEOUT_SECONDS),
  99. headers={"User-Agent": "news-mcp/1.0"},
  100. ) as client:
  101. tasks = [
  102. _fetch_feed_async(client, semaphore, url)
  103. for url in feed_urls
  104. ]
  105. results = await asyncio.gather(*tasks, return_exceptions=False)
  106. articles: List[Dict[str, Any]] = []
  107. for feed_url, raw in results:
  108. if raw is None:
  109. continue
  110. # feedparser.parse is CPU-light but sync — parse inline (fast enough)
  111. parsed = feedparser.parse(raw)
  112. articles.extend(_extract_articles_from_feed(feed_url, parsed, per_feed_limit))
  113. logger.info("news ingestion complete total_kept=%s", len(articles))
  114. return articles
  115. def normalize_topic_from_title(title: str) -> str:
  116. t = title.lower()
  117. if any(k in t for k in ["btc", "bitcoin", "eth", "ethereum", "crypto"]):
  118. return "crypto"
  119. if any(k in t for k in ["rate", "rates", "inflation", "fed", "treasury", "euro"]):
  120. return "macro"
  121. if any(k in t for k in ["regulation", "sec", "ban", "law"]):
  122. return "regulation"
  123. if any(k in t for k in ["ai", "llm", "model", "openai", "anthropic"]):
  124. return "ai"
  125. return "other"
  126. def cluster_id_for_title(topic: str, title: str) -> str:
  127. import hashlib
  128. key = f"{topic}|{title.strip().lower()}"
  129. return hashlib.sha1(key.encode("utf-8")).hexdigest()