from __future__ import annotations

import asyncio
import logging
import re
from collections import Counter

from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings

from news_mcp.config import (
    CLUSTERS_TTL_HOURS,
    DB_PATH,
    DEFAULT_TOPICS,
    NEWS_BACKGROUND_REFRESH_ENABLED,
    NEWS_BACKGROUND_REFRESH_ON_START,
    NEWS_REFRESH_INTERVAL_SECONDS,
)
from news_mcp.entity_normalize import normalize_query
from news_mcp.enrichment.llm_enrich import summarize_cluster_groq
from news_mcp.jobs.poller import refresh_clusters
from news_mcp.llm import active_llm_config
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.trends_resolution import resolve_entity_via_trends

mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)


def _cluster_entity_haystack(cluster: dict) -> list[str]:
    """Collect the normalized entity clues attached to a cluster."""
    values: list[str] = []
    for ent in cluster.get("entities", []) or []:
        values.append(str(ent).strip().lower())
    for res in cluster.get("entityResolutions", []) or []:
        if not isinstance(res, dict):
            continue
        for key in ("normalized", "canonical_label", "mid"):
            val = res.get(key)
            if val:
                values.append(str(val).strip().lower())
    return [v for v in values if v]


@mcp.tool(description="What is happening right now? Return the latest deduplicated news clusters for a topic.")
async def get_latest_events(topic: str = "crypto", limit: int = 5, include_articles: bool = False):
    limit = max(1, min(int(limit), 20))

    # If the caller passes an entity-like value, resolve it and use the canonical
    # entity as the query lens. Otherwise keep the original topic path.
    topic_norm = normalize_query(topic).lower()
    resolved = resolve_entity_via_trends(topic_norm)
    allowed = {t.lower() for t in DEFAULT_TOPICS}
    is_topic = topic_norm in allowed
    query_terms = {
        topic_norm,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}

    store = SQLiteClusterStore(DB_PATH)
    if is_topic:
        # Cache-first: only refresh if we currently have no fresh clusters for this topic.
        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
        if not clusters:
            await refresh_clusters(topic=topic_norm, limit=200)
            clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
    else:
        # Entity-aware mode: search recent clusters across all topics and match by
        # raw entity, canonical label, or MID.
        clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 8)
        filtered = []
        for c in clusters:
            haystack = _cluster_entity_haystack(c)
            if any(any(term in item for item in haystack) for term in query_terms):
                filtered.append(c)
                if len(filtered) >= limit:
                    break
        clusters = filtered

    # Ensure the response is compact and agent-friendly.
    clusters_sorted = sorted(clusters, key=lambda x: float(x.get("importance", 0.0)), reverse=True)
    out = []
    for c in clusters_sorted:
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            # Return minimal article fields to keep responses compact.
            arts = c.get("articles", []) or []
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        out.append(item)
    return out
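# Illustrative call/response shape for get_latest_events (values here are
# hypothetical; the real payload depends on what the store returns):
#
#   await get_latest_events(topic="crypto", limit=2)
#   -> [
#        {"cluster_id": "abc123", "headline": "...", "summary": "...",
#         "entities": ["bitcoin"], "sentiment": "neutral",
#         "importance": 0.8, "sources": ["..."], "timestamp": "..."},
#        ...
#      ]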
@mcp.tool(description="What's happening with X? Filter latest clusters by extracted entity substring (case-insensitive).")
async def get_events_for_entity(entity: str, limit: int = 10, include_articles: bool = False):
    limit = max(1, min(int(limit), 30))
    query = normalize_query(entity).strip().lower()
    if not query:
        return []

    resolved = resolve_entity_via_trends(query)
    query_terms = {
        query,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}

    # Cache-first: search recent clusters across all topics.
    store = SQLiteClusterStore(DB_PATH)

    def _match_clusters(clusters: list[dict]) -> list[dict]:
        hits: list[dict] = []
        for c in clusters:
            haystack = _cluster_entity_haystack(c)
            if any(any(term in item for item in haystack) for term in query_terms):
                hits.append(c)
                if len(hits) >= limit:
                    break
        return hits

    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 5)
    hits = _match_clusters(clusters)
    # If the recent slice misses, broaden the search window before giving up.
    if not hits:
        clusters = store.get_latest_clusters_all_topics(ttl_hours=24 * 7, limit=500)
        hits = _match_clusters(clusters)

    # Compress to tool response shape.
    out = []
    for c in hits:
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            arts = c.get("articles", []) or []
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        out.append(item)
    return out
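# Matching is substring-based over the cluster's entity haystack. A minimal
# hypothetical example of the inner check shared by the lookup tools:
#
#   query_terms = {"tesla", "/m/0dr90d"}          # raw query plus resolved MID
#   haystack = ["tesla inc", "elon musk"]
#   any(any(term in item for item in haystack) for term in query_terms)
#   -> True   # "tesla" is a substring of "tesla inc"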
@mcp.tool(description="Explain an event clearly by cluster_id (Groq summary).")
async def get_event_summary(event_id: str):
    store = SQLiteClusterStore(DB_PATH)
    # Summary cache: reuse if present within TTL.
    cached_summary = store.get_cluster_summary(
        cluster_id=event_id,
        ttl_hours=CLUSTERS_TTL_HOURS,
    )
    if cached_summary:
        return {
            "event_id": event_id,
            "headline": cached_summary.get("headline"),
            "mergedSummary": cached_summary.get("mergedSummary"),
            "keyFacts": cached_summary.get("keyFacts", []),
            "sources": cached_summary.get("sources", []),
        }

    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        return {
            "event_id": event_id,
            "error": "NOT_FOUND",
        }

    summary = await summarize_cluster_groq(cluster)
    store.upsert_cluster_summary(event_id, summary)
    return {
        "event_id": event_id,
        "headline": summary.get("headline"),
        "mergedSummary": summary.get("mergedSummary"),
        "keyFacts": summary.get("keyFacts", []),
        "sources": summary.get("sources", []),
    }


@mcp.tool(description="Detect emerging topics/entities from recent cached news clusters.")
async def detect_emerging_topics(limit: int = 10):
    limit = max(1, min(int(limit), 20))
    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=200)

    entity_counts = Counter()
    phrase_counts = Counter()
    topic_counts = Counter()  # per-topic volume; counted but not used in scoring yet
    for c in clusters:
        topic_counts[c.get("topic", "other")] += 1
        for ent in c.get("entities", []) or []:
            key = str(ent).strip().lower()
            if key:
                entity_counts[key] += 1
        text = f"{c.get('headline', '')} {c.get('summary', '')}"
        words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
        for i in range(len(words) - 1):
            phrase = f"{words[i]} {words[i + 1]}"
            if len(phrase) > 6:
                phrase_counts[phrase] += 1

    emerging = []
    for ent, count in entity_counts.most_common(limit):
        emerging.append({
            "topic": ent,
            "trend_score": min(0.99, round(0.25 + 0.15 * count, 2)),
            "related_entities": [ent],
            "signal_type": "entity",
            "count": count,
        })
    for phrase, count in phrase_counts.most_common(limit * 2):
        if any(item["topic"] == phrase for item in emerging):
            continue
        emerging.append({
            "topic": phrase.title(),
            "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
            "related_entities": [],
            "signal_type": "phrase",
            "count": count,
        })
        if len(emerging) >= limit:
            break
    return emerging[:limit]
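# trend_score is a simple saturating function of the raw count. Worked example:
# an entity seen in 3 clusters scores min(0.99, round(0.25 + 0.15 * 3, 2)) == 0.70,
# and any entity seen in 5 or more clusters caps at 0.99.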
@mcp.tool(description="What's the overall sentiment around an entity within a timeframe?")
async def get_news_sentiment(entity: str, timeframe: str = "24h"):
    store = SQLiteClusterStore(DB_PATH)
    ent = normalize_query(entity).strip().lower()
    if not ent:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }

    resolved = resolve_entity_via_trends(ent)
    query_terms = {
        ent,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}

    # timeframe: accept '24h' or a bare hour count like '24'.
    tf = str(timeframe).strip().lower()
    try:
        hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
    except Exception:
        hours = 24
    hours = max(1, min(int(hours), 168))

    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
    matched = []
    for c in clusters:
        haystack = _cluster_entity_haystack(c)
        if any(any(term in item for item in haystack) for term in query_terms):
            matched.append(c)

    if not matched:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }

    scores = []
    for c in matched:
        s = c.get("sentimentScore")
        if s is not None:
            try:
                scores.append(float(s))
            except Exception:
                pass
    avg_score = sum(scores) / len(scores) if scores else 0.0

    # Keep the label aligned with the numeric score.
    # Small magnitudes are treated as neutral to avoid noisy label flips.
    if avg_score >= 0.15:
        sentiment = "positive"
    elif avg_score <= -0.15:
        sentiment = "negative"
    else:
        sentiment = "neutral"

    return {
        "entity": entity,
        "sentiment": sentiment,
        "score": round(avg_score, 3),
        "cluster_count": len(matched),
    }


app = FastAPI(title="News MCP Server")
logger = logging.getLogger("news_mcp.startup")

app.mount("/mcp", mcp.sse_app())

_background_task_started = False


@app.on_event("startup")
async def _start_background_refresh():
    global _background_task_started
    if _background_task_started:
        return
    if not NEWS_BACKGROUND_REFRESH_ENABLED:
        return
    _background_task_started = True
    logger.info("news-mcp llm config: %s", active_llm_config())

    async def _loop():
        if not NEWS_BACKGROUND_REFRESH_ON_START:
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
        while True:
            try:
                # Refresh all topics by passing topic=None.
                await refresh_clusters(topic=None, limit=200)
            except Exception:
                # Avoid crashing the server on transient network errors.
                pass
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))

    asyncio.create_task(_loop())


@app.get("/")
def root():
    return {
        "status": "ok",
        "transport": "fastmcp+sse",
        "mount": "/mcp",
        "tools": [
            "get_latest_events",
            "get_events_for_entity",
            "get_event_summary",
            "detect_emerging_topics",
            "get_news_sentiment",
        ],
        "refresh": {
            "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
            "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
        },
    }


@app.get("/health")
def health():
    store = SQLiteClusterStore(DB_PATH)
    return {
        "status": "ok",
        "ttl_hours": CLUSTERS_TTL_HOURS,
        "db": str(DB_PATH),
        "refresh": store.get_feed_state("breakingthenews"),
    }
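# A minimal sketch for running the server locally in development; assumes
# uvicorn is available (it commonly accompanies FastAPI installs, but that is
# an assumption about this project's dependencies, not something the module
# declares). Host and port are illustrative defaults.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)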