# news_feeds.py
  1. from __future__ import annotations
  2. import hashlib
  3. import logging
  4. from typing import Any, Dict, List
  5. import feedparser
  6. from news_mcp.config import NEWS_FEED_URL, NEWS_FEED_URLS
  7. logger = logging.getLogger(__name__)
  8. def _canonical_url(url: str) -> str:
  9. # Minimal canonicalization for v1.
  10. return url.strip()
  11. def _feed_urls() -> List[str]:
  12. urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
  13. if not urls:
  14. urls = [NEWS_FEED_URL]
  15. return urls
  16. def fetch_news_articles(limit: int = 50) -> List[Dict[str, Any]]:
  17. feed_urls = _feed_urls()
  18. articles: List[Dict[str, Any]] = []
  19. logger.info("news ingestion start feeds=%s limit=%s", len(feed_urls), limit)
  20. # Evenly pull from feeds; keep total below `limit`.
  21. per_feed_limit = max(1, int(limit / max(1, len(feed_urls))))
  22. for feed_url in feed_urls:
  23. feed = feedparser.parse(feed_url)
  24. feed_name = getattr(feed.feed, "title", None) or feed_url
  25. parsed_entries = len(getattr(feed, "entries", []) or [])
  26. logger.info("news feed parsed feed_url=%s feed_name=%s entries=%s", feed_url, feed_name, parsed_entries)
  27. kept_before = len(articles)
  28. for entry in feed.entries[:per_feed_limit]:
  29. title = str(getattr(entry, "title", "")).strip()
  30. url = _canonical_url(str(getattr(entry, "link", "")).strip())
  31. timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
  32. summary = str(getattr(entry, "summary", "")) or str(getattr(entry, "description", ""))
  33. if not title or not url:
  34. continue
  35. articles.append(
  36. {
  37. "title": title,
  38. "url": url,
  39. "source": str(feed_name),
  40. "feed_url": feed_url,
  41. "timestamp": timestamp,
  42. "summary": summary,
  43. }
  44. )
  45. if len(articles) >= limit:
  46. logger.info("news ingestion limit reached feed_url=%s total_kept=%s", feed_url, len(articles))
  47. return articles
  48. logger.info(
  49. "news feed completed feed_url=%s kept=%s",
  50. feed_url,
  51. len(articles) - kept_before,
  52. )
  53. logger.info("news ingestion complete total_kept=%s", len(articles))
  54. return articles
  55. def normalize_topic_from_title(title: str) -> str:
  56. t = title.lower()
  57. if any(k in t for k in ["btc", "bitcoin", "eth", "ethereum", "crypto"]):
  58. return "crypto"
  59. if any(k in t for k in ["rate", "rates", "inflation", "fed", "treasury", "euro"]):
  60. return "macro"
  61. if any(k in t for k in ["regulation", "sec", "ban", "law"]):
  62. return "regulation"
  63. if any(k in t for k in ["ai", "llm", "model", "openai", "anthropic"]):
  64. return "ai"
  65. return "other"
  66. def cluster_id_for_title(topic: str, title: str) -> str:
  67. key = f"{topic}|{title.strip().lower()}"
  68. return hashlib.sha1(key.encode("utf-8")).hexdigest()