"""RSS feed ingestion: fetch feeds concurrently, extract articles, tag topics."""

from __future__ import annotations

import asyncio
import hashlib
import logging
import re
from typing import Any, Dict, List
from urllib.error import URLError, HTTPError
from urllib.request import Request, urlopen

import feedparser
import httpx

from news_mcp.config import (
    NEWS_FEED_ITEMS_PER_POLL,
    NEWS_FEED_URL,
    NEWS_FEED_URLS,
    _NEEDLE_RSS_MAX_CONCURRENCY,
)

logger = logging.getLogger(__name__)

FEED_FETCH_TIMEOUT_SECONDS = 20

# Patterns used by _strip_html, compiled once at import time.
# <script>/<style> elements are removed together with their content because
# that content is code/CSS, not readable text.
_SCRIPT_BLOCK_RE = re.compile(r"<script\b[^>]*>.*?</script\s*>", re.I | re.S)
_STYLE_BLOCK_RE = re.compile(r"<style\b[^>]*>.*?</style\s*>", re.I | re.S)
_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RE = re.compile(r"\s+")

# Keywords of this length or shorter must match a whole word; see
# _compile_topic_pattern.
_SHORT_KEYWORD_MAX_LEN = 3

# Topic -> keywords, checked in this order; the first topic that matches wins.
_TOPIC_KEYWORDS = (
    ("crypto", ("btc", "bitcoin", "eth", "ethereum", "crypto")),
    ("macro", ("rate", "rates", "inflation", "fed", "treasury", "euro")),
    ("regulation", ("regulation", "sec", "ban", "law")),
    ("ai", ("ai", "llm", "model", "openai", "anthropic")),
)


def _compile_topic_pattern(keywords):
    """Build one word-anchored regex that matches any of *keywords*.

    Every keyword must start at a word boundary. Short keywords (at most
    ``_SHORT_KEYWORD_MAX_LEN`` characters) must additionally end at a word
    boundary, with an optional plural "s", so that e.g. "ai" does not match
    "airline". Longer keywords match as a word prefix, so "crypto" still
    matches "cryptocurrency".
    """
    alternatives = []
    for keyword in keywords:
        escaped = re.escape(keyword)
        if len(keyword) <= _SHORT_KEYWORD_MAX_LEN:
            alternatives.append(escaped + r"s?\b")
        else:
            alternatives.append(escaped)
    return re.compile(r"\b(?:" + "|".join(alternatives) + ")")


_TOPIC_PATTERNS = tuple(
    (topic, _compile_topic_pattern(keywords))
    for topic, keywords in _TOPIC_KEYWORDS
)


def _canonical_url(url: str) -> str:
    """Return *url* with surrounding whitespace removed."""
    # Minimal canonicalization for v1.
    return url.strip()


def _strip_html(text: str) -> str:
    """Remove obvious HTML so downstream summaries stay readable.

    ``<script>`` and ``<style>`` elements are dropped together with their
    content, every remaining tag is replaced by a space, and runs of
    whitespace are collapsed to a single space.
    """
    text = _SCRIPT_BLOCK_RE.sub("", text)
    text = _STYLE_BLOCK_RE.sub("", text)
    text = _TAG_RE.sub(" ", text)
    text = _WHITESPACE_RE.sub(" ", text)
    return text.strip()


def _feed_urls() -> List[str]:
    """Return the configured feed URLs.

    ``NEWS_FEED_URLS`` is read as a comma-separated list; blank items are
    ignored. When it yields nothing, the single ``NEWS_FEED_URL`` is used.
    """
    urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
    if not urls:
        urls = [NEWS_FEED_URL]
    return urls


def _parse_feed_from_bytes(data: bytes, feed_url: str):
    """Parse feed from raw bytes (sync, but fast — just XML parsing).

    *feed_url* is accepted for call-site symmetry but is not used here.
    """
    return feedparser.parse(data)


async def _fetch_feed_async(
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    feed_url: str,
) -> tuple[str, bytes | None]:
    """Fetch a single RSS feed concurrently.

    Returns (feed_url, raw_bytes). Any failure is logged and reported as
    (feed_url, None) instead of being raised, so one bad feed does not
    abort the others.
    """
    # The semaphore caps how many requests are in flight at once.
    async with semaphore:
        try:
            resp = await client.get(feed_url, follow_redirects=True)
            resp.raise_for_status()
            return (feed_url, resp.content)
        except (
            httpx.HTTPStatusError,
            httpx.TimeoutException,
            httpx.ConnectError,
            OSError,
        ) as exc:
            logger.exception(
                "news feed fetch failed feed_url=%s error=%s", feed_url, exc
            )
            return (feed_url, None)
        except Exception as exc:
            # Best-effort boundary: log anything unexpected and keep going.
            logger.exception(
                "news feed fetch unexpected error feed_url=%s error=%s",
                feed_url,
                exc,
            )
            return (feed_url, None)


