| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- from __future__ import annotations
- import asyncio
- import hashlib
- import logging
- import re
- from typing import Any, Dict, List
- from urllib.error import URLError, HTTPError
- from urllib.request import Request, urlopen
- import feedparser
- import httpx
- from news_mcp.config import (
- NEWS_FEED_ITEMS_PER_POLL,
- NEWS_FEED_URL,
- NEWS_FEED_URLS,
- _NEEDLE_RSS_MAX_CONCURRENCY,
- )
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Timeout, in seconds, handed to httpx.Timeout for the feed-fetching HTTP client.
FEED_FETCH_TIMEOUT_SECONDS = 20
def _canonical_url(url: str) -> str:
    """Return *url* in canonical form.

    For v1 canonicalization is deliberately minimal: only leading and
    trailing whitespace is removed.
    """
    trimmed = url.strip()
    return trimmed
def _strip_html(text: str) -> str:
    """Reduce an HTML fragment to readable plain text.

    Applied in order: ``<script>`` and ``<style>`` elements are dropped
    together with their contents, every remaining tag becomes a single
    space, runs of whitespace collapse to one space, and the result is
    trimmed.
    """
    # (pattern, replacement, flags) applied sequentially; order matters
    # because script/style bodies must vanish before generic tag removal.
    substitutions = (
        (r"<script.*?</script>", "", re.I | re.S),
        (r"<style.*?</style>", "", re.I | re.S),
        (r"<[^>]+>", " ", 0),
        (r"\s+", " ", 0),
    )
    cleaned = text
    for pattern, replacement, flags in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned, flags=flags)
    return cleaned.strip()
def _feed_urls() -> List[str]:
    """Return the list of feed URLs to poll.

    ``NEWS_FEED_URLS`` is split on commas; each piece is trimmed and empty
    pieces are dropped. When nothing remains, the single ``NEWS_FEED_URL``
    is used as the fallback.
    """
    pieces = (piece.strip() for piece in NEWS_FEED_URLS.split(","))
    configured = [piece for piece in pieces if piece]
    if configured:
        return configured
    return [NEWS_FEED_URL]
def _parse_feed_from_bytes(data: bytes, feed_url: str):
    """Parse raw feed bytes with ``feedparser`` and return the parsed result.

    The call is synchronous. ``feed_url`` is accepted for the caller's
    convenience but is not used by the parse itself.
    """
    parsed = feedparser.parse(data)
    return parsed
async def _fetch_feed_async(
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    feed_url: str,
) -> tuple[str, bytes | None]:
    """Download one feed without ever raising on failure.

    The request runs while *semaphore* is held, so the number of requests
    in flight is bounded by the semaphore's size.

    Args:
        client: Shared ``httpx.AsyncClient`` used to issue the GET request.
        semaphore: Concurrency limiter held for the duration of the request.
        feed_url: URL of the feed to download (redirects are followed).

    Returns:
        ``(feed_url, body_bytes)`` on success, or ``(feed_url, None)`` when
        the request failed. Failures are logged with a traceback instead of
        being propagated, so one bad feed cannot abort the whole batch.
    """
    async with semaphore:
        try:
            resp = await client.get(feed_url, follow_redirects=True)
            resp.raise_for_status()
            return (feed_url, resp.content)
        except (httpx.HTTPError, OSError) as exc:
            # httpx.HTTPError is the shared base of HTTPStatusError and of
            # every request/transport failure (timeouts, connect and read
            # errors, protocol errors, too many redirects), so all ordinary
            # network problems are reported here rather than as "unexpected".
            logger.exception("news feed fetch failed feed_url=%s error=%s", feed_url, exc)
            return (feed_url, None)
        except Exception as exc:
            # Last-resort guard: anything else is still swallowed so that the
            # caller's gather() keeps the results of the other feeds.
            logger.exception("news feed fetch unexpected error feed_url=%s error=%s", feed_url, exc)
            return (feed_url, None)
def _extract_articles_from_feed(
    feed_url: str,
    parsed,
    per_feed_limit: int,
) -> List[Dict[str, Any]]:
    """Build article dicts from a parsed feed (synchronous).

    Only the first *per_feed_limit* entries are examined. Entries lacking a
    title or a link are skipped, so fewer than *per_feed_limit* articles may
    be returned.

    Args:
        feed_url: URL the feed was fetched from; also the fallback source name
            when the feed has no title.
        parsed: Parsed feed object (the caller passes the result of
            ``feedparser.parse``); its ``feed.title`` and ``entries``
            attributes are read.
        per_feed_limit: Maximum number of leading entries to examine.

    Returns:
        A list of dicts with the keys ``title``, ``url``, ``source``,
        ``feed_url``, ``timestamp`` and ``summary``.
    """
    feed_meta = getattr(parsed, "feed", None)
    feed_name = getattr(feed_meta, "title", None) or feed_url
    # Read ``entries`` once, guarded, and use the same list for both the count
    # and the loop so a missing/None attribute yields no articles, not a crash.
    entries = getattr(parsed, "entries", None) or []
    logger.info(
        "news feed parsed feed_url=%s feed_name=%s entries=%s",
        feed_url, feed_name, len(entries),
    )

    articles: List[Dict[str, Any]] = []
    for entry in entries[:per_feed_limit]:
        title = str(getattr(entry, "title", "")).strip()
        url = _canonical_url(str(getattr(entry, "link", "")).strip())
        # Validate first: no point cleaning the summary of a discarded entry.
        if not title or not url:
            continue
        # Prefer the publication time; fall back to the last-updated time.
        timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
        summary = _strip_html(
            str(getattr(entry, "summary", "")) or str(getattr(entry, "description", ""))
        )
        articles.append({
            "title": title,
            "url": url,
            "source": str(feed_name),
            "feed_url": feed_url,
            "timestamp": timestamp,
            "summary": summary,
        })

    logger.info("news feed completed feed_url=%s kept=%s", feed_url, len(articles))
    return articles
