"""News MCP server: FastMCP tools over cached news clusters, mounted in FastAPI."""

from __future__ import annotations

import asyncio
import logging
import re
from collections import Counter

from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings

from news_mcp.config import CLUSTERS_TTL_HOURS, DEFAULT_TOPICS, DB_PATH
from news_mcp.config import (
    NEWS_REFRESH_INTERVAL_SECONDS,
    NEWS_BACKGROUND_REFRESH_ENABLED,
    NEWS_BACKGROUND_REFRESH_ON_START,
)
from news_mcp.jobs.poller import refresh_clusters
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.enrichment.groq_enrich import summarize_cluster_groq

logger = logging.getLogger(__name__)

# Words used for bigram ("phrase") detection: a letter followed by at least
# two more letters, digits or hyphens, i.e. every match is >= 3 characters.
_WORD_RE = re.compile(r"[A-Za-z][A-Za-z0-9\-]{2,}")

# Sentiment labels accepted verbatim from stored clusters.
_KNOWN_SENTIMENTS = frozenset({"positive", "negative", "neutral"})

# NOTE(review): DNS rebinding protection is explicitly disabled here —
# confirm this is intended for every deployment of this server.
mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)


def _compact_cluster(cluster: dict) -> dict:
    """Reduce a stored cluster to the compact, agent-friendly response shape."""
    return {
        "cluster_id": cluster.get("cluster_id"),
        "headline": cluster.get("headline"),
        "summary": cluster.get("summary"),
        "entities": cluster.get("entities", []),
        "sentiment": cluster.get("sentiment", "neutral"),
        "importance": cluster.get("importance", 0.0),
        "sources": cluster.get("sources", []),
        "timestamp": cluster.get("timestamp"),
    }


def _mentions_entity(cluster: dict, needle: str) -> bool:
    """Return True if *needle* (already lower-cased) is a substring of any entity."""
    return any(needle in str(e).lower() for e in cluster.get("entities") or [])


def _summary_response(event_id: str, summary: dict) -> dict:
    """Build the `get_event_summary` response from a summary mapping."""
    return {
        "event_id": event_id,
        "headline": summary.get("headline"),
        "mergedSummary": summary.get("mergedSummary"),
        "keyFacts": summary.get("keyFacts", []),
        "sources": summary.get("sources", []),
    }


def _sentiment_from_score(score: float) -> str:
    """Map a numeric score to a label by its sign (zero is neutral)."""
    if score > 0:
        return "positive"
    if score < 0:
        return "negative"
    return "neutral"


def _neutral_sentiment(entity: str) -> dict:
    """Response used when there is nothing to aggregate for *entity*."""
    return {
        "entity": entity,
        "sentiment": "neutral",
        "score": 0.0,
        "cluster_count": 0,
    }


@mcp.tool(description="What is happening right now? Return the latest deduplicated news clusters for a topic.")
async def get_latest_events(topic: str = "crypto", limit: int = 5):
    """Return up to *limit* (clamped to 1..20) cached clusters for *topic*.

    Results are ordered by descending ``importance``. If the store has no
    clusters for the topic within the TTL, one refresh is awaited and the
    store is queried again.
    """
    limit = max(1, min(int(limit), 20))

    # In v1, `topic` is a coarse category. If the caller passes an entity name
    # (e.g. "trump"/"iran"), gracefully fall back to `other`.
    topic_norm = str(topic).strip().lower()
    allowed = {t.lower() for t in DEFAULT_TOPICS}
    if topic_norm not in allowed:
        topic_norm = "other"

    store = SQLiteClusterStore(DB_PATH)

    # Cache-first: only refresh if we currently have no fresh clusters for this topic.
    clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
    if not clusters:
        await refresh_clusters(topic=topic_norm, limit=200)
        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)

    # Ensure the response is compact and agent-friendly. A missing or null
    # importance sorts as 0.0 instead of raising in float().
    ranked = sorted(
        clusters,
        key=lambda c: float(c.get("importance") or 0.0),
        reverse=True,
    )
    return [_compact_cluster(c) for c in ranked]


@mcp.tool(description="What's happening with X? Filter latest clusters by extracted entity substring (case-insensitive).")
async def get_events_for_entity(entity: str, limit: int = 10):
    """Return up to *limit* (clamped to 1..30) cached clusters mentioning *entity*.

    Matching is a case-insensitive substring test against each cluster's
    ``entities``. Only the cache is consulted; an empty query returns ``[]``.
    """
    limit = max(1, min(int(limit), 30))
    query = str(entity).strip().lower()
    if not query:
        return []

    # Cache-first: search recent clusters across all topics. Over-fetch
    # (limit * 5) because only a fraction of clusters will match.
    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 5)

    hits = []
    for cluster in clusters:
        if _mentions_entity(cluster, query):
            hits.append(cluster)
            if len(hits) >= limit:
                break

    # Compress to tool response shape.
    return [_compact_cluster(c) for c in hits]


@mcp.tool(description="Explain an event clearly by cluster_id (Groq summary).")
async def get_event_summary(event_id: str):
    """Return a merged summary for the cluster identified by *event_id*.

    A summary already stored within the TTL is returned as-is. Otherwise the
    cluster is loaded, summarised via ``summarize_cluster_groq``, the result is
    stored, and then returned. Unknown ids yield ``{"error": "NOT_FOUND"}``.
    """
    store = SQLiteClusterStore(DB_PATH)

    # Summary cache: reuse if present within TTL.
    cached_summary = store.get_cluster_summary(
        cluster_id=event_id,
        ttl_hours=CLUSTERS_TTL_HOURS,
    )
    if cached_summary:
        return _summary_response(event_id, cached_summary)

    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        return {
            "event_id": event_id,
            "error": "NOT_FOUND",
        }

    summary = await summarize_cluster_groq(cluster)
    store.upsert_cluster_summary(event_id, summary)
    return _summary_response(event_id, summary)