def _extract_articles_from_feed(
    feed_url: str,
    parsed,
    per_feed_limit: int,
) -> List[Dict[str, Any]]:
    """Extract article dicts from a parsed feedparser object (sync).

    Only the first *per_feed_limit* entries are considered, and entries
    without a title or link are skipped, so fewer than *per_feed_limit*
    articles may be returned. Each article dict has the keys ``title``,
    ``url``, ``source``, ``feed_url``, ``timestamp`` and ``summary``.
    """
    entries = getattr(parsed, "entries", None) or []
    feed_name = getattr(parsed.feed, "title", None) or feed_url
    logger.info(
        "news feed parsed feed_url=%s feed_name=%s entries=%s",
        feed_url,
        feed_name,
        len(entries),
    )

    articles: List[Dict[str, Any]] = []
    for entry in entries[:per_feed_limit]:
        title = str(getattr(entry, "title", "")).strip()
        url = _canonical_url(str(getattr(entry, "link", "")).strip())
        if not title or not url:
            continue

        # Prefer the first field of each pair, falling back when it is empty.
        timestamp = str(getattr(entry, "published", "")) or str(
            getattr(entry, "updated", "")
        )
        summary = _strip_html(
            str(getattr(entry, "summary", ""))
            or str(getattr(entry, "description", ""))
        )
        articles.append(
            {
                "title": title,
                "url": url,
                "source": str(feed_name),
                "feed_url": feed_url,
                "timestamp": timestamp,
                "summary": summary,
            }
        )

    logger.info(
        "news feed completed feed_url=%s kept=%s", feed_url, len(articles)
    )
    return articles


async def fetch_news_articles(
    limit: int = NEWS_FEED_ITEMS_PER_POLL,
    url_list: list[str] | None = None,
) -> list[dict[str, Any]]:
    """Fetch all RSS feeds concurrently, parse, and return articles.

    Feeds whose fetch fails are skipped; the failure is logged by the
    fetch helper.

    Args:
        limit: Maximum number of articles per feed. Values below 1 are
            raised to 1.
        url_list: Optional explicit list of feed URLs to fetch.
            If None, the configured NEWS_FEED_URLS / NEWS_FEED_URL is used.

    Returns:
        Article dicts from all successfully fetched feeds, in feed order.
    """
    feed_urls = url_list if url_list is not None else _feed_urls()
    per_feed_limit = max(1, int(limit))
    logger.info(
        "news ingestion start feeds=%s limit=%s timeout_s=%s",
        len(feed_urls),
        per_feed_limit,
        FEED_FETCH_TIMEOUT_SECONDS,
    )

    semaphore = asyncio.Semaphore(_NEEDLE_RSS_MAX_CONCURRENCY)
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(FEED_FETCH_TIMEOUT_SECONDS),
        headers={"User-Agent": "news-mcp/1.0"},
    ) as client:
        tasks = [
            _fetch_feed_async(client, semaphore, url) for url in feed_urls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=False)

    articles: List[Dict[str, Any]] = []
    for feed_url, raw in results:
        if raw is None:
            continue
        # feedparser.parse is CPU-light but sync — parse inline (fast enough)
        parsed = _parse_feed_from_bytes(raw, feed_url)
        articles.extend(
            _extract_articles_from_feed(feed_url, parsed, per_feed_limit)
        )

    logger.info("news ingestion complete total_kept=%s", len(articles))
    return articles


def normalize_topic_from_title(title: str) -> str:
    """Classify *title* as "crypto", "macro", "regulation", "ai" or "other".

    Matching is case-insensitive and anchored to word starts, so a keyword
    buried inside another word does not count ("ai" in "said", "eth" in
    "method", "rate" in "corporate"). Keywords of three characters or fewer
    must match a whole word (an optional plural "s" is allowed); longer
    keywords match as a word prefix. Topics are checked in the order listed
    above and the first hit wins.
    """
    lowered = title.lower()
    for topic, pattern in _TOPIC_PATTERNS:
        if pattern.search(lowered):
            return topic
    return "other"


def cluster_id_for_title(topic: str, title: str) -> str:
    """Return a SHA-1 hex digest identifying the (topic, title) pair.

    The title is stripped and lower-cased first, so titles that differ only
    in case or surrounding whitespace get the same id.
    """
    key = f"{topic}|{title.strip().lower()}"
    return hashlib.sha1(key.encode("utf-8")).hexdigest()