from __future__ import annotations

import hashlib
import logging
import re
from typing import Any, Dict, List
from urllib.error import URLError, HTTPError
from urllib.request import Request, urlopen

import feedparser

from news_mcp.config import NEWS_FEED_URL, NEWS_FEED_URLS

logger = logging.getLogger(__name__)

FEED_FETCH_TIMEOUT_SECONDS = 15


def _canonical_url(url: str) -> str:
    # Minimal canonicalization for v1.
    return url.strip()


def _strip_html(text: str) -> str:
    """Remove obvious HTML so downstream summaries stay readable."""
    # Drop <script> and <style> blocks (tags plus contents), then any remaining tags.
    text = re.sub(r"<script.*?</script>", "", text, flags=re.I | re.S)
    text = re.sub(r"<style.*?</style>", "", text, flags=re.I | re.S)
    text = re.sub(r"<[^>]+>", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()


def _feed_urls() -> List[str]:
    urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
    if not urls:
        urls = [NEWS_FEED_URL]
    return urls


def _fetch_feed(feed_url: str):
    req = Request(feed_url, headers={"User-Agent": "news-mcp/1.0"})
    with urlopen(req, timeout=FEED_FETCH_TIMEOUT_SECONDS) as resp:
        return feedparser.parse(resp.read())


def fetch_news_articles(limit: int = 50) -> List[Dict[str, Any]]:
    feed_urls = _feed_urls()
    articles: List[Dict[str, Any]] = []
    logger.info(
        "news ingestion start feeds=%s limit=%s timeout_s=%s",
        len(feed_urls), limit, FEED_FETCH_TIMEOUT_SECONDS,
    )

    # Pull evenly from each feed; keep the total at or below `limit`.
    per_feed_limit = max(1, int(limit / max(1, len(feed_urls))))

    for feed_url in feed_urls:
        try:
            feed = _fetch_feed(feed_url)
            feed_name = getattr(feed.feed, "title", None) or feed_url
            parsed_entries = len(getattr(feed, "entries", []) or [])
            logger.info(
                "news feed parsed feed_url=%s feed_name=%s entries=%s",
                feed_url, feed_name, parsed_entries,
            )
        except (HTTPError, URLError, TimeoutError, OSError) as exc:
            logger.exception("news feed fetch failed feed_url=%s error=%s", feed_url, exc)
            continue
        except Exception as exc:
            logger.exception("news feed parse failed feed_url=%s error=%s", feed_url, exc)
            continue

        kept_before = len(articles)
        for entry in feed.entries[:per_feed_limit]:
            title = str(getattr(entry, "title", "")).strip()
            url = _canonical_url(str(getattr(entry, "link", "")).strip())
            timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
            summary = _strip_html(str(getattr(entry, "summary", "")) or str(getattr(entry, "description", "")))
            if not title or not url:
                continue
            articles.append(
                {
                    "title": title,
                    "url": url,
                    "source": str(feed_name),
                    "feed_url": feed_url,
                    "timestamp": timestamp,
                    "summary": summary,
                }
            )
            if len(articles) >= limit:
                logger.info(
                    "news ingestion limit reached feed_url=%s total_kept=%s",
                    feed_url, len(articles),
                )
                return articles

        logger.info(
            "news feed completed feed_url=%s kept=%s",
            feed_url, len(articles) - kept_before,
        )

    logger.info("news ingestion complete total_kept=%s", len(articles))
    return articles


def normalize_topic_from_title(title: str) -> str:
    t = title.lower()
    if any(k in t for k in ["btc", "bitcoin", "eth", "ethereum", "crypto"]):
        return "crypto"
    if any(k in t for k in ["rate", "rates", "inflation", "fed", "treasury", "euro"]):
        return "macro"
    if any(k in t for k in ["regulation", "sec", "ban", "law"]):
        return "regulation"
    if any(k in t for k in ["ai", "llm", "model", "openai", "anthropic"]):
        return "ai"
    return "other"


def cluster_id_for_title(topic: str, title: str) -> str:
    key = f"{topic}|{title.strip().lower()}"
    return hashlib.sha1(key.encode("utf-8")).hexdigest()
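

# Illustrative smoke test (not part of the module's API): running the file directly
# fetches a few articles from the configured feeds and prints each article's derived
# topic and cluster id. This is a minimal sketch that assumes NEWS_FEED_URL /
# NEWS_FEED_URLS are set in news_mcp.config and that the feeds are reachable.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    for article in fetch_news_articles(limit=5):
        topic = normalize_topic_from_title(article["title"])
        cluster = cluster_id_for_title(topic, article["title"])
        print(f"[{topic}] {article['title']} -> {cluster[:12]} ({article['url']})")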