from __future__ import annotations

import asyncio
import logging
import re
from collections import Counter
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings

from news_mcp.config import (
    DB_PATH,
    DEFAULT_LOOKBACK_HOURS,
    DEFAULT_TOPICS,
    NEWS_BACKGROUND_REFRESH_ENABLED,
    NEWS_BACKGROUND_REFRESH_ON_START,
    NEWS_PRUNE_INTERVAL_HOURS,
    NEWS_PRUNING_ENABLED,
    NEWS_REFRESH_INTERVAL_SECONDS,
    NEWS_RETENTION_DAYS,
)
from news_mcp.entity_normalize import normalize_query
from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
from news_mcp.jobs.poller import refresh_clusters
from news_mcp.llm import active_llm_config
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.trends_resolution import resolve_entity_via_trends

mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)


def _cluster_entity_haystack(cluster: dict) -> list[str]:
    """Collect the normalized entity clues attached to a cluster."""
    values: list[str] = []
    for ent in cluster.get("entities", []) or []:
        values.append(str(ent).strip().lower())
    for res in cluster.get("entityResolutions", []) or []:
        if not isinstance(res, dict):
            continue
        for key in ("normalized", "canonical_label", "mid"):
            val = res.get(key)
            if val:
                values.append(str(val).strip().lower())
    return [v for v in values if v]


def _parse_cluster_timestamp(value) -> datetime:
    if not value:
        return datetime.min.replace(tzinfo=timezone.utc)
    text = str(value).strip()
    if not text:
        return datetime.min.replace(tzinfo=timezone.utc)
    try:
        dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except Exception:
        pass
    try:
        dt = parsedate_to_datetime(text)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except Exception:
        return datetime.min.replace(tzinfo=timezone.utc)


def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
    return sorted(
        clusters,
        key=lambda c: (
            _parse_cluster_timestamp(c.get("timestamp")),
            float(c.get("importance", 0.0) or 0.0),
        ),
        reverse=True,
    )


@mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.")
async def get_latest_events(topic: str = "crypto", limit: int = 5, include_articles: bool = False):
    limit = max(1, min(int(limit), 20))

    # If the caller passes an entity-like value, resolve it and use the canonical
    # entity as the query lens. Otherwise keep the original topic path.
    topic_norm = normalize_query(topic).lower()
    resolved = resolve_entity_via_trends(topic_norm)
    allowed = {t.lower() for t in DEFAULT_TOPICS}
    is_topic = topic_norm in allowed
    query_terms = {
        topic_norm,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}

    store = SQLiteClusterStore(DB_PATH)
    if is_topic:
        # Cache-first: only refresh if we currently have no fresh clusters for this topic.
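        # A non-empty read below counts as a cache hit (clusters newer than
        # DEFAULT_LOOKBACK_HOURS exist); only an empty result pays for a
        # network refresh, after which the cache is read again.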
        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
        if not clusters:
            await refresh_clusters(topic=topic_norm, limit=200)
            clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
    else:
        # Entity-aware mode: search recent clusters across all topics and match by
        # raw entity, canonical label, or MID. Sort by recency first so the first
        # `limit` matches are also the newest ones.
        clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
        filtered = []
        for c in _sort_clusters_by_recency(clusters):
            haystack = _cluster_entity_haystack(c)
            if any(any(term in item for item in haystack) for term in query_terms):
                filtered.append(c)
            if len(filtered) >= limit:
                break
        clusters = filtered

    out = []
    for c in _sort_clusters_by_recency(clusters):
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            # Return minimal article fields to keep responses compact.
            arts = c.get("articles", []) or []
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        out.append(item)
    return out


@mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.")
async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
    limit = max(1, min(int(limit), 30))
    query = normalize_query(entity).strip().lower()
    if not query:
        return []

    resolved = resolve_entity_via_trends(query)
    query_terms = {
        query,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}

    store = SQLiteClusterStore(DB_PATH)

    def _match_clusters(clusters: list[dict]) -> list[dict]:
        hits: list[dict] = []
        for c in _sort_clusters_by_recency(clusters):
            haystack = _cluster_entity_haystack(c)
            if any(any(term in item for item in haystack) for term in query_terms):
                hits.append(c)
            if len(hits) >= limit:
                break
        return hits

    hours = _parse_timeframe_to_hours(timeframe)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))
    hits = _match_clusters(clusters)

    out = []
    for c in hits:
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            arts = c.get("articles", []) or []
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        out.append(item)
    return out


@mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
async def get_event_summary(event_id: str, include_articles: bool = False):
    store = SQLiteClusterStore(DB_PATH)

    # Summary cache: reuse if present within TTL.
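    # A hit skips the summarize_cluster_llm call entirely; a miss falls through
    # to the LLM path below, and the result is upserted for subsequent callers.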
    cached_summary = store.get_cluster_summary(
        cluster_id=event_id,
        ttl_hours=DEFAULT_LOOKBACK_HOURS,
    )
    if cached_summary:
        out = {
            "event_id": event_id,
            "headline": cached_summary.get("headline"),
            "mergedSummary": cached_summary.get("mergedSummary"),
            "keyFacts": cached_summary.get("keyFacts", []),
            "sources": cached_summary.get("sources", []),
        }
        if include_articles:
            cluster = store.get_cluster_by_id(event_id)
            arts = (cluster or {}).get("articles", []) or []
            out["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        return out

    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        return {
            "event_id": event_id,
            "error": "NOT_FOUND",
        }

    articles_out = None
    if include_articles:
        arts = cluster.get("articles", []) or []
        articles_out = [
            {
                "title": a.get("title"),
                "url": a.get("url"),
                "source": a.get("source"),
                "timestamp": a.get("timestamp"),
            }
            for a in arts
            if isinstance(a, dict)
        ]

    summary = await summarize_cluster_llm(cluster)
    store.upsert_cluster_summary(event_id, summary)

    out = {
        "event_id": event_id,
        "headline": summary.get("headline"),
        "mergedSummary": summary.get("mergedSummary"),
        "keyFacts": summary.get("keyFacts", []),
        "sources": summary.get("sources", []),
    }
    if include_articles:
        out["articles"] = articles_out or []
    return out


@mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters.")
async def detect_emerging_topics(limit: int = 10):
    limit = max(1, min(int(limit), 20))
    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=200)

    entity_counts = Counter()
    entity_importance_sum = Counter()
    # co-occurrence: ent -> other_ent -> count
    entity_cooccur: dict[str, Counter] = {}
    phrase_counts = Counter()
    topic_counts = Counter()

    # Very light heuristics to reduce “meta entities” dominating emerging topics.
    # Keep it conservative: only skip obvious boilerplate.
    def _is_generic_entity(ent: str) -> bool:
        e = str(ent).strip().lower()
        if not e:
            return True
        if len(e) < 4:
            return True
        # common outlet-ish / meta-ish tokens
        if e in {"news", "latest", "breaking"}:
            return True
        return False

    for c in clusters:
        topic_counts[c.get("topic", "other")] += 1
        # Normalize once; _is_generic_entity already drops empty and short tokens.
        ents_in_cluster_norm = [
            str(e).strip().lower()
            for e in (c.get("entities", []) or [])
            if not _is_generic_entity(e)
        ]
        for ent in ents_in_cluster_norm:
            entity_counts[ent] += 1
            try:
                entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
            except Exception:
                pass

        # update co-occurrence counts
        for i, a in enumerate(ents_in_cluster_norm):
            entity_cooccur.setdefault(a, Counter())
            for j, b in enumerate(ents_in_cluster_norm):
                if i != j:
                    entity_cooccur[a][b] += 1

        text = f"{c.get('headline', '')} {c.get('summary', '')}"
        words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
        for i in range(len(words) - 1):
            phrase = f"{words[i]} {words[i + 1]}"
            if len(phrase) > 6:
                phrase_counts[phrase] += 1

    emerging = []
    # Combine frequency with average importance so “big signal” rises over pure repetition.
    for ent, count in entity_counts.most_common(limit):
        avg_imp = entity_importance_sum[ent] / max(1, count)
        # avg_imp is typically 0..~1; keep score bounded.
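        # Worked example: count=4, avg_imp=0.5 gives
        # 0.25 + 0.40*0.5 + 0.08*4 = 0.77 (later capped at 0.99).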
        trend_score = 0.25 + 0.40 * min(1.0, avg_imp) + 0.08 * min(6.0, float(count))
        related = []
        for other, _cnt in (entity_cooccur.get(ent) or Counter()).most_common(3):
            # avoid returning the entity itself (shouldn't happen, but be safe)
            if other != ent:
                related.append(other)
        emerging.append({
            "topic": ent,
            "trend_score": min(0.99, round(trend_score, 2)),
            "related_entities": related if related else [ent],
            "signal_type": "entity",
            "count": count,
            "avg_importance": round(avg_imp, 3),
        })

    for phrase, count in phrase_counts.most_common(limit * 2):
        if any(item["topic"] == phrase for item in emerging):
            continue
        emerging.append({
            "topic": phrase.title(),
            "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
            "related_entities": [],
            "signal_type": "phrase",
            "count": count,
        })
        if len(emerging) >= limit:
            break

    return emerging[:limit]


@mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.")
async def get_news_sentiment(entity: str, timeframe: str = "24h"):
    store = SQLiteClusterStore(DB_PATH)
    ent = normalize_query(entity).strip().lower()
    if not ent:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }

    resolved = resolve_entity_via_trends(ent)
    query_terms = {
        ent,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}

    # timeframe: accept '24h', '7d', or a bare number of hours; clamp to one week.
    hours = max(1, min(_parse_timeframe_to_hours(timeframe), 168))

    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
    matched = []
    for c in clusters:
        haystack = _cluster_entity_haystack(c)
        if any(any(term in item for item in haystack) for term in query_terms):
            matched.append(c)

    if not matched:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }

    scores = []
    for c in matched:
        s = c.get("sentimentScore")
        if s is not None:
            try:
                scores.append(float(s))
            except Exception:
                pass
    avg_score = sum(scores) / len(scores) if scores else 0.0

    # Keep the label aligned with the numeric score.
    # Small magnitudes are treated as neutral to avoid noisy label flips.
    if avg_score >= 0.15:
        sentiment = "positive"
    elif avg_score <= -0.15:
        sentiment = "negative"
    else:
        sentiment = "neutral"

    return {
        "entity": entity,
        "sentiment": sentiment,
        "score": round(avg_score, 3),
        "cluster_count": len(matched),
    }


def _parse_timeframe_to_hours(timeframe: str) -> int:
    tf = str(timeframe).strip().lower()
    try:
        if tf.endswith("d"):
            days = int(tf[:-1])
            return max(1, days * 24)
        if tf.endswith("h"):
            return max(1, int(tf[:-1]))
        return max(1, int(tf))
    except Exception:
        return 24
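

# Quick reference for _parse_timeframe_to_hours (values derived from the code
# above; an illustrative sketch, not a spec):
#   "36h"  -> 36
#   "2d"   -> 48
#   "12"   -> 12   (a bare number is read as hours)
#   "oops" -> 24   (any unparsable input falls back to 24)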


@mcp.tool(
    description="Investigate which entities tend to appear alongside a subject entity in recent clusters, based on co-occurrence."
)
async def get_related_entities(subject: str, timeframe: str = "24h", limit: int = 10):
    store = SQLiteClusterStore(DB_PATH)
    limit = max(1, min(int(limit), 30))
    subj = normalize_query(subject).strip().lower()
    if not subj:
        return []

    resolved = resolve_entity_via_trends(subj)
    query_terms = {
        subj,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}

    hours = _parse_timeframe_to_hours(timeframe)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)

    # Aggregate related metrics per entity.
    rel_count = Counter()
    rel_imp_sum = Counter()
    rel_sent_sum = Counter()
    rel_sent_n = Counter()

    for c in clusters:
        haystack = _cluster_entity_haystack(c)
        if not any(any(term in item for item in haystack) for term in query_terms):
            continue
        ents = [str(e).strip().lower() for e in (c.get("entities", []) or []) if str(e).strip()]
        # remove generic/meta-ish short tokens conservatively
        ents = [e for e in ents if len(e) >= 4]
        for e in ents:
            if e in query_terms:
                continue
            rel_count[e] += 1
            try:
                rel_imp_sum[e] += float(c.get("importance", 0.0) or 0.0)
            except Exception:
                pass
            # sentiment aggregation based on sentimentScore if available.
            s = c.get("sentimentScore")
            if s is not None:
                try:
                    rel_sent_sum[e] += float(s)
                    rel_sent_n[e] += 1
                except Exception:
                    pass

    # Sort by count, then avg importance.
    items = []
    for ent, cnt in rel_count.most_common():
        avg_imp = rel_imp_sum[ent] / max(1, cnt)
        avg_score = rel_sent_sum[ent] / rel_sent_n[ent] if rel_sent_n[ent] else 0.0
        if avg_score >= 0.15:
            sentiment = "positive"
        elif avg_score <= -0.15:
            sentiment = "negative"
        else:
            sentiment = "neutral"
        items.append(
            {
                "entity": ent,
                "count": cnt,
                "avg_importance": round(avg_imp, 3),
                "sentiment": sentiment,
                "score": round(avg_score, 3),
            }
        )
        if len(items) >= limit:
            break
    return items


app = FastAPI(title="News MCP Server")
logger = logging.getLogger("news_mcp.startup")
app.mount("/mcp", mcp.sse_app())

_background_task_started = False


@app.on_event("startup")
async def _start_background_refresh():
    global _background_task_started
    if _background_task_started:
        return
    if not NEWS_BACKGROUND_REFRESH_ENABLED:
        return
    _background_task_started = True
    logger.info("news-mcp llm config: %s", active_llm_config())
    store = SQLiteClusterStore(DB_PATH)
    prune_result = store.prune_if_due(
        pruning_enabled=NEWS_PRUNING_ENABLED,
        retention_days=NEWS_RETENTION_DAYS,
        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
    )
    logger.info("startup prune_result=%s", prune_result)

    async def _loop():
        if not NEWS_BACKGROUND_REFRESH_ON_START:
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
        while True:
            try:
                # Refresh all topics by passing topic=None
                await refresh_clusters(topic=None, limit=200)
            except Exception:
                # Avoid crashing the server on network errors; log and keep polling.
                logger.exception("background refresh failed; retrying next interval")
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))

    asyncio.create_task(_loop())


@app.get("/")
def root():
    return {
        "status": "ok",
        "transport": "fastmcp+sse",
        "mount": "/mcp",
        "tools": [
            "get_latest_events",
            "get_events_for_entity",
            "get_event_summary",
            "detect_emerging_topics",
            "get_news_sentiment",
            "get_related_entities",
        ],
        "refresh": {
            "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
            "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
        },
        "retention": {
            "lookback_hours": DEFAULT_LOOKBACK_HOURS,
            "retention_days": NEWS_RETENTION_DAYS,
        },
        "pruning": {
            "enabled": NEWS_PRUNING_ENABLED,
            "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
        },
    }


@app.get("/health")
def health():
    store = SQLiteClusterStore(DB_PATH)
    return {
        "status": "ok",
        "lookback_hours": DEFAULT_LOOKBACK_HOURS,
        "db": str(DB_PATH),
        "refresh": store.get_feed_state("breakingthenews"),
        "pruning": store.get_prune_state(
            pruning_enabled=NEWS_PRUNING_ENABLED,
            retention_days=NEWS_RETENTION_DAYS,
            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
        ),
    }
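

# Local dev entry point: a minimal sketch, assuming uvicorn is installed; the
# host and port below are illustrative defaults, not project configuration.
if __name__ == "__main__":
    import uvicorn

    # Serves the FastAPI app, with the MCP SSE transport mounted at /mcp.
    uvicorn.run(app, host="127.0.0.1", port=8000)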