| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- from __future__ import annotations
- import hashlib
- import logging
- from typing import Any, Dict, List
- import feedparser
- from news_mcp.config import NEWS_FEED_URL, NEWS_FEED_URLS
- logger = logging.getLogger(__name__)
- def _canonical_url(url: str) -> str:
- # Minimal canonicalization for v1.
- return url.strip()
- def _feed_urls() -> List[str]:
- urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
- if not urls:
- urls = [NEWS_FEED_URL]
- return urls
- def fetch_news_articles(limit: int = 50) -> List[Dict[str, Any]]:
- feed_urls = _feed_urls()
- articles: List[Dict[str, Any]] = []
- logger.info("news ingestion start feeds=%s limit=%s", len(feed_urls), limit)
- # Evenly pull from feeds; keep total below `limit`.
- per_feed_limit = max(1, int(limit / max(1, len(feed_urls))))
- for feed_url in feed_urls:
- feed = feedparser.parse(feed_url)
- feed_name = getattr(feed.feed, "title", None) or feed_url
- parsed_entries = len(getattr(feed, "entries", []) or [])
- logger.info("news feed parsed feed_url=%s feed_name=%s entries=%s", feed_url, feed_name, parsed_entries)
- kept_before = len(articles)
- for entry in feed.entries[:per_feed_limit]:
- title = str(getattr(entry, "title", "")).strip()
- url = _canonical_url(str(getattr(entry, "link", "")).strip())
- timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
- summary = str(getattr(entry, "summary", "")) or str(getattr(entry, "description", ""))
- if not title or not url:
- continue
- articles.append(
- {
- "title": title,
- "url": url,
- "source": str(feed_name),
- "feed_url": feed_url,
- "timestamp": timestamp,
- "summary": summary,
- }
- )
- if len(articles) >= limit:
- logger.info("news ingestion limit reached feed_url=%s total_kept=%s", feed_url, len(articles))
- return articles
- logger.info(
- "news feed completed feed_url=%s kept=%s",
- feed_url,
- len(articles) - kept_before,
- )
- logger.info("news ingestion complete total_kept=%s", len(articles))
- return articles
- def normalize_topic_from_title(title: str) -> str:
- t = title.lower()
- if any(k in t for k in ["btc", "bitcoin", "eth", "ethereum", "crypto"]):
- return "crypto"
- if any(k in t for k in ["rate", "rates", "inflation", "fed", "treasury", "euro"]):
- return "macro"
- if any(k in t for k in ["regulation", "sec", "ban", "law"]):
- return "regulation"
- if any(k in t for k in ["ai", "llm", "model", "openai", "anthropic"]):
- return "ai"
- return "other"
- def cluster_id_for_title(topic: str, title: str) -> str:
- key = f"{topic}|{title.strip().lower()}"
- return hashlib.sha1(key.encode("utf-8")).hexdigest()
|