
Polish news-mcp docs + add emerging topics and tests

Lukas Goldschmidt 1 month ago
commit 600fcdbd55

+ 29 - 0
PROJECT.md

@@ -0,0 +1,29 @@
+# Project: news-mcp
+
+## Goal
+Provide a signal-extraction MCP server that converts RSS into **deduplicated, enriched news clusters** that are easy for agents to use.
+
+## Current architecture (v1)
+- FastMCP SSE server mounted at `/mcp`
+- SQLite cache for clusters + Groq summary caches
+- RSS fetch (breakingthenews.net)
+- v1 dedup via fuzzy title similarity
+- Groq enrichment (topic/entities/sentiment/keywords)
+- Tools expose semantic queries over cached clusters
+
+## MCP tools (current)
+- `get_latest_events(topic, limit)`
+- `get_events_for_entity(entity, limit)`
+- `get_event_summary(event_id)`
+- `detect_emerging_topics(limit)`
+
+## Refresh & caching
+- Background refresh every `NEWS_REFRESH_INTERVAL_SECONDS` (default 900s)
+- Feed-hash skipping to avoid redundant RSS+Groq work
+- Cluster TTL via `NEWS_CLUSTERS_TTL_HOURS` (read into `CLUSTERS_TTL_HOURS` in `config.py`)
+- Summary caching for `get_event_summary`
+
+## Definition of “committable”
+- Tests pass offline (dedup/storage unit tests)
+- Server exposes tool surface with valid schemas
+- Caching prevents repeated Groq calls for unchanged clusters
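
For orientation, a representative cluster record as the dedup and Groq stages below build it; the field names come from `news_mcp/dedup/cluster.py` and `groq_enrich.py`, while the values here are illustrative only.

```python
# Illustrative only: shape of one cached cluster. cluster_id is the sha1 of
# "topic|normalized headline"; entities/sentiment/keywords are filled by Groq.
example_cluster = {
    "cluster_id": "a3f0c1d2e4b5a6978899aabbccddeeff00112233",
    "headline": "Trump warns Iran conflict could spread",
    "summary": "Short RSS summary text",
    "entities": ["Trump", "Iran"],
    "sentiment": "negative",
    "sentimentScore": -0.6,
    "keywords": ["conflict", "warning"],
    "importance": 0.5,
    "sources": ["BreakingTheNews"],
    "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT",
    "first_seen": "Mon, 30 Mar 2026 12:00:00 GMT",
    "last_updated": "Mon, 30 Mar 2026 12:05:00 GMT",
    "articles": [],  # the raw RSS entries merged into this cluster
}
```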

+ 30 - 7
README.md

@@ -1,24 +1,47 @@
 # 📰 News MCP Server
 
-FastMCP-based MCP server exposing deduplicated, topic-aware news clusters.
+FastMCP-based MCP server that turns RSS into **deduplicated, enriched news clusters**.
 
 ## Quick start
 
 ```bash
 cd news-mcp
-python -m venv .venv || true
 source .venv/bin/activate
 pip install -r requirements.txt
 ./run.sh
 ```
 
-Default URL:
+Default SSE mount (FastMCP):
 - `http://127.0.0.1:8506/mcp/sse`
 
-## Tool
+Health:
+- `http://127.0.0.1:8506/health`
 
-- `get_latest_events(topic, limit)`
+## What this server provides
+- Fetches RSS from `https://breakingthenews.net/news-feed.xml`
+- Deduplicates articles into clusters (v1 fuzzy title similarity)
+- Enriches clusters with Groq (topic/entities/sentiment/keywords)
+- Caches clusters + Groq fields in SQLite
 
-## Source
+## Tools (MCP)
 
-- RSS: https://breakingthenews.net/news-feed.xml
+1) `get_latest_events(topic, limit)`
+- `topic` is a coarse category: `crypto | macro | regulation | ai | other`
+
+2) `get_events_for_entity(entity, limit)`
+- substring, case-insensitive match over extracted `entities`
+
+3) `get_event_summary(event_id)`
+- Groq-written compressed narrative for a given `cluster_id`
+
+4) `detect_emerging_topics(limit)`
+- derives “emerging” signals from recent cached clusters
+
+## Configuration
+
+See `news-mcp/.env`.
+Key variables:
+- `GROQ_API_KEY`, `GROQ_MODEL`, `GROQ_DEBUG`
+- `NEWS_REFRESH_INTERVAL_SECONDS` (default 900)
+- `NEWS_BACKGROUND_REFRESH_ENABLED` (default true)
+- `NEWS_BACKGROUND_REFRESH_ON_START` (default true)
+- `NEWS_CLUSTERS_TTL_HOURS`
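
A minimal way to exercise the tool surface end to end is an SSE client session; this sketch assumes the standard `mcp` Python SDK's SSE client, and the exact import paths may differ across SDK versions.

```python
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client  # assumption: SSE client helper from the mcp SDK


async def main() -> None:
    # Connect to the SSE mount from the README ("Default SSE mount" above).
    async with sse_client("http://127.0.0.1:8506/mcp/sse") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "get_latest_events", {"topic": "crypto", "limit": 3}
            )
            print(result)


asyncio.run(main())
```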

+ 5 - 0
killserver.sh

@@ -16,3 +16,8 @@ if [ -f "$PIDFILE" ]; then
   stop_pid "$PID"
   rm -f "$PIDFILE"
 fi
+
+# Sweep up stale server processes started from this project.
+for pid in $(pgrep -f 'news_mcp\.mcp_server_fastmcp:app|uvicorn .*news_mcp\.mcp_server_fastmcp:app' || true); do
+  stop_pid "$pid"
+done

+ 22 - 0
news_mcp/config.py

@@ -1,6 +1,12 @@
 import os
 from pathlib import Path
 
+from dotenv import load_dotenv
+
+# Load .env from project folder so Groq/debug flags are available under uvicorn/nohup.
+_HERE = Path(__file__).resolve().parent.parent
+load_dotenv(_HERE / ".env")
+
 DATA_DIR = Path(os.getenv("NEWS_MCP_DATA_DIR", Path(__file__).resolve().parent / "data"))
 DATA_DIR.mkdir(parents=True, exist_ok=True)
 
@@ -12,3 +18,19 @@ RSS_FEED_URL = os.getenv("NEWS_RSS_FEED_URL", "https://breakingthenews.net/news-
 CLUSTERS_TTL_HOURS = float(os.getenv("NEWS_CLUSTERS_TTL_HOURS", "24"))
 
 DEFAULT_TOPICS = ["crypto", "macro", "regulation", "ai", "other"]
+
+# Optional LLM enrichment (Groq)
+GROQ_API_KEY = os.getenv("GROQ_API_KEY")
+GROQ_MODEL = os.getenv("GROQ_MODEL", "llama4-16e")
+GROQ_DEBUG = os.getenv("GROQ_DEBUG", "false").lower() == "true"
+
+# Cost control: only enrich clusters whose heuristic topic is "other" by default.
+GROQ_ENRICH_OTHER_ONLY = os.getenv("GROQ_ENRICH_OTHER_ONLY", "true").lower() == "true"
+
+# Limit enriched clusters per refresh call.
+GROQ_MAX_CLUSTERS_PER_REFRESH = int(os.getenv("GROQ_MAX_CLUSTERS_PER_REFRESH", "20"))
+
+# Background refresh
+NEWS_REFRESH_INTERVAL_SECONDS = int(os.getenv("NEWS_REFRESH_INTERVAL_SECONDS", "900"))
+NEWS_BACKGROUND_REFRESH_ENABLED = os.getenv("NEWS_BACKGROUND_REFRESH_ENABLED", "true").lower() == "true"
+NEWS_BACKGROUND_REFRESH_ON_START = os.getenv("NEWS_BACKGROUND_REFRESH_ON_START", "true").lower() == "true"
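
One detail worth knowing about this loading order: by default `load_dotenv()` does not override variables that are already set in the process environment (python-dotenv's `override=False`), so shell or service-level exports win over the project `.env`. A small sketch:

```python
import os

from dotenv import load_dotenv

# With python-dotenv's defaults, an already-exported variable wins over .env.
os.environ["GROQ_DEBUG"] = "true"   # e.g. exported in the shell or a systemd unit
load_dotenv(".env")                 # .env may contain GROQ_DEBUG=false
print(os.environ["GROQ_DEBUG"])     # -> "true" (the export is kept)
```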

+ 63 - 29
news_mcp/dedup/cluster.py

@@ -1,45 +1,79 @@
 from __future__ import annotations
 
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Tuple
 
-from news_mcp.sources.rss_breakingthenews import cluster_id_for_title, normalize_topic_from_title
+from news_mcp.sources.rss_breakingthenews import normalize_topic_from_title
 
+import re
+from difflib import SequenceMatcher
 
-def dedup_and_cluster_articles(articles: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
-    """v1 dedup: cluster by normalized title hash per topic.
 
-    Returns topic -> clusters[]
+def _normalize_title(title: str) -> str:
+    t = title.lower().strip()
+    # Remove punctuation-ish characters for similarity scoring.
+    t = re.sub(r"[^a-z0-9\s]", " ", t)
+    t = re.sub(r"\s+", " ", t).strip()
+    return t
+
+
+def _title_similarity(a: str, b: str) -> float:
+    return SequenceMatcher(None, _normalize_title(a), _normalize_title(b)).ratio()
+
+
+def dedup_and_cluster_articles(
+    articles: List[Dict[str, Any]],
+    similarity_threshold: float = 0.87,
+) -> Dict[str, List[Dict[str, Any]]]:
+    """v1 dedup: fuzzy title similarity per topic.
+
+    Instead of strict hashing, we merge clusters whose normalized titles are
+    similar enough. This helps create richer clusters (multiple sources/articles)
+    and therefore better importance.
     """
-    by_topic: Dict[str, Dict[str, Dict[str, Any]]] = {}
+
+    by_topic: Dict[str, List[Dict[str, Any]]] = {}
 
     for a in articles:
         title = a["title"]
         topic = normalize_topic_from_title(title)
-        cid = cluster_id_for_title(topic, title)
-
-        by_topic.setdefault(topic, {})
-        cluster_map = by_topic[topic]
-        if cid not in cluster_map:
-            cluster_map[cid] = {
-                "cluster_id": cid,
-                "headline": title,
-                "summary": a.get("summary", ""),
-                "entities": [],
-                "sentiment": "neutral",
-                "importance": 0.0,
-                "sources": [a["source"]],
-                "timestamp": a["timestamp"],
-                "articles": [a],
-                "first_seen": a["timestamp"],
-                "last_updated": a["timestamp"],
-            }
-        else:
-            c = cluster_map[cid]
+
+        by_topic.setdefault(topic, [])
+        clusters = by_topic[topic]
+
+        best_idx: int | None = None
+        best_sim = 0.0
+        for idx, c in enumerate(clusters):
+            sim = _title_similarity(title, c.get("headline", ""))
+            if sim > best_sim:
+                best_sim = sim
+                best_idx = idx
+
+        if best_idx is not None and best_sim >= similarity_threshold:
+            c = clusters[best_idx]
             c["articles"].append(a)
             if a["source"] not in c["sources"]:
                 c["sources"].append(a["source"])
-
-            # Keep latest timestamp as last_updated (v1 heuristic)
             c["last_updated"] = max(str(c["last_updated"]), str(a["timestamp"]))
+        else:
+            # Stable-ish cluster id: based on topic + normalized canonical title.
+            import hashlib
+
+            key = f"{topic}|{_normalize_title(title)}"
+            cid = hashlib.sha1(key.encode("utf-8")).hexdigest()
+            clusters.append(
+                {
+                    "cluster_id": cid,
+                    "headline": title,
+                    "summary": a.get("summary", ""),
+                    "entities": [],
+                    "sentiment": "neutral",
+                    "importance": 0.0,
+                    "sources": [a["source"]],
+                    "timestamp": a["timestamp"],
+                    "articles": [a],
+                    "first_seen": a["timestamp"],
+                    "last_updated": a["timestamp"],
+                }
+            )
 
-    return {topic: list(clusters.values()) for topic, clusters in by_topic.items()}
+    return {topic: clusters for topic, clusters in by_topic.items()}
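
To see the fuzzy merge in action (mirroring the unit test added below), feed it two near-duplicate headlines and one unrelated one; the article dicts only need the keys the function actually reads (`title`, `source`, `timestamp`, and optionally `summary`).

```python
from news_mcp.dedup.cluster import dedup_and_cluster_articles

articles = [
    {"title": "Trump warns Iran war could spread", "url": "https://example.com/a",
     "source": "SrcA", "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT", "summary": ""},
    {"title": "Trump warns Iran conflict could spread", "url": "https://example.com/b",
     "source": "SrcB", "timestamp": "Mon, 30 Mar 2026 12:05:00 GMT", "summary": ""},
    {"title": "Unrelated sports result", "url": "https://example.com/c",
     "source": "SrcC", "timestamp": "Mon, 30 Mar 2026 12:10:00 GMT", "summary": ""},
]

# At a threshold of 0.75 the two near-duplicate headlines merge into one cluster.
clustered = dedup_and_cluster_articles(articles, similarity_threshold=0.75)
print(sum(len(v) for v in clustered.values()))  # -> 2 clusters in total
print([c["sources"] for clusters in clustered.values() for c in clusters])
```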

+ 194 - 0
news_mcp/enrichment/groq_enrich.py

@@ -0,0 +1,194 @@
+from __future__ import annotations
+
+import json
+import logging
+from typing import Any, Dict, List
+
+import httpx
+
+from news_mcp.config import GROQ_API_KEY, GROQ_MODEL, GROQ_DEBUG
+
+
+logger = logging.getLogger(__name__)
+
+
+_SYSTEM = "You are a news signal extraction engine. Return STRICT JSON only."
+
+
+def _build_prompt(articles: List[Dict[str, Any]], headline: str, summary: str | None) -> str:
+    # Keep prompt compact: clusters already deduped.
+    sample = articles[:6]
+    return json.dumps(
+        {
+            "cluster": {
+                "headline": headline,
+                "summary": summary or "",
+                "articles": [
+                    {
+                        "title": a.get("title"),
+                        "url": a.get("url"),
+                        "source": a.get("source"),
+                        "timestamp": a.get("timestamp"),
+                        "summary": a.get("summary", ""),
+                    }
+                    for a in sample
+                ],
+            }
+        },
+        ensure_ascii=False,
+    )
+
+
+async def classify_cluster_groq(cluster: Dict[str, Any]) -> Dict[str, Any]:
+    if not GROQ_API_KEY:
+        # No enrichment configured.
+        return cluster
+
+    headline = cluster.get("headline", "")
+    summary = cluster.get("summary", "")
+    articles = cluster.get("articles", [])
+
+    user_payload = _build_prompt(articles=articles, headline=headline, summary=summary)
+
+    prompt = (
+        f"Input cluster JSON:\n{user_payload}\n\n"
+        "You MUST extract a news signal from the headline AND summary. Do not leave entities empty when the text mentions obvious names.\n"
+        "Task:\n"
+        "1) infer the best top-level topic\n"
+        "2) extract concise entities from the cluster\n"
+        "3) assign sentiment from the wording/context\n"
+        "4) provide short keywords that justify the classification\n\n"
+        "Entity rules (strict):\n"
+        "- Use short strings (1-5 words).\n"
+        "- Include all obvious named entities mentioned in headline or summary: people, countries, regions, organizations, ministries, presidents, leaders, wars/conflicts if named.\n"
+        "- Also include finance/crypto entities when present: BTC, ETH, Bitcoin, Ethereum, ETF, SEC, ECB, Fed, euro, inflation, rates.\n"
+        "- If the cluster mentions Iran, UAE, Egypt, Germany, Europe, Trump, Merz, Sisi, those should appear in entities.\n"
+        "- Do NOT return empty entities if any such names/places appear.\n\n"
+        "Sentiment rules:\n"
+        "- positive: clearly encouraging, improving, or supportive tone\n"
+        "- negative: clearly alarming, worsening, severe, conflict, loss, risk, warning tone\n"
+        "- neutral: factual, balanced, or mixed\n"
+        "- sentimentScore must be a number from -1.0 to 1.0 and should reflect the sentiment label.\n\n"
+        "Return STRICT JSON with EXACT keys only:\n"
+        "{ topic, entities, sentiment, sentimentScore, keywords }\n"
+        "where topic is one of [crypto, macro, regulation, ai, other].\n"
+    )
+
+    if GROQ_DEBUG:
+        msg = f"[GROQ PROMPT] {prompt}"
+        logger.warning(msg)
+        print(msg, flush=True)
+
+    req = {
+        "model": GROQ_MODEL,
+        "messages": [
+            {"role": "system", "content": _SYSTEM},
+            {"role": "user", "content": prompt},
+        ],
+        "temperature": 0.2,
+        "response_format": {"type": "json_object"},
+    }
+
+    async with httpx.AsyncClient(timeout=30.0) as client:
+        resp = await client.post(
+            "https://api.groq.com/openai/v1/chat/completions",
+            headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
+            json=req,
+        )
+        resp.raise_for_status()
+        data = resp.json()
+
+    content = data["choices"][0]["message"]["content"]
+
+    if GROQ_DEBUG:
+        msg = f"[GROQ RAW RESPONSE] {content}"
+        logger.warning(msg)
+        print(msg, flush=True)
+
+    parsed = json.loads(content)
+
+    # Normalize output types into our cluster shape.
+    topic = parsed.get("topic") or cluster.get("topic")
+    entities = parsed.get("entities") or []
+    sentiment = parsed.get("sentiment") or "neutral"
+    sentiment_score = parsed.get("sentimentScore")
+    keywords = parsed.get("keywords") or []
+
+    out = dict(cluster)
+    if topic:
+        out["topic"] = topic
+    out["entities"] = entities
+    out["sentiment"] = sentiment
+    if sentiment_score is not None:
+        out["sentimentScore"] = float(sentiment_score)
+    out["keywords"] = keywords
+    return out
+
+
+async def summarize_cluster_groq(cluster: Dict[str, Any]) -> Dict[str, Any]:
+    """Produce a compact agent-facing summary.
+
+    Returns:
+      {
+        "headline": str,
+        "mergedSummary": str,
+        "keyFacts": [str,...],
+        "sources": [str,...]
+      }
+    """
+    if not GROQ_API_KEY:
+        return {
+            "headline": cluster.get("headline"),
+            "mergedSummary": cluster.get("summary"),
+            "keyFacts": [],
+            "sources": cluster.get("sources", []),
+        }
+
+    headline = cluster.get("headline", "")
+    summary = cluster.get("summary", "")
+    articles = cluster.get("articles", [])
+
+    sample = articles[:5]
+    req = {
+        "model": GROQ_MODEL,
+        "messages": [
+            {
+                "role": "system",
+                "content": "You are a summarization engine for news clusters. Return strict JSON only.",
+            },
+            {
+                "role": "user",
+                "content": json.dumps(
+                    {
+                        "headline": headline,
+                        "summary": summary,
+                        "articles": [
+                            {
+                                "title": a.get("title"),
+                                "url": a.get("url"),
+                                "source": a.get("source"),
+                                "timestamp": a.get("timestamp"),
+                            }
+                            for a in sample
+                        ],
+                    },
+                    ensure_ascii=False,
+                )
+                + "\n\nReturn keys: headline, mergedSummary, keyFacts (5-8 strings), sources. mergedSummary should be 2-4 sentences.",
+            },
+        ],
+        "temperature": 0.2,
+        "response_format": {"type": "json_object"},
+    }
+
+    async with httpx.AsyncClient(timeout=45.0) as client:
+        resp = await client.post(
+            "https://api.groq.com/openai/v1/chat/completions",
+            headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
+            json=req,
+        )
+        resp.raise_for_status()
+        data = resp.json()
+    content = data["choices"][0]["message"]["content"]
+    parsed = json.loads(content)
+    return parsed
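
Both helpers degrade gracefully when `GROQ_API_KEY` is not configured, which is what keeps offline tests possible. A minimal sketch, assuming no key is present in the environment or `.env` when `news_mcp.config` is imported:

```python
import asyncio

from news_mcp.enrichment.groq_enrich import classify_cluster_groq, summarize_cluster_groq

cluster = {
    "cluster_id": "cid1",
    "headline": "ECB signals rate path",
    "summary": "Short RSS summary.",
    "sources": ["BreakingTheNews"],
    "articles": [],
}


async def main() -> None:
    # Without GROQ_API_KEY, classify_cluster_groq returns the cluster untouched...
    enriched = await classify_cluster_groq(cluster)
    assert enriched == cluster

    # ...and summarize_cluster_groq falls back to the cached headline/summary.
    summary = await summarize_cluster_groq(cluster)
    print(summary["mergedSummary"])  # -> "Short RSS summary."


asyncio.run(main())
```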

+ 44 - 2
news_mcp/jobs/poller.py

@@ -5,17 +5,59 @@ from typing import Any, Dict
 from news_mcp.config import CLUSTERS_TTL_HOURS, DB_PATH
 from news_mcp.dedup.cluster import dedup_and_cluster_articles
 from news_mcp.enrichment.enrich import enrich_cluster
+from news_mcp.enrichment.groq_enrich import classify_cluster_groq
 from news_mcp.sources.rss_breakingthenews import fetch_breakingthenews_articles
 from news_mcp.storage.sqlite_store import SQLiteClusterStore
 
+from news_mcp.config import GROQ_ENRICH_OTHER_ONLY, GROQ_MAX_CLUSTERS_PER_REFRESH
 
-def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
+
+async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
     store = SQLiteClusterStore(DB_PATH)
+
     articles = fetch_breakingthenews_articles(limit=limit)
+
+    # Skip expensive work if the feed content (titles/urls/timestamps) didn't change.
+    import hashlib
+    feed_key = "breakingthenews"  # v1: single feed
+    material = "\n".join(
+        f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
+        for a in articles
+    )
+    last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
+    prev_hash = store.get_feed_hash(feed_key)
+    if prev_hash == last_hash:
+        return
+    store.set_feed_hash(feed_key, last_hash)
     clustered_by_topic = dedup_and_cluster_articles(articles)
 
     for t, clusters in clustered_by_topic.items():
         if topic and t != topic:
             continue
-        enriched = [enrich_cluster(c) for c in clusters]
+        enriched = []
+        # Always compute cheap enrichment first.
+        for idx, c in enumerate(clusters[:GROQ_MAX_CLUSTERS_PER_REFRESH]):
+            c2 = enrich_cluster(c)
+
+            # Groq enrichment only when configured.
+            if (not GROQ_ENRICH_OTHER_ONLY) or (t == "other"):
+                # Cache Groq: if we already have entities/sentiment for this cluster, skip.
+                existing = store.get_cluster_by_id(c2.get("cluster_id"))
+                if existing and existing.get("entities"):
+                    c2 = dict(c2)
+                    # Keep existing enriched fields.
+                    c2["entities"] = existing.get("entities", [])
+                    if existing.get("sentiment"):
+                        c2["sentiment"] = existing.get("sentiment")
+                    if existing.get("sentimentScore") is not None:
+                        c2["sentimentScore"] = existing.get("sentimentScore")
+                    if existing.get("keywords"):
+                        c2["keywords"] = existing.get("keywords")
+                else:
+                    c2 = await classify_cluster_groq(c2)
+
+            enriched.append(c2)
+
         store.upsert_clusters(enriched, topic=t)
+
+            
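
The feed-hash skip is simply a SHA-1 over the fields that change when the feed changes; recomputing it by hand makes the caching behaviour easy to verify.

```python
import hashlib

# Same material string refresh_clusters() builds before clustering:
# one "title|url|timestamp" line per article, joined by newlines.
articles = [
    {"title": "Headline A", "url": "https://example.com/a",
     "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT"},
    {"title": "Headline B", "url": "https://example.com/b",
     "timestamp": "Mon, 30 Mar 2026 12:05:00 GMT"},
]
material = "\n".join(
    f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}" for a in articles
)
feed_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()

# If this hash equals the value stored for "breakingthenews", the refresh
# returns early and no dedup or Groq work is done.
print(feed_hash)
```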

+ 180 - 6
news_mcp/mcp_server_fastmcp.py

@@ -5,8 +5,10 @@ from mcp.server.fastmcp import FastMCP
 from mcp.server.transport_security import TransportSecuritySettings
 
 from news_mcp.config import CLUSTERS_TTL_HOURS, DEFAULT_TOPICS, DB_PATH
+from news_mcp.config import NEWS_REFRESH_INTERVAL_SECONDS, NEWS_BACKGROUND_REFRESH_ENABLED, NEWS_BACKGROUND_REFRESH_ON_START
 from news_mcp.jobs.poller import refresh_clusters
 from news_mcp.storage.sqlite_store import SQLiteClusterStore
+from news_mcp.enrichment.groq_enrich import summarize_cluster_groq
 
 
 mcp = FastMCP(
@@ -18,15 +20,25 @@ mcp = FastMCP(
 @mcp.tool(description="What is happening right now? Return the latest deduplicated news clusters for a topic.")
 async def get_latest_events(topic: str = "crypto", limit: int = 5):
     limit = max(1, min(int(limit), 20))
-    # Refresh opportunistically (v1 simple: refresh every call but bounded to small RSS pull)
-    refresh_clusters(topic=topic, limit=50)
-
+    # In v1, `topic` is a coarse category. If the caller passes an entity name
+    # (e.g. "trump"/"iran"), gracefully fall back to `other`.
+    topic_norm = str(topic).strip().lower()
+    allowed = {t.lower() for t in DEFAULT_TOPICS}
+    if topic_norm not in allowed:
+        topic_norm = "other"
     store = SQLiteClusterStore(DB_PATH)
-    clusters = store.get_latest_clusters(topic=topic, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
+
+    # Cache-first: only refresh if we currently have no fresh clusters for this topic.
+    clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
+    if not clusters:
+        await refresh_clusters(topic=topic_norm, limit=200)
+        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
 
     # Ensure the response is compact and agent-friendly.
+    clusters_sorted = sorted(clusters, key=lambda x: float(x.get("importance", 0.0)), reverse=True)
+
     out = []
-    for c in clusters:
+    for c in clusters_sorted:
         out.append(
             {
                 "cluster_id": c.get("cluster_id"),
@@ -43,10 +55,166 @@ async def get_latest_events(topic: str = "crypto", limit: int = 5):
     return out
 
 
+@mcp.tool(description="What's happening with X? Filter latest clusters by extracted entity substring (case-insensitive).")
+async def get_events_for_entity(entity: str, limit: int = 10):
+    limit = max(1, min(int(limit), 30))
+    query = str(entity).strip().lower()
+    if not query:
+        return []
+
+    # Cache-first: search recent clusters across all topics.
+    store = SQLiteClusterStore(DB_PATH)
+    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 5)
+
+    hits = []
+    for c in clusters:
+        ents = c.get("entities") or []
+        if any(query in str(e).lower() for e in ents):
+            hits.append(c)
+        if len(hits) >= limit:
+            break
+
+    # Compress to tool response shape.
+    out = []
+    for c in hits:
+        out.append(
+            {
+                "cluster_id": c.get("cluster_id"),
+                "headline": c.get("headline"),
+                "summary": c.get("summary"),
+                "entities": c.get("entities", []),
+                "sentiment": c.get("sentiment", "neutral"),
+                "importance": c.get("importance", 0.0),
+                "sources": c.get("sources", []),
+                "timestamp": c.get("timestamp"),
+            }
+        )
+    return out
+
+
+@mcp.tool(description="Explain an event clearly by cluster_id (Groq summary).")
+async def get_event_summary(event_id: str):
+    store = SQLiteClusterStore(DB_PATH)
+
+    # Summary cache: reuse if present within TTL.
+    cached_summary = store.get_cluster_summary(
+        cluster_id=event_id,
+        ttl_hours=CLUSTERS_TTL_HOURS,
+    )
+    if cached_summary:
+        return {
+            "event_id": event_id,
+            "headline": cached_summary.get("headline"),
+            "mergedSummary": cached_summary.get("mergedSummary"),
+            "keyFacts": cached_summary.get("keyFacts", []),
+            "sources": cached_summary.get("sources", []),
+        }
+
+    cluster = store.get_cluster_by_id(event_id)
+    if not cluster:
+        return {
+            "event_id": event_id,
+            "error": "NOT_FOUND",
+        }
+
+    summary = await summarize_cluster_groq(cluster)
+
+    store.upsert_cluster_summary(event_id, summary)
+    return {
+        "event_id": event_id,
+        "headline": summary.get("headline"),
+        "mergedSummary": summary.get("mergedSummary"),
+        "keyFacts": summary.get("keyFacts", []),
+        "sources": summary.get("sources", []),
+    }
+
+
+@mcp.tool(description="Detect emerging topics/entities from recent cached news clusters.")
+async def detect_emerging_topics(limit: int = 10):
+    limit = max(1, min(int(limit), 20))
+    store = SQLiteClusterStore(DB_PATH)
+    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=200)
+
+    from collections import Counter
+    import re
+
+    entity_counts = Counter()
+    phrase_counts = Counter()
+    topic_counts = Counter()
+
+    for c in clusters:
+        topic_counts[c.get("topic", "other")] += 1
+        for ent in c.get("entities", []) or []:
+            key = str(ent).strip().lower()
+            if key:
+                entity_counts[key] += 1
+
+        text = f"{c.get('headline','')} {c.get('summary','')}"
+        words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
+        for i in range(len(words) - 1):
+            phrase = f"{words[i]} {words[i+1]}"
+            if len(phrase) > 6:
+                phrase_counts[phrase] += 1
+
+    emerging = []
+    for ent, count in entity_counts.most_common(limit):
+        emerging.append({
+            "topic": ent,
+            "trend_score": min(0.99, round(0.25 + 0.15 * count, 2)),
+            "related_entities": [ent],
+            "signal_type": "entity",
+            "count": count,
+        })
+
+    for phrase, count in phrase_counts.most_common(limit * 2):
+        if any(item["topic"] == phrase for item in emerging):
+            continue
+        emerging.append({
+            "topic": phrase.title(),
+            "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
+            "related_entities": [],
+            "signal_type": "phrase",
+            "count": count,
+        })
+        if len(emerging) >= limit:
+            break
+
+    return emerging[:limit]
+
+
 app = FastAPI(title="News MCP Server")
 app.mount("/mcp", mcp.sse_app())
 
 
+_background_task_started = False
+
+
+@app.on_event("startup")
+async def _start_background_refresh():
+    global _background_task_started
+    if _background_task_started:
+        return
+    if not NEWS_BACKGROUND_REFRESH_ENABLED:
+        return
+    _background_task_started = True
+
+    import asyncio
+
+    async def _loop():
+        if not NEWS_BACKGROUND_REFRESH_ON_START:
+            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
+        while True:
+            try:
+                # Refresh all topics by passing topic=None
+                await refresh_clusters(topic=None, limit=200)
+            except Exception:
+                # Avoid crashing the server on network errors.
+                pass
+            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
+
+    asyncio.create_task(_loop())
+
+
 @app.get("/")
 def root():
     return {"status": "ok", "transport": "fastmcp+sse", "mount": "/mcp", "tools": ["get_latest_events"]}
@@ -54,4 +222,10 @@ def root():
 
 @app.get("/health")
 def health():
-    return {"status": "ok", "ttl_hours": CLUSTERS_TTL_HOURS, "db": str(DB_PATH)}
+    store = SQLiteClusterStore(DB_PATH)
+    return {
+        "status": "ok",
+        "ttl_hours": CLUSTERS_TTL_HOURS,
+        "db": str(DB_PATH),
+        "refresh": store.get_feed_state("breakingthenews"),
+    }
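
With the server running (`./run.sh`), `/health` now also reports the last feed refresh, which is a quick way to confirm the background loop is doing work. A small check using `httpx` (already in `requirements.txt`), assuming the default bind from the README:

```python
import httpx

resp = httpx.get("http://127.0.0.1:8506/health", timeout=5.0)
resp.raise_for_status()
data = resp.json()

print(data["status"], data["ttl_hours"])
# "refresh" is the feed_state row for "breakingthenews":
# {"last_hash": "...", "updated_at": "..."} or None before the first refresh.
print(data["refresh"])
```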

+ 109 - 1
news_mcp/storage/sqlite_store.py

@@ -33,14 +33,38 @@ class SQLiteClusterStore:
                   cluster_id TEXT PRIMARY KEY,
                   topic TEXT NOT NULL,
                   payload TEXT NOT NULL,
-                  updated_at TEXT NOT NULL
+                  updated_at TEXT NOT NULL,
+                  summary_payload TEXT,
+                  summary_updated_at TEXT
                 )
                 """
             )
+
+            # If the table already exists without the summary columns,
+            # add them (SQLite-friendly incremental migrations).
+            for col_def in [
+                "summary_payload TEXT",
+                "summary_updated_at TEXT",
+            ]:
+                col = col_def.split()[0]
+                try:
+                    conn.execute(f"ALTER TABLE clusters ADD COLUMN {col_def}")
+                except sqlite3.OperationalError:
+                    pass
             conn.execute(
                 "CREATE INDEX IF NOT EXISTS idx_clusters_topic ON clusters(topic)"
             )
 
+            conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS feed_state (
+                  feed_key TEXT PRIMARY KEY,
+                  last_hash TEXT NOT NULL,
+                  updated_at TEXT NOT NULL
+                )
+                """
+            )
+
     def upsert_clusters(self, clusters: list[dict], topic: str) -> None:
         now = datetime.now(timezone.utc)
         with self._conn() as conn:
@@ -53,6 +77,42 @@ class SQLiteClusterStore:
                     (cluster_id, topic, payload, now.isoformat()),
                 )
 
+    def upsert_cluster_summary(
+        self,
+        cluster_id: str,
+        summary_payload: dict,
+    ) -> None:
+        now = datetime.now(timezone.utc).isoformat()
+        with self._conn() as conn:
+            conn.execute(
+                "INSERT INTO clusters(cluster_id, topic, payload, updated_at, summary_payload, summary_updated_at) "
+                "VALUES(?,?,?,?,?,?) "
+                "ON CONFLICT(cluster_id) DO UPDATE SET "
+                "summary_payload=excluded.summary_payload, summary_updated_at=excluded.summary_updated_at",
+                (
+                    cluster_id,
+                    "",  # topic not used for update
+                    json.dumps({}, ensure_ascii=False),
+                    now,
+                    json.dumps(summary_payload, ensure_ascii=False),
+                    now,
+                ),
+            )
+
+    def get_cluster_summary(self, cluster_id: str, ttl_hours: float) -> dict | None:
+        cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
+        cutoff_iso = cutoff.isoformat()
+        with self._conn() as conn:
+            cur = conn.execute(
+                "SELECT summary_payload, summary_updated_at FROM clusters "
+                "WHERE cluster_id=? AND summary_updated_at >= ?",
+                (cluster_id, cutoff_iso),
+            )
+            row = cur.fetchone()
+            if not row or not row[0]:
+                return None
+            return json.loads(row[0])
+
     def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
         cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
         cutoff_iso = cutoff.isoformat()
@@ -63,3 +123,51 @@ class SQLiteClusterStore:
             )
             rows = [json.loads(r[0]) for r in cur.fetchall()]
         return rows
+
+    def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]:
+        cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
+        cutoff_iso = cutoff.isoformat()
+        with self._conn() as conn:
+            cur = conn.execute(
+                "SELECT payload FROM clusters WHERE updated_at >= ? ORDER BY updated_at DESC LIMIT ?",
+                (cutoff_iso, int(limit)),
+            )
+            return [json.loads(r[0]) for r in cur.fetchall()]
+
+    def get_cluster_by_id(self, cluster_id: str) -> dict | None:
+        with self._conn() as conn:
+            cur = conn.execute(
+                "SELECT payload FROM clusters WHERE cluster_id=?",
+                (cluster_id,),
+            )
+            row = cur.fetchone()
+            return json.loads(row[0]) if row else None
+
+    def get_feed_hash(self, feed_key: str) -> str | None:
+        with self._conn() as conn:
+            cur = conn.execute(
+                "SELECT last_hash FROM feed_state WHERE feed_key=?",
+                (feed_key,),
+            )
+            row = cur.fetchone()
+            return row[0] if row else None
+
+    def set_feed_hash(self, feed_key: str, last_hash: str) -> None:
+        now = datetime.now(timezone.utc).isoformat()
+        with self._conn() as conn:
+            conn.execute(
+                "INSERT INTO feed_state(feed_key, last_hash, updated_at) VALUES(?,?,?) "
+                "ON CONFLICT(feed_key) DO UPDATE SET last_hash=excluded.last_hash, updated_at=excluded.updated_at",
+                (feed_key, last_hash, now),
+            )
+
+    def get_feed_state(self, feed_key: str) -> dict | None:
+        with self._conn() as conn:
+            cur = conn.execute(
+                "SELECT last_hash, updated_at FROM feed_state WHERE feed_key=?",
+                (feed_key,),
+            )
+            row = cur.fetchone()
+            if not row:
+                return None
+            return {"last_hash": row[0], "updated_at": row[1]}
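
The migration approach relies on SQLite raising `OperationalError` ("duplicate column name") when a column already exists, so re-running the schema setup is harmless. The pattern in isolation:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE clusters (cluster_id TEXT PRIMARY KEY, payload TEXT)")

# Idempotent, SQLite-friendly "add column if missing": the second attempt
# fails with "duplicate column name" and is simply ignored.
for _ in range(2):
    try:
        conn.execute("ALTER TABLE clusters ADD COLUMN summary_payload TEXT")
    except sqlite3.OperationalError:
        pass

print([row[1] for row in conn.execute("PRAGMA table_info(clusters)")])
# -> ['cluster_id', 'payload', 'summary_payload']
```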

+ 3 - 1
requirements.txt

@@ -4,4 +4,6 @@ mcp>=1.0.0
 httpx>=0.27.0
 python-dateutil>=2.9.0.post0
 feedparser>=6.0.11
-pydantic>=2.7.0
+pydantic>=2.7.0
+python-dotenv>=1.0.1
+pytest>=8.4

+ 73 - 0
test_news_mcp.py

@@ -0,0 +1,73 @@
+from __future__ import annotations
+
+import tempfile
+from pathlib import Path
+
+from news_mcp.dedup.cluster import dedup_and_cluster_articles
+from news_mcp.storage.sqlite_store import SQLiteClusterStore
+
+
+def _article(title: str, url: str = "https://example.com/x", source: str = "Src", ts: str = "Mon, 30 Mar 2026 12:00:00 GMT"):
+    return {
+        "title": title,
+        "url": url,
+        "source": source,
+        "timestamp": ts,
+        "summary": "summary text",
+    }
+
+
+def test_dedup_merges_similar_titles():
+    articles = [
+        _article("Trump warns Iran war could spread"),
+        _article("Trump warns Iran conflict could spread"),
+        _article("Unrelated sports result"),
+    ]
+    clustered = dedup_and_cluster_articles(articles, similarity_threshold=0.75)
+    # We expect the Trump/Iran items to be merged into one cluster in the same topic bucket.
+    total_clusters = sum(len(v) for v in clustered.values())
+    assert total_clusters == 2
+
+
+def test_sqlite_feed_hash_roundtrip():
+    with tempfile.TemporaryDirectory() as td:
+        db = Path(td) / "news.sqlite"
+        store = SQLiteClusterStore(db)
+        assert store.get_feed_hash("breakingthenews") is None
+        store.set_feed_hash("breakingthenews", "abc123")
+        assert store.get_feed_hash("breakingthenews") == "abc123"
+
+
+def test_sqlite_summary_cache_roundtrip():
+    with tempfile.TemporaryDirectory() as td:
+        db = Path(td) / "news.sqlite"
+        store = SQLiteClusterStore(db)
+        # Upsert a base cluster first.
+        store.upsert_clusters([
+            {
+                "cluster_id": "cid1",
+                "headline": "Headline",
+                "summary": "Summary",
+                "entities": ["Iran"],
+                "sentiment": "negative",
+                "importance": 0.5,
+                "sources": ["BreakingTheNews"],
+                "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT",
+                "articles": [],
+                "first_seen": "Mon, 30 Mar 2026 12:00:00 GMT",
+                "last_updated": "Mon, 30 Mar 2026 12:00:00 GMT",
+            }
+        ], topic="other")
+        store.upsert_cluster_summary(
+            "cid1",
+            {
+                "headline": "Headline",
+                "mergedSummary": "Merged summary",
+                "keyFacts": ["Fact 1"],
+                "sources": ["BreakingTheNews"],
+            },
+        )
+        cached = store.get_cluster_summary("cid1", ttl_hours=24)
+        assert cached is not None
+        assert cached["mergedSummary"] == "Merged summary"
+        assert cached["keyFacts"] == ["Fact 1"]

+ 3 - 2
tests.sh

@@ -2,6 +2,7 @@
 set -euo pipefail
 cd "$(dirname "$0")"
 if [ -f ".venv/bin/activate" ]; then
-  .venv/bin/activate
+  # shellcheck disable=SC1091
+  source .venv/bin/activate
 fi
-python -m pytest -q || true
+python -m pytest -q