
news-mcp: cleanup feed naming, improve ingestion logs, add mcporter README examples

Lukas Goldschmidt · 1 month ago · commit a4096b9dfb

+ 61 - 2
README.md

@@ -1,6 +1,6 @@
 # 📰 News MCP Server
 
-FastMCP-based MCP server that turns RSS into **deduplicated, enriched news clusters**.
+FastMCP-based MCP server that turns news feeds into **deduplicated, enriched clusters**.
 
 ## Quick start
 
@@ -18,7 +18,7 @@ Health:
 - `http://127.0.0.1:8506/health`
 
 ## What this server provides
-- Fetches RSS from `https://breakingthenews.net/news-feed.xml`
+- Fetches from one or more configured news feeds (`NEWS_FEED_URL` / `NEWS_FEED_URLS`)
 - Deduplicates articles into clusters (v1 fuzzy title similarity)
 - Enriches clusters with Groq (topic/entities/sentiment/keywords)
 - Caches clusters + Groq fields in SQLite
@@ -37,11 +37,70 @@ Health:
 4) `detect_emerging_topics(limit)`
 - derives “emerging” signals from recent cached clusters
 
+5) `get_news_sentiment(entity, timeframe)`
+- aggregates sentiment around an entity from cached enriched clusters
+
 ## Configuration
 
 See `news-mcp/.env`.
 Key variables:
 - `GROQ_API_KEY`, `GROQ_MODEL`, `GROQ_DEBUG`
+- `NEWS_FEED_URL` (single feed fallback)
+- `NEWS_FEED_URLS` (comma-separated feed URLs; overrides `NEWS_FEED_URL`)
 - `NEWS_REFRESH_INTERVAL_SECONDS` (default 900)
 - `NEWS_BACKGROUND_REFRESH_ON_START` (default true)
+- `NEWS_BACKGROUND_REFRESH_ENABLED` (default true)
 - `NEWS_CLUSTERS_TTL_HOURS`
+- `GROQ_ENRICH_OTHER_ONLY` (default false; set true for cost control)
+
+## mcporter examples (all news-mcp calls)
+
+Use your existing config path:
+
+```bash
+CONFIG=/home/lucky/.openclaw/workspace/config/mcporter.json
+```
+
+Inspect server + tools:
+
+```bash
+mcporter --config "$CONFIG" list news --schema
+```
+
+### 1) Latest events
+
+```bash
+mcporter --config "$CONFIG" call news.get_latest_events topic=crypto limit=10
+mcporter --config "$CONFIG" call news.get_latest_events topic=macro limit=5
+```
+
+### 2) Events for an entity
+
+```bash
+mcporter --config "$CONFIG" call news.get_events_for_entity entity=Bitcoin limit=10
+mcporter --config "$CONFIG" call news.get_events_for_entity entity=ETH limit=10
+mcporter --config "$CONFIG" call news.get_events_for_entity entity=ETF limit=10
+```
+
+### 3) Event summary (by cluster_id)
+
+```bash
+# First fetch an event id
+mcporter --config "$CONFIG" call news.get_latest_events topic=crypto limit=1
+
+# Then summarize it
+mcporter --config "$CONFIG" call news.get_event_summary event_id=<cluster_id>
+```
+
+### 4) Emerging topics
+
+```bash
+mcporter --config "$CONFIG" call news.detect_emerging_topics limit=10
+```
+
+### 5) Sentiment for an entity
+
+```bash
+mcporter --config "$CONFIG" call news.get_news_sentiment entity=Bitcoin timeframe=24h
+mcporter --config "$CONFIG" call news.get_news_sentiment entity=Ethereum timeframe=72h
+```
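
The multi-feed resolution documented under Configuration is a plain split-and-strip; below is a minimal sketch of the same recipe (mirroring `_feed_urls()` in `news_mcp/sources/news_feeds.py`; the example URLs are hypothetical):

```python
# Sketch of NEWS_FEED_URLS / NEWS_FEED_URL resolution, assuming the same
# split-and-strip recipe as _feed_urls(); example URLs are made up.
import os

feed_urls = [u.strip() for u in os.getenv("NEWS_FEED_URLS", "").split(",") if u.strip()]
if not feed_urls:
    feed_urls = [os.getenv("NEWS_FEED_URL", "https://breakingthenews.net/news-feed.xml")]
print(feed_urls)  # e.g. ['https://a.example/feed.xml', 'https://b.example/feed.xml']
```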

+ 31 - 3
killserver.sh

@@ -2,6 +2,20 @@
 set -euo pipefail
 PIDFILE=${PIDFILE:-server.pid}
 
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+
+is_news_server() {
+  local cmd="$1"
+  # Bare "run.sh"/"uvicorn" matches are only treated as ours when the command
+  # line also references this project's directory; the app module is always ours.
+  case "$cmd" in
+    *"news_mcp.mcp_server_fastmcp:app"*|*"$SCRIPT_DIR"*"run.sh"*|*"$SCRIPT_DIR"*"uvicorn"*)
+      return 0
+      ;;
+    *)
+      return 1
+      ;;
+  esac
+}
+
 stop_pid() {
   local pid="$1"
   if [ -n "$pid" ] && ps -p "$pid" > /dev/null 2>&1; then
@@ -18,6 +32,20 @@ if [ -f "$PIDFILE" ]; then
 fi
 
 # Sweep up stale server processes started from this project.
-for pid in $(pgrep -f 'news_mcp\.mcp_server_fastmcp:app|uvicorn .*news_mcp\.mcp_server_fastmcp:app' || true); do
-  stop_pid "$pid"
-done
+while IFS= read -r line; do
+  pid="${line%% *}"
+  cmd="${line#* }"
+  if is_news_server "$cmd"; then
+    stop_pid "$pid"
+  fi
+done < <(pgrep -af 'news_mcp\.mcp_server_fastmcp:app|uvicorn|run\.sh' || true)
+
+# Extra safety: if we were started from the project dir, kill any lingering
+# process that still has the app module in its command line.
+while IFS= read -r line; do
+  pid="${line%% *}"
+  cmd="${line#* }"
+  if [[ "$cmd" == *"$SCRIPT_DIR"* ]] && [[ "$cmd" == *"news_mcp.mcp_server_fastmcp:app"* ]]; then
+    stop_pid "$pid"
+  fi
+done < <(pgrep -af 'news_mcp\.mcp_server_fastmcp:app' || true)
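
For reference, the pid/command split used by both sweep loops can be expressed in Python; this is a hypothetical rendering (not part of the commit) of how `pgrep -af` output decomposes:

```python
# Hypothetical Python equivalent of the sweep loop: parse `pgrep -af` output
# into (pid, cmd) pairs, mirroring ${line%% *} / ${line#* } in the script.
import subprocess

out = subprocess.run(
    ["pgrep", "-af", r"news_mcp\.mcp_server_fastmcp:app"],
    capture_output=True, text=True,
).stdout
for line in out.splitlines():
    pid, _, cmd = line.partition(" ")
    print(pid, cmd)
```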

+ 12 - 6
news_mcp/config.py

@@ -12,10 +12,15 @@ DATA_DIR.mkdir(parents=True, exist_ok=True)
 
 DB_PATH = Path(os.getenv("NEWS_MCP_DB_PATH", str(DATA_DIR / "news.sqlite")))
 
-RSS_FEED_URL = os.getenv("NEWS_RSS_FEED_URL", "https://breakingthenews.net/news-feed.xml")
-# Optional multi-feed mode: comma-separated RSS URLs.
-# If set (non-empty), this overrides RSS_FEED_URL.
-RSS_FEED_URLS = os.getenv("NEWS_RSS_FEED_URLS", "").strip()
+# Backward-compatible aliases for older config names.
+NEWS_FEED_URL = os.getenv("NEWS_FEED_URL", os.getenv("NEWS_RSS_FEED_URL", "https://breakingthenews.net/news-feed.xml"))
+# Optional multi-feed mode: comma-separated feed URLs.
+# If set (non-empty), this overrides NEWS_FEED_URL.
+NEWS_FEED_URLS = os.getenv("NEWS_FEED_URLS", os.getenv("NEWS_RSS_FEED_URLS", "")).strip()
+
+# Legacy names kept for compatibility.
+RSS_FEED_URL = NEWS_FEED_URL
+RSS_FEED_URLS = NEWS_FEED_URLS
 
 # Clusters TTL (hours)
 CLUSTERS_TTL_HOURS = float(os.getenv("NEWS_CLUSTERS_TTL_HOURS", "24"))
@@ -27,8 +32,9 @@ GROQ_API_KEY = os.getenv("GROQ_API_KEY")
 GROQ_MODEL = os.getenv("GROQ_MODEL", "llama4-16e")
 GROQ_DEBUG = os.getenv("GROQ_DEBUG", "false").lower() == "true"
 
-# Cost control: only enrich clusters whose heuristic topic is "other" by default.
-GROQ_ENRICH_OTHER_ONLY = os.getenv("GROQ_ENRICH_OTHER_ONLY", "true").lower() == "true"
+# Groq enrichment is the default for all incoming news.
+# Set GROQ_ENRICH_OTHER_ONLY=true only if you want to restrict it for cost control.
+GROQ_ENRICH_OTHER_ONLY = os.getenv("GROQ_ENRICH_OTHER_ONLY", "false").lower() == "true"
 
 # Limit enriched clusters per refresh call.
 GROQ_MAX_CLUSTERS_PER_REFRESH = int(os.getenv("GROQ_MAX_CLUSTERS_PER_REFRESH", "20"))
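
The alias chain above means the new `NEWS_FEED_*` names win whenever both old and new are set; a quick hypothetical check (run in a fresh interpreter, since the env is read at import time):

```python
# Hypothetical precedence check for the backward-compatible config names.
import os

os.environ["NEWS_RSS_FEED_URLS"] = "https://legacy.example/feed.xml"
os.environ["NEWS_FEED_URLS"] = "https://new.example/feed.xml"  # new name should win

from news_mcp import config

assert config.NEWS_FEED_URLS == "https://new.example/feed.xml"
assert config.RSS_FEED_URLS == config.NEWS_FEED_URLS  # legacy constant is an alias
```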

+ 1 - 1
news_mcp/dedup/cluster.py

@@ -2,7 +2,7 @@ from __future__ import annotations
 
 from typing import Any, Dict, List, Tuple
 
-from news_mcp.sources.rss_breakingthenews import normalize_topic_from_title
+from news_mcp.sources.news_feeds import normalize_topic_from_title
 
 import re
 from difflib import SequenceMatcher

+ 14 - 6
news_mcp/jobs/poller.py

@@ -1,28 +1,32 @@
 from __future__ import annotations
 
+import logging
 from typing import Any, Dict
 
-from news_mcp.config import CLUSTERS_TTL_HOURS, DB_PATH, RSS_FEED_URL, RSS_FEED_URLS
+from news_mcp.config import CLUSTERS_TTL_HOURS, DB_PATH, NEWS_FEED_URL, NEWS_FEED_URLS
 from news_mcp.dedup.cluster import dedup_and_cluster_articles
 from news_mcp.enrichment.enrich import enrich_cluster
 from news_mcp.enrichment.groq_enrich import classify_cluster_groq
-from news_mcp.sources.rss_breakingthenews import fetch_breakingthenews_articles
+from news_mcp.sources.news_feeds import fetch_news_articles
 from news_mcp.storage.sqlite_store import SQLiteClusterStore
 
 from news_mcp.config import GROQ_ENRICH_OTHER_ONLY, GROQ_MAX_CLUSTERS_PER_REFRESH
 
 
 async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
+    logger = logging.getLogger("news_mcp.refresh")
     store = SQLiteClusterStore(DB_PATH)
 
-    articles = fetch_breakingthenews_articles(limit=limit)
+    logger.info("refresh start topic=%s limit=%s", topic, limit)
+    articles = fetch_news_articles(limit=limit)
+    logger.info("refresh fetched articles=%s", len(articles))
 
     # Skip expensive work if the feed content (titles/urls/timestamps) didn't change.
     import hashlib
-    rss_urls = [u.strip() for u in RSS_FEED_URLS.split(",") if u.strip()]
+    rss_urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
     if not rss_urls:
-        rss_urls = [RSS_FEED_URL]
-    feed_key = "breakingthenews:" + hashlib.sha1(",".join(rss_urls).encode("utf-8")).hexdigest()
+        rss_urls = [NEWS_FEED_URL]
+    feed_key = "newsfeeds:" + hashlib.sha1(",".join(rss_urls).encode("utf-8")).hexdigest()
     material = "\n".join(
         f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
         for a in articles
@@ -30,9 +34,12 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
     last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
     prev_hash = store.get_feed_hash(feed_key)
     if prev_hash == last_hash:
+        logger.info("refresh unchanged feed_key=%s topic=%s", feed_key, topic)
         return
+    logger.info("refresh changed feed_key=%s topic=%s", feed_key, topic)
     store.set_feed_hash(feed_key, last_hash)
     clustered_by_topic = dedup_and_cluster_articles(articles)
+    logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
 
     for t, clusters in clustered_by_topic.items():
         if topic and t != topic:
@@ -62,5 +69,6 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
             enriched.append(c2)
 
         store.upsert_clusters(enriched, topic=t)
+        logger.info("refresh stored topic=%s clusters=%s", t, len(enriched))
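
The unchanged-feed short circuit rests on a single digest over title/url/timestamp triples; a standalone sketch of that recipe (the articles here are made up):

```python
# Standalone sketch of the change-detection hash in refresh_clusters: the
# refresh is skipped when this digest matches the one stored for the feed key.
import hashlib

articles = [  # hypothetical fetched articles
    {"title": "Fed holds rates", "url": "https://example.com/a", "timestamp": "2026-02-01T00:00:00Z"},
    {"title": "BTC rallies", "url": "https://example.com/b", "timestamp": "2026-02-01T01:00:00Z"},
]
material = "\n".join(
    f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}" for a in articles
)
print(hashlib.sha1(material.encode("utf-8")).hexdigest())
```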
 

+ 92 - 0
news_mcp/sources/news_feeds.py

@@ -0,0 +1,92 @@
+from __future__ import annotations
+
+import hashlib
+import logging
+from typing import Any, Dict, List
+
+import feedparser
+
+from news_mcp.config import NEWS_FEED_URL, NEWS_FEED_URLS
+
+
+logger = logging.getLogger(__name__)
+
+
+def _canonical_url(url: str) -> str:
+    # Minimal canonicalization for v1.
+    return url.strip()
+
+
+def _feed_urls() -> List[str]:
+    urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
+    if not urls:
+        urls = [NEWS_FEED_URL]
+    return urls
+
+
+def fetch_news_articles(limit: int = 50) -> List[Dict[str, Any]]:
+    feed_urls = _feed_urls()
+    articles: List[Dict[str, Any]] = []
+
+    logger.info("news ingestion start feeds=%s limit=%s", len(feed_urls), limit)
+
+    # Pull evenly from each feed; keep the total at or below `limit`.
+    per_feed_limit = max(1, int(limit / max(1, len(feed_urls))))
+
+    for feed_url in feed_urls:
+        feed = feedparser.parse(feed_url)
+        feed_name = getattr(feed.feed, "title", None) or feed_url
+        parsed_entries = len(getattr(feed, "entries", []) or [])
+        logger.info("news feed parsed feed_url=%s feed_name=%s entries=%s", feed_url, feed_name, parsed_entries)
+
+        kept_before = len(articles)
+        for entry in feed.entries[:per_feed_limit]:
+            title = str(getattr(entry, "title", "")).strip()
+            url = _canonical_url(str(getattr(entry, "link", "")).strip())
+            timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
+            summary = str(getattr(entry, "summary", "")) or str(getattr(entry, "description", ""))
+
+            if not title or not url:
+                continue
+
+            articles.append(
+                {
+                    "title": title,
+                    "url": url,
+                    "source": str(feed_name),
+                    "feed_url": feed_url,
+                    "timestamp": timestamp,
+                    "summary": summary,
+                }
+            )
+
+            if len(articles) >= limit:
+                logger.info("news ingestion limit reached feed_url=%s total_kept=%s", feed_url, len(articles))
+                return articles
+
+        logger.info(
+            "news feed completed feed_url=%s kept=%s",
+            feed_url,
+            len(articles) - kept_before,
+        )
+
+    logger.info("news ingestion complete total_kept=%s", len(articles))
+    return articles
+
+
+def normalize_topic_from_title(title: str) -> str:
+    t = title.lower()
+    if any(k in t for k in ["btc", "bitcoin", "eth", "ethereum", "crypto"]):
+        return "crypto"
+    if any(k in t for k in ["rate", "rates", "inflation", "fed", "treasury", "euro"]):
+        return "macro"
+    if any(k in t for k in ["regulation", "sec", "ban", "law"]):
+        return "regulation"
+    if any(k in t for k in ["ai", "llm", "model", "openai", "anthropic"]):
+        return "ai"
+    return "other"
+
+
+def cluster_id_for_title(topic: str, title: str) -> str:
+    key = f"{topic}|{title.strip().lower()}"
+    return hashlib.sha1(key.encode("utf-8")).hexdigest()
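
The two helpers at the bottom are pure functions, so they can be exercised without network or config; a small usage sketch (the title is made up):

```python
# Hypothetical usage of the topic/cluster-id helpers from news_feeds.py.
from news_mcp.sources.news_feeds import cluster_id_for_title, normalize_topic_from_title

title = "Bitcoin climbs as ETF inflows accelerate"
topic = normalize_topic_from_title(title)  # -> "crypto" (keyword match on "bitcoin")
cid = cluster_id_for_title(topic, title)   # sha1 of "crypto|<stripped, lowercased title>"
print(topic, cid)
```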

+ 3 - 66
news_mcp/sources/rss_breakingthenews.py

@@ -1,69 +1,6 @@
 from __future__ import annotations
 
-import hashlib
-from typing import Any, Dict, List
+from news_mcp.sources.news_feeds import cluster_id_for_title, fetch_news_articles, normalize_topic_from_title
 
-import feedparser
-
-from news_mcp.config import RSS_FEED_URL, RSS_FEED_URLS
-
-
-def _canonical_url(url: str) -> str:
-    # Minimal canonicalization for v1.
-    return url.strip()
-
-
-def fetch_breakingthenews_articles(limit: int = 50) -> List[Dict[str, Any]]:
-    rss_urls = [u.strip() for u in RSS_FEED_URLS.split(",") if u.strip()]
-    if not rss_urls:
-        rss_urls = [RSS_FEED_URL]
-
-    articles: List[Dict[str, Any]] = []
-
-    # Evenly pull from feeds; keep total below `limit`.
-    per_feed_limit = max(1, int(limit / max(1, len(rss_urls))))
-
-    for feed_url in rss_urls:
-        feed = feedparser.parse(feed_url)
-        for entry in feed.entries[:per_feed_limit]:
-            title = str(getattr(entry, "title", "")).strip()
-            url = _canonical_url(str(getattr(entry, "link", "")).strip())
-            source = "RSS"
-            timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
-            summary = str(getattr(entry, "summary", "")) or str(getattr(entry, "description", ""))
-
-            if not title or not url:
-                continue
-
-            articles.append(
-                {
-                    "title": title,
-                    "url": url,
-                    "source": source,
-                    "timestamp": timestamp,
-                    "summary": summary,
-                }
-            )
-
-            if len(articles) >= limit:
-                return articles
-
-    return articles
-
-
-def normalize_topic_from_title(title: str) -> str:
-    t = title.lower()
-    if any(k in t for k in ["btc", "bitcoin", "eth", "ethereum", "crypto"]):
-        return "crypto"
-    if any(k in t for k in ["rate", "rates", "inflation", "fed", "treasury", "euro"]):
-        return "macro"
-    if any(k in t for k in ["regulation", "sec", "ban", "law"]):
-        return "regulation"
-    if any(k in t for k in ["ai", "llm", "model", "openai", "anthropic"]):
-        return "ai"
-    return "other"
-
-
-def cluster_id_for_title(topic: str, title: str) -> str:
-    key = f"{topic}|{title.strip().lower()}"
-    return hashlib.sha1(key.encode("utf-8")).hexdigest()
+# Backward-compatible aliases for older imports.
+fetch_breakingthenews_articles = fetch_news_articles
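
A one-line smoke test for the shim (hypothetical, assuming the package is importable):

```python
# Verify the legacy import path still resolves to the new multi-feed fetcher.
from news_mcp.sources.news_feeds import fetch_news_articles
from news_mcp.sources.rss_breakingthenews import fetch_breakingthenews_articles

assert fetch_breakingthenews_articles is fetch_news_articles
```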