
Initialize news-mcp scaffold

Lukas Goldschmidt 1 month ago
Commit
13f8f1d5ab

+ 11 - 0
.gitignore

@@ -0,0 +1,11 @@
+__pycache__/
+*.pyc
+*.pyo
+*.pyd
+.venv/
+.env
+.env.*
+data/
+server.pid
+uvicorn.log
+logs/

+ 390 - 0
OUTLOOK.md

@@ -0,0 +1,390 @@
+
+# 📰 News MCP Server — Requirements Spec
+
+## 🎯 Goal
+
+Provide **structured, deduplicated, topic-aware news signals**
+that an agent can use for reasoning about:
+
+* events
+* narratives
+* sentiment shifts
+
+👉 Not a feed reader
+👉 Not a headline dump
+👉 A **signal extraction layer**
+
+---
+
+# 🧠 Core Design Principle
+
+> Raw news is useless to agents.
+> **Processed news is powerful.**
+
+---
+
+# 🏗️ 1. Internal Architecture
+
+## 🧩 Data Sources Layer (`sources/`)
+
+Mix of:
+
+* RSS feeds (primary)
+* optional APIs later
+
+Examples:
+
+* Reuters
+* Bloomberg
+* CoinDesk
+
+### Responsibilities:
+
+* fetch articles
+* normalize format
+
+---
+
+## 🔄 Ingestion Pipeline
+
+Runs periodically (e.g. every few minutes)
+
+Steps:
+
+1. fetch articles
+2. normalize fields:
+
+   * title
+   * url
+   * source
+   * timestamp
+   * summary (if available)
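Timestamps deserve special care during normalization: RSS `pubDate` values arrive as RFC 2822 strings, which do not sort correctly as text. A minimal sketch of converting them to sortable ISO-8601 UTC, using only the standard library:

```python
from datetime import timezone
from email.utils import parsedate_to_datetime


def normalize_timestamp(raw: str) -> str:
    """Parse an RFC 2822 date (the RSS pubDate format) into ISO-8601 UTC."""
    dt = parsedate_to_datetime(raw)
    if dt.tzinfo is None:
        # Assume UTC when the feed omits a timezone.
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc).isoformat()
```

ISO-8601 strings sort lexicographically, so downstream layers can compare them directly.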
+
+---
+
+## 🧹 Deduplication Layer
+
+### Problem:
+
+Same story appears across many sources.
+
+### Solution:
+
+Cluster articles by similarity:
+
+Methods:
+
+* title similarity (fuzzy match / embeddings)
+* URL canonicalization
+* content similarity (optional later)
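URL canonicalization can be sketched with the standard library. Dropping every query string is an assumption that works for tracking parameters but would need an allowlist for sites that address articles via queries:

```python
from urllib.parse import urlsplit, urlunsplit


def canonicalize_url(url: str) -> str:
    """Drop query, fragment, and trailing slash so syndicated copies compare equal."""
    parts = urlsplit(url.strip())
    path = parts.path.rstrip("/") or "/"
    return urlunsplit((parts.scheme.lower(), parts.netloc.lower(), path, "", ""))
```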
+
+### Output:
+
+```json id="cluster"
+{
+  "cluster_id": "...",
+  "headline": "Canonical headline",
+  "articles": [...],
+  "sources": ["Reuters", "Bloomberg"],
+  "first_seen": "...",
+  "last_updated": "..."
+}
+```
+
+👉 This is your **core unit of truth**, not individual articles
+
+---
+
+## 🧠 Enrichment Layer
+
+Adds meaning to clusters.
+
+### 1. Entity extraction
+
+* assets (BTC, ETH)
+* companies
+* macro topics (inflation, rates)
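A v1 sketch of entity extraction via a hand-maintained keyword map; the map below is illustrative, and a later version could swap in a real NER model:

```python
# Illustrative keyword -> canonical entity map (not a complete list).
ENTITY_KEYWORDS = {
    "bitcoin": "BTC", "btc": "BTC",
    "ethereum": "ETH", "eth": "ETH",
    "inflation": "INFLATION", "rates": "RATES",
}


def extract_entities(text: str) -> list[str]:
    # Whole-word matching avoids substring false positives.
    words = set("".join(ch if ch.isalnum() else " " for ch in text.lower()).split())
    return sorted({ENTITY_KEYWORDS[w] for w in words if w in ENTITY_KEYWORDS})
```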
+
+---
+
+### 2. Topic classification
+
+Examples:
+
+* crypto
+* macro
+* regulation
+* AI
+
+---
+
+### 3. Sentiment (lightweight)
+
+* positive / negative / neutral
+* or simple score
+
+👉 Keep this simple in v1 (don’t over-engineer NLP)
+
+---
+
+### 4. Importance scoring (VERY useful)
+
+Heuristic:
+
+* number of sources covering it
+* recency
+* source credibility
+* keyword weighting
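A sketch combining the four signals into one 0–1 score; the weights and the credibility table are illustrative assumptions, not tuned values:

```python
import math

# Illustrative credibility weights -- not a real ranking.
SOURCE_CREDIBILITY = {"Reuters": 1.0, "Bloomberg": 0.95, "CoinDesk": 0.8}


def importance(sources: list[str], age_hours: float, keyword_hits: int) -> float:
    coverage = min(1.0, len(set(sources)) / 5)       # breadth of coverage
    recency = math.exp(-age_hours / 12)              # decays over roughly half a day
    credibility = max((SOURCE_CREDIBILITY.get(s, 0.5) for s in sources), default=0.5)
    keywords = min(1.0, keyword_hits * 0.25)         # weighted keyword matches
    return round(0.4 * coverage + 0.3 * recency + 0.2 * credibility + 0.1 * keywords, 2)
```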
+
+---
+
+## 🗃️ Storage Layer
+
+You need short-term memory:
+
+* clusters (not raw articles)
+* TTL: e.g. 24–72h
+
+Optional:
+
+* in-memory store (start)
+* later: DB
+
+We have a choice of storage backends, including Qdrant, PostgreSQL, and CouchDB.
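For the in-memory start, a minimal TTL store is enough; a sketch whose dict layout is an assumption:

```python
from datetime import datetime, timedelta, timezone


class InMemoryClusterStore:
    """Minimal in-memory cluster store with lazy TTL eviction (v1 sketch)."""

    def __init__(self, ttl_hours: float = 48):
        self.ttl = timedelta(hours=ttl_hours)
        self._clusters: dict[str, tuple[datetime, dict]] = {}

    def upsert(self, cluster: dict) -> None:
        self._clusters[cluster["cluster_id"]] = (datetime.now(timezone.utc), cluster)

    def latest(self) -> list[dict]:
        cutoff = datetime.now(timezone.utc) - self.ttl
        # Evict expired clusters lazily on read.
        self._clusters = {k: v for k, v in self._clusters.items() if v[0] >= cutoff}
        return [c for _, c in sorted(self._clusters.values(), key=lambda v: v[0], reverse=True)]
```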
+
+---
+
+# 🧰 2. Agent-Facing Tools (IMPORTANT)
+
+Keep tools **high-level and semantic**
+
+---
+
+## 1. `get_latest_events`
+
+> “What is happening right now?”
+
+Input:
+
+```json id="n1"
+{
+  "topic": "crypto",
+  "limit": 5
+}
+```
+
+Output:
+
+```json id="n2"
+[
+  {
+    "headline": "...",
+    "summary": "...",
+    "entities": ["BTC"],
+    "sentiment": "positive",
+    "importance": 0.82,
+    "sources": ["Reuters", "CoinDesk"],
+    "timestamp": "..."
+  }
+]
+```
+
+---
+
+## 2. `get_events_for_entity`
+
+> “What’s happening with X?”
+
+```json id="n3"
+{
+  "entity": "BTC"
+}
+```
+
+👉 filters clusters by entity
+
+---
+
+## 3. `get_event_summary`
+
+> “Explain this event clearly”
+
+```json id="n4"
+{
+  "event_id": "cluster_id"
+}
+```
+
+Output:
+
+* merged summary
+* key facts
+* sources
+
+👉 This is where you compress multiple articles into one clean narrative
+
+---
+
+## 4. `get_news_sentiment`
+
+> “What’s the tone around X?”
+
+```json id="n5"
+{
+  "entity": "BTC",
+  "timeframe": "24h"
+}
+```
+
+Output:
+
+```json id="n6"
+{
+  "sentiment": "positive",
+  "score": 0.64,
+  "article_count": 42
+}
+```
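The aggregation behind this tool can stay simple in v1: average per-article scores and bucket them. The ±0.15 thresholds below are an arbitrary assumption:

```python
def aggregate_sentiment(scores: list[float]) -> dict:
    """Aggregate per-article sentiment scores (-1..1) into one signal (sketch)."""
    if not scores:
        return {"sentiment": "neutral", "score": 0.0, "article_count": 0}
    avg = sum(scores) / len(scores)
    label = "positive" if avg > 0.15 else "negative" if avg < -0.15 else "neutral"
    return {"sentiment": label, "score": round(avg, 2), "article_count": len(scores)}
```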
+
+---
+
+## 5. `detect_emerging_topics` (very valuable)
+
+> “What is gaining attention?”
+
+Output:
+
+```json id="n7"
+[
+  {
+    "topic": "Ethereum ETF",
+    "trend_score": 0.91,
+    "related_entities": ["ETH"]
+  }
+]
+```
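One way to compute `trend_score` (a sketch, not the only option): compare mention counts in a recent window against a baseline window and squash the ratio into 0–1, with add-one smoothing to avoid division by zero:

```python
def trend_score(recent_count: int, baseline_count: int) -> float:
    """Attention ratio of recent vs baseline mentions, squashed into 0..1.

    0.5 = unchanged attention, >0.5 = gaining, <0.5 = fading.
    """
    ratio = (recent_count + 1) / (baseline_count + 1)  # add-one smoothing
    return round(ratio / (ratio + 1), 2)
```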
+
+---
+
+# ⚠️ 3. What NOT to expose
+
+Avoid:
+
+* raw RSS feeds
+* individual article endpoints
+* unprocessed headlines
+
+❌ Bad:
+
+```id="bad-news"
+get_raw_articles()
+```
+
+👉 This destroys signal quality for agents
+
+---
+
+# 🔁 4. Caching & Freshness Strategy
+
+## Key difference from crypto:
+
+* News is **append-only + evolving**
+* Not real-time tick data
+
+---
+
+## Strategy:
+
+### Fetch layer:
+
+* poll every few minutes
+
+### Cluster layer:
+
+* update clusters incrementally
+
+### Tool responses:
+
+* no heavy recomputation
+* serve from processed store
+
+---
+
+# 🧠 5. Deduplication Strategy (critical)
+
+Start simple:
+
+### v1:
+
+* normalize titles (lowercase, strip punctuation)
+* fuzzy match (threshold ~0.8)
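The v1 recipe maps directly onto the standard library — `difflib.SequenceMatcher` yields a 0–1 similarity ratio, so a sketch needs no dependencies:

```python
import string
from difflib import SequenceMatcher


def normalize_title(title: str) -> str:
    # Lowercase, strip punctuation, collapse whitespace.
    t = title.lower().translate(str.maketrans("", "", string.punctuation))
    return " ".join(t.split())


def titles_match(a: str, b: str, threshold: float = 0.8) -> bool:
    return SequenceMatcher(None, normalize_title(a), normalize_title(b)).ratio() >= threshold
```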
+
+### v2:
+
+* embeddings / semantic similarity
+
+---
+
+# ⚡ 6. Signal Quality Rules
+
+Your MCP should:
+
+### ✅ Do:
+
+* reduce 100 articles → 5–10 clusters
+* highlight consensus
+* surface importance
+
+### ❌ Don’t:
+
+* overwhelm agent with volume
+* pass conflicting duplicates
+* expose noise
+
+---
+
+# 🧩 7. Relationship to Other MCPs
+
+This MCP becomes powerful when combined with:
+
+* crypto MCP → price
+* trends MCP → attention
+
+👉 News MCP provides:
+
+> **causal narratives**
+
+---
+
+# 🧭 8. Design Philosophy
+
+Each tool should answer:
+
+> “What is happening, and why should I care?”
+
+---
+
+# 🚀 9. Suggested Build Order
+
+1. RSS ingestion
+2. normalization
+3. basic deduplication
+4. clustering
+5. simple summarization
+6. entity tagging
+
+👉 Only then expose tools
+
+---
+
+# 🧠 Final takeaway
+
+> Crypto MCP gives you **facts**
+> News MCP gives you **meaning**
+
+But only if you:
+
+* aggressively deduplicate
+* cluster events
+* compress information
+

+ 24 - 0
README.md

@@ -0,0 +1,24 @@
+# 📰 News MCP Server
+
+FastMCP-based MCP server exposing deduplicated, topic-aware news clusters.
+
+## Quick start
+
+```bash
+cd news-mcp
+python -m venv .venv
+source .venv/bin/activate
+pip install -r requirements.txt
+./run.sh
+```
+
+Default URL:
+- `http://127.0.0.1:8506/mcp/sse`
+
+## Tool
+
+- `get_latest_events(topic, limit)`
+
+## Source
+
+- RSS: https://breakingthenews.net/news-feed.xml

+ 18 - 0
killserver.sh

@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+set -euo pipefail
+PIDFILE=${PIDFILE:-server.pid}
+
+stop_pid() {
+  local pid="$1"
+  if [ -n "$pid" ] && ps -p "$pid" > /dev/null 2>&1; then
+    kill "$pid" 2>/dev/null || true
+    sleep 1
+    ps -p "$pid" > /dev/null 2>&1 && kill -9 "$pid" 2>/dev/null || true
+  fi
+}
+
+if [ -f "$PIDFILE" ]; then
+  PID=$(cat "$PIDFILE" 2>/dev/null || true)
+  stop_pid "$PID"
+  rm -f "$PIDFILE"
+fi

+ 0 - 0
news_mcp/__init__.py


+ 14 - 0
news_mcp/config.py

@@ -0,0 +1,14 @@
+import os
+from pathlib import Path
+
+DATA_DIR = Path(os.getenv("NEWS_MCP_DATA_DIR", Path(__file__).resolve().parent / "data"))
+DATA_DIR.mkdir(parents=True, exist_ok=True)
+
+DB_PATH = Path(os.getenv("NEWS_MCP_DB_PATH", str(DATA_DIR / "news.sqlite")))
+
+RSS_FEED_URL = os.getenv("NEWS_RSS_FEED_URL", "https://breakingthenews.net/news-feed.xml")
+
+# Clusters TTL (hours)
+CLUSTERS_TTL_HOURS = float(os.getenv("NEWS_CLUSTERS_TTL_HOURS", "24"))
+
+DEFAULT_TOPICS = ["crypto", "macro", "regulation", "ai", "other"]

+ 0 - 0
news_mcp/dedup/__init__.py


+ 45 - 0
news_mcp/dedup/cluster.py

@@ -0,0 +1,45 @@
+from __future__ import annotations
+
+from typing import Any, Dict, List
+
+from news_mcp.sources.rss_breakingthenews import cluster_id_for_title, normalize_topic_from_title
+
+
+def dedup_and_cluster_articles(articles: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
+    """v1 dedup: cluster by normalized title hash per topic.
+
+    Returns topic -> clusters[]
+    """
+    by_topic: Dict[str, Dict[str, Dict[str, Any]]] = {}
+
+    for a in articles:
+        title = a["title"]
+        topic = normalize_topic_from_title(title)
+        cid = cluster_id_for_title(topic, title)
+
+        by_topic.setdefault(topic, {})
+        cluster_map = by_topic[topic]
+        if cid not in cluster_map:
+            cluster_map[cid] = {
+                "cluster_id": cid,
+                "headline": title,
+                "summary": a.get("summary", ""),
+                "entities": [],
+                "sentiment": "neutral",
+                "importance": 0.0,
+                "sources": [a["source"]],
+                "timestamp": a["timestamp"],
+                "articles": [a],
+                "first_seen": a["timestamp"],
+                "last_updated": a["timestamp"],
+            }
+        else:
+            c = cluster_map[cid]
+            c["articles"].append(a)
+            if a["source"] not in c["sources"]:
+                c["sources"].append(a["source"])
+
+            # Keep the later timestamp as last_updated (string max -- a v1
+            # heuristic that is only correct for sortable ISO-8601 timestamps)
+            c["last_updated"] = max(str(c["last_updated"]), str(a["timestamp"]))
+
+    return {topic: list(clusters.values()) for topic, clusters in by_topic.items()}

+ 0 - 0
news_mcp/enrichment/__init__.py


+ 12 - 0
news_mcp/enrichment/enrich.py

@@ -0,0 +1,12 @@
+from __future__ import annotations
+
+from typing import Any, Dict
+
+from news_mcp.enrichment.importance import compute_importance
+
+
+def enrich_cluster(cluster: Dict[str, Any]) -> Dict[str, Any]:
+    cluster = dict(cluster)
+    cluster["importance"] = compute_importance(cluster)
+    # v1: sentiment/entities left as placeholders (neutral/empty)
+    return cluster

+ 11 - 0
news_mcp/enrichment/importance.py

@@ -0,0 +1,11 @@
+from __future__ import annotations
+
+from typing import Any, Dict
+
+
+def compute_importance(cluster: Dict[str, Any]) -> float:
+    # v1 heuristic: more sources/number of articles => higher importance; capped.
+    sources = len(set(cluster.get("sources", [])))
+    article_count = len(cluster.get("articles", []))
+    score = 0.15 * sources + 0.02 * article_count
+    return min(0.99, round(score, 2))

+ 0 - 0
news_mcp/ingestion/__init__.py


+ 0 - 0
news_mcp/jobs/__init__.py


+ 21 - 0
news_mcp/jobs/poller.py

@@ -0,0 +1,21 @@
+from __future__ import annotations
+
+from typing import Any, Dict
+
+from news_mcp.config import DB_PATH
+from news_mcp.dedup.cluster import dedup_and_cluster_articles
+from news_mcp.enrichment.enrich import enrich_cluster
+from news_mcp.sources.rss_breakingthenews import fetch_breakingthenews_articles
+from news_mcp.storage.sqlite_store import SQLiteClusterStore
+
+
+def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
+    store = SQLiteClusterStore(DB_PATH)
+    articles = fetch_breakingthenews_articles(limit=limit)
+    clustered_by_topic = dedup_and_cluster_articles(articles)
+
+    for t, clusters in clustered_by_topic.items():
+        if topic and t != topic:
+            continue
+        enriched = [enrich_cluster(c) for c in clusters]
+        store.upsert_clusters(enriched, topic=t)

+ 57 - 0
news_mcp/mcp_server_fastmcp.py

@@ -0,0 +1,57 @@
+from __future__ import annotations
+
+from fastapi import FastAPI
+from mcp.server.fastmcp import FastMCP
+from mcp.server.transport_security import TransportSecuritySettings
+
+from news_mcp.config import CLUSTERS_TTL_HOURS, DB_PATH
+from news_mcp.jobs.poller import refresh_clusters
+from news_mcp.storage.sqlite_store import SQLiteClusterStore
+
+
+mcp = FastMCP(
+    "news-mcp",
+    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
+)
+
+
+@mcp.tool(description="What is happening right now? Return the latest deduplicated news clusters for a topic.")
+async def get_latest_events(topic: str = "crypto", limit: int = 5):
+    limit = max(1, min(int(limit), 20))
+    # Refresh opportunistically (v1 simple: refresh every call but bounded to small RSS pull)
+    refresh_clusters(topic=topic, limit=50)
+
+    store = SQLiteClusterStore(DB_PATH)
+    clusters = store.get_latest_clusters(topic=topic, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
+
+    # Ensure the response is compact and agent-friendly.
+    out = []
+    for c in clusters:
+        out.append(
+            {
+                "cluster_id": c.get("cluster_id"),
+                "headline": c.get("headline"),
+                "summary": c.get("summary"),
+                "entities": c.get("entities", []),
+                "sentiment": c.get("sentiment", "neutral"),
+                "importance": c.get("importance", 0.0),
+                "sources": c.get("sources", []),
+                "timestamp": c.get("timestamp"),
+            }
+        )
+
+    return out
+
+
+app = FastAPI(title="News MCP Server")
+app.mount("/mcp", mcp.sse_app())
+
+
+@app.get("/")
+def root():
+    return {"status": "ok", "transport": "fastmcp+sse", "mount": "/mcp", "tools": ["get_latest_events"]}
+
+
+@app.get("/health")
+def health():
+    return {"status": "ok", "ttl_hours": CLUSTERS_TTL_HOURS, "db": str(DB_PATH)}

+ 0 - 0
news_mcp/sources/__init__.py


+ 58 - 0
news_mcp/sources/rss_breakingthenews.py

@@ -0,0 +1,58 @@
+from __future__ import annotations
+
+import hashlib
+from typing import Any, Dict, List
+
+import feedparser
+
+from news_mcp.config import RSS_FEED_URL
+
+
+def _canonical_url(url: str) -> str:
+    # Minimal canonicalization for v1.
+    return url.strip()
+
+
+def fetch_breakingthenews_articles(limit: int = 50) -> List[Dict[str, Any]]:
+    feed = feedparser.parse(RSS_FEED_URL)
+    articles: List[Dict[str, Any]] = []
+
+    for entry in feed.entries[:limit]:
+        title = str(getattr(entry, "title", "")).strip()
+        url = _canonical_url(str(getattr(entry, "link", "")).strip())
+        source = "BreakingTheNews"
+        timestamp = str(getattr(entry, "published", "")) or str(getattr(entry, "updated", ""))
+        summary = str(getattr(entry, "summary", "")) or str(getattr(entry, "description", ""))
+
+        if not title or not url:
+            continue
+
+        articles.append(
+            {
+                "title": title,
+                "url": url,
+                "source": source,
+                "timestamp": timestamp,
+                "summary": summary,
+            }
+        )
+
+    return articles
+
+
+def normalize_topic_from_title(title: str) -> str:
+    # Match whole words to avoid substring false positives (e.g. "ai" inside "said").
+    words = set("".join(ch if ch.isalnum() else " " for ch in title.lower()).split())
+    if words & {"btc", "bitcoin", "eth", "ethereum", "crypto"}:
+        return "crypto"
+    if words & {"rate", "rates", "inflation", "fed", "treasury", "euro"}:
+        return "macro"
+    if words & {"regulation", "sec", "ban", "law"}:
+        return "regulation"
+    if words & {"ai", "llm", "model", "openai", "anthropic"}:
+        return "ai"
+    return "other"
+
+
+def cluster_id_for_title(topic: str, title: str) -> str:
+    key = f"{topic}|{title.strip().lower()}"
+    return hashlib.sha1(key.encode("utf-8")).hexdigest()

+ 0 - 0
news_mcp/storage/__init__.py


+ 65 - 0
news_mcp/storage/sqlite_store.py

@@ -0,0 +1,65 @@
+from __future__ import annotations
+
+import json
+import sqlite3
+from dataclasses import dataclass
+from datetime import datetime, timezone, timedelta
+from pathlib import Path
+from typing import Any
+
+
+@dataclass
+class ClusterRow:
+    cluster_id: str
+    topic: str
+    payload: dict
+    updated_at: datetime
+
+
+class SQLiteClusterStore:
+    def __init__(self, db_path: str | Path):
+        self.db_path = str(db_path)
+        self._init_db()
+
+    def _conn(self) -> sqlite3.Connection:
+        return sqlite3.connect(self.db_path)
+
+    def _init_db(self) -> None:
+        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
+        with self._conn() as conn:
+            conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS clusters (
+                  cluster_id TEXT PRIMARY KEY,
+                  topic TEXT NOT NULL,
+                  payload TEXT NOT NULL,
+                  updated_at TEXT NOT NULL
+                )
+                """
+            )
+            conn.execute(
+                "CREATE INDEX IF NOT EXISTS idx_clusters_topic ON clusters(topic)"
+            )
+
+    def upsert_clusters(self, clusters: list[dict], topic: str) -> None:
+        now = datetime.now(timezone.utc)
+        with self._conn() as conn:
+            for c in clusters:
+                cluster_id = c["cluster_id"]
+                payload = json.dumps(c, ensure_ascii=False)
+                conn.execute(
+                    "INSERT INTO clusters(cluster_id, topic, payload, updated_at) VALUES(?,?,?,?) "
+                    "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, payload=excluded.payload, updated_at=excluded.updated_at",
+                    (cluster_id, topic, payload, now.isoformat()),
+                )
+
+    def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
+        cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
+        cutoff_iso = cutoff.isoformat()
+        with self._conn() as conn:
+            cur = conn.execute(
+                "SELECT payload FROM clusters WHERE topic=? AND updated_at >= ? ORDER BY updated_at DESC LIMIT ?",
+                (topic, cutoff_iso, int(limit)),
+            )
+            rows = [json.loads(r[0]) for r in cur.fetchall()]
+        return rows

+ 7 - 0
requirements.txt

@@ -0,0 +1,7 @@
+fastapi>=0.111.0
+uvicorn[standard]>=0.23
+mcp>=1.0.0
+httpx>=0.27.0
+python-dateutil>=2.9.0.post0
+feedparser>=6.0.11
+pydantic>=2.7.0

+ 4 - 0
restart.sh

@@ -0,0 +1,4 @@
+#!/usr/bin/env bash
+set -euo pipefail
+./killserver.sh
+./run.sh

+ 27 - 0
run.sh

@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+PORT=${PORT:-8506}
+APP_MODULE=${APP_MODULE:-news_mcp.mcp_server_fastmcp:app}
+LOGFILE=${LOGFILE:-uvicorn.log}
+PIDFILE=${PIDFILE:-server.pid}
+
+mkdir -p "$(dirname "$LOGFILE")"
+
+if [ -f "$PIDFILE" ] && ps -p "$(cat "$PIDFILE" 2>/dev/null)" > /dev/null 2>&1; then
+  echo "Server already running (PID $(cat "$PIDFILE"))"
+  exit 0
+fi
+
+UVICORN_BIN="${UVICORN_BIN:-}"
+if [ -z "$UVICORN_BIN" ]; then
+  if [ -x ".venv/bin/uvicorn" ]; then
+    UVICORN_BIN=".venv/bin/uvicorn"
+  else
+    UVICORN_BIN="uvicorn"
+  fi
+fi
+
+nohup "$UVICORN_BIN" "$APP_MODULE" --host 0.0.0.0 --port "$PORT" > "$LOGFILE" 2>&1 &
+echo $! > "$PIDFILE"
+echo "Uvicorn started on port $PORT (PID $(cat "$PIDFILE"))"

+ 7 - 0
tests.sh

@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+set -euo pipefail
+cd "$(dirname "$0")"
+if [ -f ".venv/bin/activate" ]; then
+  . .venv/bin/activate
+fi
+python -m pytest -q