from __future__ import annotations

import hashlib
import logging
import re
from typing import Any, Dict, List
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

import feedparser

from news_mcp.config import NEWS_FEED_URL, NEWS_FEED_URLS

logger = logging.getLogger(__name__)

FEED_FETCH_TIMEOUT_SECONDS = 15


def _canonical_url(url: str) -> str:
    # Minimal canonicalization for v1.
    return url.strip()


def _strip_html(text: str) -> str:
    """Remove obvious HTML so downstream summaries stay readable."""
    text = re.sub(r"<script.*?</script>", "", text, flags=re.I | re.S)
    text = re.sub(r"<style.*?</style>", "", text, flags=re.I | re.S)
    text = re.sub(r"<[^>]+>", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()


def _feed_urls() -> List[str]:
    """Return the configured feed URLs, falling back to the single default feed."""
    urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
    if not urls:
        urls = [NEWS_FEED_URL]
    return urls


def _fetch_feed(feed_url: str):
    """Fetch a feed over HTTP with a bounded timeout and parse it with feedparser."""
    req = Request(feed_url, headers={"User-Agent": "news-mcp/1.0"})
    with urlopen(req, timeout=FEED_FETCH_TIMEOUT_SECONDS) as resp:
        return feedparser.parse(resp.read())
def fetch_news_articles(limit: int = 50) -> List[Dict[str, Any]]:
    """Fetch entries from all configured feeds and return normalized article dicts."""
    feed_urls = _feed_urls()
    articles: List[Dict[str, Any]] = []
    logger.info("news ingestion start feeds=%s limit=%s timeout_s=%s", len(feed_urls), limit, FEED_FETCH_TIMEOUT_SECONDS)
    # Pull evenly from each feed; keep the total at or below `limit`.
    per_feed_limit = max(1, int(limit / max(1, len(feed_urls))))
    for feed_url in feed_urls:
        try:
            feed = _fetch_feed(feed_url)
            feed_name = getattr(feed.feed, "title", None) or feed_url
            parsed_entries = len(getattr(feed, "entries", []) or [])
            logger.info("news feed parsed feed_url=%s feed_name=%s entries=%s", feed_url, feed_name, parsed_entries)
        except (HTTPError, URLError, TimeoutError, OSError) as exc:
            logger.exception("news feed fetch failed feed_url=%s error=%s", feed_url, exc)
            continue
        except Exception as exc:
            logger.exception("news feed parse failed feed_url=%s error=%s", feed_url, exc)
            continue
        kept_before = len(articles)
        for entry in feed.entries[:per_feed_limit]:
            title = str(getattr(entry, "title", "")).strip()
            url = _canonical_url(str(getattr(entry, "link", "")).strip())
            timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
            summary = _strip_html(str(getattr(entry, "summary", "")) or str(getattr(entry, "description", "")))
            # Drop entries missing the minimum fields for a usable article.
            if not title or not url:
                continue
            articles.append(
                {
                    "title": title,
                    "url": url,
                    "source": str(feed_name),
                    "feed_url": feed_url,
                    "timestamp": timestamp,
                    "summary": summary,
                }
            )
            if len(articles) >= limit:
                logger.info("news ingestion limit reached feed_url=%s total_kept=%s", feed_url, len(articles))
                return articles
        logger.info(
            "news feed completed feed_url=%s kept=%s",
            feed_url,
            len(articles) - kept_before,
        )
    logger.info("news ingestion complete total_kept=%s", len(articles))
    return articles
def normalize_topic_from_title(title: str) -> str:
    """Map a headline to a coarse topic via a simple keyword heuristic (first match wins)."""
    t = title.lower()
    if any(k in t for k in ["btc", "bitcoin", "eth", "ethereum", "crypto"]):
        return "crypto"
    if any(k in t for k in ["rate", "rates", "inflation", "fed", "treasury", "euro"]):
        return "macro"
    if any(k in t for k in ["regulation", "sec", "ban", "law"]):
        return "regulation"
    if any(k in t for k in ["ai", "llm", "model", "openai", "anthropic"]):
        return "ai"
    return "other"


def cluster_id_for_title(topic: str, title: str) -> str:
    """Derive a stable cluster id from the topic and the normalized title."""
    key = f"{topic}|{title.strip().lower()}"
    return hashlib.sha1(key.encode("utf-8")).hexdigest()
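

# Minimal usage sketch (illustrative only; not part of the module's public API).
# It assumes the feeds configured in news_mcp.config are reachable: it fetches a
# few articles, then prints the derived topic and cluster id for each headline.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    for article in fetch_news_articles(limit=5):
        topic = normalize_topic_from_title(article["title"])
        cluster = cluster_id_for_title(topic, article["title"])
        print(f"[{topic}] {article['title']} ({cluster[:8]}) {article['url']}")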