async def fetch_news_articles(limit: int = NEWS_FEED_ITEMS_PER_POLL) -> List[Dict[str, Any]]:
    """Fetch every configured RSS feed concurrently and return its articles.

    Args:
        limit: Maximum number of leading entries examined per feed (not a
            global cap). Values below 1 are raised to 1.

    Returns:
        Article dicts from all feeds that were fetched successfully, grouped
        by feed in the order the feed URLs are configured. Feeds whose
        download failed contribute nothing.
    """
    feed_urls = _feed_urls()
    per_feed_limit = max(1, int(limit))
    # Clamp the configured concurrency: a semaphore of size 0 would block every
    # fetch forever, and a negative size makes asyncio.Semaphore raise.
    max_concurrency = max(1, int(_NEEDLE_RSS_MAX_CONCURRENCY))
    logger.info(
        "news ingestion start feeds=%s limit=%s timeout_s=%s",
        len(feed_urls), per_feed_limit, FEED_FETCH_TIMEOUT_SECONDS,
    )

    semaphore = asyncio.Semaphore(max_concurrency)
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(FEED_FETCH_TIMEOUT_SECONDS),
        headers={"User-Agent": "news-mcp/1.0"},
    ) as client:
        # _fetch_feed_async never raises for a failed feed (it returns None as
        # the body), and gather() preserves the order of feed_urls.
        results = await asyncio.gather(
            *(_fetch_feed_async(client, semaphore, url) for url in feed_urls)
        )

    articles: List[Dict[str, Any]] = []
    for feed_url, raw in results:
        if raw is None:
            continue
        # Parsing is synchronous and done inline on the event loop; it goes
        # through the module's dedicated helper rather than duplicating it.
        parsed = _parse_feed_from_bytes(raw, feed_url)
        articles.extend(_extract_articles_from_feed(feed_url, parsed, per_feed_limit))

    logger.info("news ingestion complete total_kept=%s", len(articles))
    return articles
# Topic rules, checked in order; the first topic that matches wins.
# Each rule is (topic, whole-word keywords, substring keywords).
# Short keywords are matched only as whole words (optionally pluralised with
# "s") because as raw substrings they fire inside unrelated words:
# "eth" in "whether", "ai" in "said", "ban" in "bank", "sec" in "second".
_TOPIC_RULES = (
    ("crypto", ("btc", "eth"), ("bitcoin", "ethereum", "crypto")),
    ("macro", ("rate", "fed", "euro"), ("inflation", "treasury")),
    ("regulation", ("sec", "ban", "law"), ("regulation",)),
    ("ai", ("ai", "llm"), ("model", "openai", "anthropic")),
)
_TITLE_WORD_RE = re.compile(r"[a-z0-9]+")


def normalize_topic_from_title(title: str) -> str:
    """Classify a headline into a coarse topic bucket.

    Matching is case-insensitive. Topics are tried in the order ``crypto``,
    ``macro``, ``regulation``, ``ai``; the first one with a matching keyword
    is returned.

    Args:
        title: Article headline.

    Returns:
        One of ``"crypto"``, ``"macro"``, ``"regulation"``, ``"ai"`` or
        ``"other"`` when no keyword matches.
    """
    lowered = title.lower()
    words = set(_TITLE_WORD_RE.findall(lowered))
    for topic, word_keys, substring_keys in _TOPIC_RULES:
        # Whole-word match, also accepting a simple plural ("rates", "bans").
        if any(key in words or f"{key}s" in words for key in word_keys):
            return topic
        # Longer, unambiguous keywords may appear inside compounds
        # ("cryptocurrency", "regulations").
        if any(key in lowered for key in substring_keys):
            return topic
    return "other"
def cluster_id_for_title(topic: str, title: str) -> str:
    """Return a stable cluster id for a (topic, title) pair.

    The title is trimmed and lower-cased, joined to the topic with ``|``,
    and the UTF-8 bytes of that key are hashed with SHA-1. The result is the
    40-character hexadecimal digest.
    """
    normalized_title = title.strip().lower()
    digest = hashlib.sha1(f"{topic}|{normalized_title}".encode("utf-8"))
    return digest.hexdigest()
|