@mcp.tool(description="Detect emerging topics/entities from recent cached news clusters.")
async def detect_emerging_topics(limit: int = 10):
    """Return up to *limit* (clamped to 1..20) frequently occurring entities/phrases.

    Entities are counted across up to 200 cached clusters and listed first,
    most frequent first. Remaining slots are filled with the most common
    two-word phrases taken from headlines and summaries.
    """
    limit = max(1, min(int(limit), 20))
    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=200)

    entity_counts = Counter()
    phrase_counts = Counter()

    for cluster in clusters:
        for ent in cluster.get("entities", []) or []:
            key = str(ent).strip().lower()
            if key:
                entity_counts[key] += 1

        # `or ""` keeps a null headline/summary from contributing the word "none".
        text = f"{cluster.get('headline') or ''} {cluster.get('summary') or ''}"
        words = _WORD_RE.findall(text.lower())
        # Adjacent word pairs; each word is >= 3 chars so every phrase is >= 7.
        for first, second in zip(words, words[1:]):
            phrase_counts[f"{first} {second}"] += 1

    emerging = [
        {
            "topic": ent,
            "trend_score": min(0.99, round(0.25 + 0.15 * count, 2)),
            "related_entities": [ent],
            "signal_type": "entity",
            "count": count,
        }
        for ent, count in entity_counts.most_common(limit)
    ]

    # Skip phrases that merely repeat an entity already reported.
    entity_names = {item["topic"] for item in emerging}
    for phrase, count in phrase_counts.most_common(limit * 2):
        if len(emerging) >= limit:
            break
        if phrase in entity_names:
            continue
        emerging.append(
            {
                "topic": phrase.title(),
                "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
                "related_entities": [],
                "signal_type": "phrase",
                "count": count,
            }
        )

    return emerging[:limit]


@mcp.tool(description="What's the overall sentiment around an entity within a timeframe?")
async def get_news_sentiment(entity: str, timeframe: str = "24h"):
    """Aggregate sentiment over cached clusters that mention *entity*.

    *timeframe* is a number of hours, optionally suffixed with ``h`` (``"24h"``
    or ``"24"``); unparsable values fall back to 24 and the result is clamped
    to 1..168. The label is the majority ``sentiment`` of matching clusters,
    falling back to the sign of the mean ``sentimentScore``.
    """
    store = SQLiteClusterStore(DB_PATH)
    ent = str(entity).strip().lower()
    if not ent:
        return _neutral_sentiment(entity)

    # timeframe: accept '24h' or '24'
    tf = str(timeframe).strip().lower()
    try:
        hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
    except ValueError:
        hours = 24
    hours = max(1, min(hours, 168))

    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
    matched = [c for c in clusters if _mentions_entity(c, ent)]
    if not matched:
        return _neutral_sentiment(entity)

    scores = []
    labels = []
    for cluster in matched:
        raw_score = cluster.get("sentimentScore")
        if raw_score is not None:
            try:
                scores.append(float(raw_score))
            except (TypeError, ValueError):
                # Ignore scores that are not numeric; labels may still count.
                pass
        label = cluster.get("sentiment")
        if label:
            labels.append(str(label).lower())

    avg_score = sum(scores) / len(scores) if scores else 0.0

    # Majority vote on sentiment label, fall back to sign of avg score.
    sentiment = _sentiment_from_score(avg_score)
    if labels:
        majority = Counter(labels).most_common(1)[0][0]
        if majority in _KNOWN_SENTIMENTS:
            sentiment = majority

    return {
        "entity": entity,
        "sentiment": sentiment,
        "score": round(avg_score, 3),
        "cluster_count": len(matched),
    }


app = FastAPI(title="News MCP Server")
app.mount("/mcp", mcp.sse_app())

_background_task_started = False
# Strong reference to the refresh task: the event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected.
_background_task: asyncio.Task | None = None


async def _refresh_loop():
    """Refresh all topics forever, sleeping the configured interval between runs."""
    if not NEWS_BACKGROUND_REFRESH_ON_START:
        await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
    while True:
        try:
            # Refresh all topics by passing topic=None
            await refresh_clusters(topic=None, limit=200)
        except Exception:
            # Avoid crashing the server on network errors, but leave a trace.
            logger.exception("Background news refresh failed")
        await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))


@app.on_event("startup")
async def _start_background_refresh():
    """Start the background refresh task once, if enabled by configuration."""
    global _background_task_started, _background_task
    if _background_task_started:
        return
    if not NEWS_BACKGROUND_REFRESH_ENABLED:
        return
    _background_task_started = True
    _background_task = asyncio.create_task(_refresh_loop())


@app.get("/")
def root():
    """Describe the server: transport, mount point, tool names and refresh settings."""
    return {
        "status": "ok",
        "transport": "fastmcp+sse",
        "mount": "/mcp",
        "tools": [
            "get_latest_events",
            "get_events_for_entity",
            "get_event_summary",
            "detect_emerging_topics",
            "get_news_sentiment",
        ],
        "refresh": {
            "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
            "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
        },
    }


@app.get("/health")
def health():
    """Report status, cluster TTL, DB path and the stored "breakingthenews" feed state."""
    store = SQLiteClusterStore(DB_PATH)
    return {
        "status": "ok",
        "ttl_hours": CLUSTERS_TTL_HOURS,
        "db": str(DB_PATH),
        "refresh": store.get_feed_state("breakingthenews"),
    }