from __future__ import annotations import asyncio import logging import subprocess import math import re import time from collections import Counter from datetime import datetime, timezone from email.utils import parsedate_to_datetime from pathlib import Path from fastapi import FastAPI, Form from mcp.server.fastmcp import FastMCP from mcp.server.transport_security import TransportSecuritySettings from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH from news_mcp.config import ( NEWS_FEED_URL, NEWS_FEED_URLS, NEWS_PRUNE_INTERVAL_HOURS, NEWS_PRUNING_ENABLED, NEWS_REFRESH_INTERVAL_SECONDS, NEWS_BACKGROUND_REFRESH_ENABLED, NEWS_BACKGROUND_REFRESH_ON_START, NEWS_RETENTION_DAYS, ) from news_mcp.jobs.poller import refresh_clusters from news_mcp.storage.sqlite_store import SQLiteClusterStore from news_mcp.dashboard.dashboard_store import DashboardStore from news_mcp.enrichment.llm_enrich import summarize_cluster_llm from news_mcp.trends_resolution import resolve_entity_via_trends from news_mcp.llm import active_llm_config from news_mcp.entity_normalize import normalize_query from news_mcp.related_entities import related_recent_entities logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) _PROCESS_STARTED_AT = time.monotonic() _REPO_ROOT = Path(__file__).resolve().parent try: _VERSION_HASH = ( subprocess.check_output( ["git", "rev-parse", "--short=9", "HEAD"], cwd=str(_REPO_ROOT), stderr=subprocess.DEVNULL, ) .decode() .strip() ) except Exception: _VERSION_HASH = "unknown" mcp = FastMCP( "news-mcp", transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False), ) def _cluster_entity_haystack(cluster: dict) -> list[str]: """Collect the normalized entity clues attached to a cluster.""" values: list[str] = [] for ent in cluster.get("entities", []) or []: values.append(str(ent).strip().lower()) for res in cluster.get("entityResolutions", []) or []: if not isinstance(res, dict): continue for key in ("normalized", "canonical_label", "mid"): val = res.get(key) if val: values.append(str(val).strip().lower()) return [v for v in values if v] def _parse_cluster_timestamp(value) -> datetime: if not value: return datetime.min.replace(tzinfo=timezone.utc) text = str(value).strip() if not text: return datetime.min.replace(tzinfo=timezone.utc) try: dt = datetime.fromisoformat(text.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) except Exception: pass try: dt = parsedate_to_datetime(text) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) except Exception: return datetime.min.replace(tzinfo=timezone.utc) def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]: return sorted( clusters, key=lambda c: ( _parse_cluster_timestamp(c.get("timestamp")), float(c.get("importance", 0.0) or 0.0), ), reverse=True, ) def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict: return { "name": name, "description": description, "inputs": inputs, "outputs": outputs, "notes": notes or [], } NEWS_TOOL_CARDS = [ _tool_card( "get_feeds", "List all configured RSS feeds with their enabled/disabled status.", [], ["feeds[]: {feed_key, enabled, last_hash, last_item_count, updated_at}"], ["Use this to see which feeds are currently active or disabled."], ), _tool_card( "toggle_feed", "Enable or disable a specific RSS feed by URL.", [ {"name": "feed_url", "type": "string", "meaning": "the feed URL to toggle"}, {"name": "enabled", "type": "boolean", "meaning": "true to enable, false to disable"}, ], ["ok", "feed_key", "enabled"], ["Changes take effect on the next refresh cycle."], ), _tool_card( "get_latest_events", "Get the newest deduplicated clusters for a topic or resolved entity-like query.", [ {"name": "topic", "type": "string", "default": "all topics", "meaning": "coarse category (crypto, macro, regulation, ai, other), entity-like topic, or omit for all topics"}, {"name": "limit", "type": "integer", "default": 5, "range": "1-20"}, {"name": "include_articles", "type": "boolean", "default": False}, ], ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"], ["Use when you want the freshest clusters and are willing to let the server decide topic vs entity mode."], ), _tool_card( "get_events_for_entity", "Search recent clusters for a person, place, company, or theme by entity matching.", [ {"name": "entity", "type": "string", "meaning": "entity label or phrase"}, {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]}, {"name": "limit", "type": "integer", "default": 10, "range": "1-30"}, {"name": "include_articles", "type": "boolean", "default": False}, ], ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"], ["Normalization is automatic; use this for an entity-centered deep dive."], ), _tool_card( "get_event_summary", "Produce a concise LLM-written explanation for one cluster and key facts.", [ {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"}, {"name": "include_articles", "type": "boolean", "default": False}, ], ["headline", "mergedSummary", "keyFacts", "sources", "articles?"], ["Prefer this after you have already chosen a specific cluster to explain."], ), _tool_card( "detect_emerging_topics", "Surface entities and phrases starting to matter in the recent window.", [ {"name": "limit", "type": "integer", "default": 10, "range": "1-20"}, {"name": "timeframe", "type": "string", "default": "24h", "examples": ["4h", "24h", "3d"]}, {"name": "topic", "type": "string", "default": "all topics", "examples": ["crypto", "macro", "regulation", "ai", "other"]}, {"name": "around", "type": "string", "default": "none", "meaning": "entity to scope results to its neighborhood (e.g. \"Bitcoin\")"}, ], ["topic", "trend_score", "velocity", "recent_count", "prior_count", "source_count", "related_entities", "signal_type"], ["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity."], ), _tool_card( "get_news_sentiment", "Estimate sentiment around an entity over a lookback window.", [ {"name": "entity", "type": "string"}, {"name": "timeframe", "type": "string", "default": "24h"}, ], ["entity", "sentiment", "score", "cluster_count"], ["Use after locating a cluster set or entity neighborhood."], ), _tool_card( "get_related_recent_entities", "Blend local co-occurrence with Google Trends related topics, while preserving mids where available.", [ {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"}, {"name": "timeframe", "type": "string", "default": "72h"}, {"name": "limit", "type": "integer", "default": 10, "range": "1-25"}, {"name": "include_trends", "type": "boolean", "default": True}, ], ["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"], ["Use this to drill from a subject into related entities, then feed those into get_events_for_entity."], ), ] NEWS_COMPOSITION_RECIPES = [ { "name": "fresh-news-tail", "steps": [ "get_latest_events(topic=...)", "optionally get_event_summary(event_id=...) for the strongest cluster", ], "notes": ["Best for a quick tail of what is happening now. Omit topic for all topics, or pass crypto/macro/regulation/ai/other to filter."] }, { "name": "entity-deep-dive", "steps": [ "get_events_for_entity(entity=...)", "get_event_summary(event_id=...)", "get_news_sentiment(entity=..., timeframe=...)", ], "notes": ["Prefer canonical entity labels when you have them; the server normalizes for you."], }, { "name": "subject-neighborhood", "steps": [ "get_related_recent_entities(subject=...)", "for each strong related entity, call get_events_for_entity(entity=...)", ], "notes": ["Use this when you want a graph-like expansion around a subject."] }, { "name": "emerging-signal", "steps": [ "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...)", "choose a topic/entity from the results", "get_events_for_entity(entity=...)", "get_news_sentiment(entity=...)", ], "notes": ["Use timeframe to control lookback (e.g. \"4h\" for what's hot right now, \"3d\" for weekly trends), topic to scope to a category, around to find what's emerging near a specific entity. Check velocity and source_count to distinguish real spikes from noise."], }, ] NEWS_AGENT_TIPS = [ "If you need a fast answer, start with get_latest_events, then summarize the strongest cluster with get_event_summary.", "If a user asks about a person/place/company/theme, use get_events_for_entity before broadening to get_related_recent_entities.", "Treat cluster_id as an internal cursor, not user-facing output; use it only for follow-up tool calls.", "When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.", "Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.", "For tricky names, rely on the server's resolver instead of inventing alias rules in the client.", "Use detect_emerging_topics with timeframe=\"4h\" for what's hot right now, timeframe=\"3d\" for weekly trends. Use topic= to scope to a category, around= to find what's emerging near a specific entity. Check velocity to distinguish accelerating signals from steady-state ones.", ] NEWS_EXAMPLE_CHAINS = [ { "task": "What is happening now?", "chain": [ "get_latest_events(topic=...)", "get_event_summary(event_id=...) if one cluster looks important", ], }, { "task": "Deep dive on an entity", "chain": [ "get_events_for_entity(entity=..., timeframe=...)", "get_news_sentiment(entity=..., timeframe=...)", "get_event_summary(event_id=...) for the strongest cluster", ], }, { "task": "Broaden from a subject", "chain": [ "get_related_recent_entities(subject=..., include_trends=true)", "get_events_for_entity(entity=...) for the strongest related entities", ], }, { "task": "Find what is emerging", "chain": [ "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...) with optional scoping", "get_events_for_entity(entity=...) on one or two emerging terms", ], }, { "task": "What's heating up around a specific entity", "chain": [ "detect_emerging_topics(around=\"\", timeframe=\"4h\")", "get_events_for_entity(entity=...) on the top emerging neighbor", ], }, ] def _configured_feed_urls() -> list[str]: """Return the configured feed URLs from environment variables.""" urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()] if not urls: urls = [NEWS_FEED_URL] return urls @mcp.tool(description="List all configured RSS feeds with their current enabled/disabled status.") async def get_feeds() -> list[dict]: """Return each feed URL with its enabled flag, last fetch stats, and timestamps.""" store = SQLiteClusterStore(DB_PATH) return store.get_feed_state_list() @mcp.tool(description="Enable or disable a specific RSS feed by URL.") async def toggle_feed(feed_url: str, enabled: bool) -> dict: """Toggle a feed's active/inactive state. Changes take effect on the next background refresh cycle. Returns the updated feed state. """ store = SQLiteClusterStore(DB_PATH) store.set_feed_enabled(feed_url.strip(), enabled) updated = store.get_feed_state(feed_url.strip()) return {"ok": True, "feed_key": feed_url.strip(), "enabled": enabled, "details": updated} @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.") async def get_latest_events(topic: str | None = None, limit: int = 5, include_articles: bool = False): limit = max(1, min(int(limit), 20)) # When topic is omitted, search across all topics (no topic filter). # When topic is provided and matches a known topic, filter by that topic. # Otherwise treat the value as an entity-like query. topic_norm = normalize_query(topic).lower() if topic else "" resolved = resolve_entity_via_trends(topic_norm) if topic_norm else {} allowed = {t.lower() for t in DEFAULT_TOPICS} is_topic = topic_norm in allowed is_all_topics = not topic_norm query_terms = { topic_norm, str(resolved.get("normalized") or "").strip().lower(), str(resolved.get("canonical_label") or "").strip().lower(), str(resolved.get("mid") or "").strip().lower(), } query_terms = {q for q in query_terms if q} store = SQLiteClusterStore(DB_PATH) if is_all_topics: # No topic specified: return freshest clusters across all topics. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit) elif is_topic: # Cache-first: only refresh if we currently have no fresh clusters for this topic. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit) if not clusters: await refresh_clusters(topic=topic_norm, limit=200) clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit) else: # Entity-aware mode: search recent clusters across all topics and match by # raw entity, canonical label, or MID. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8) filtered = [] for c in clusters: haystack = _cluster_entity_haystack(c) if any(any(term in item for item in haystack) for term in query_terms): filtered.append(c) if len(filtered) >= limit: break clusters = filtered out = [] for c in _sort_clusters_by_recency(clusters): item = { "cluster_id": c.get("cluster_id"), "headline": c.get("headline"), "summary": c.get("summary"), "entities": c.get("entities", []), "sentiment": c.get("sentiment", "neutral"), "importance": c.get("importance", 0.0), "sources": c.get("sources", []), "timestamp": c.get("timestamp"), } if include_articles: # Return minimal article fields to keep responses compact. arts = c.get("articles", []) or [] item["articles"] = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] out.append(item) return out @mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.") async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False): limit = max(1, min(int(limit), 30)) query = normalize_query(entity).strip().lower() if not query: return [] resolved = resolve_entity_via_trends(query) query_terms = { query, str(resolved.get("normalized") or "").strip().lower(), str(resolved.get("canonical_label") or "").strip().lower(), str(resolved.get("mid") or "").strip().lower(), } query_terms = {q for q in query_terms if q} store = SQLiteClusterStore(DB_PATH) def _match_clusters(clusters: list[dict]) -> list[dict]: hits: list[dict] = [] for c in _sort_clusters_by_recency(clusters): haystack = _cluster_entity_haystack(c) if any(any(term in item for item in haystack) for term in query_terms): hits.append(c) if len(hits) >= limit: break return hits hours = _parse_timeframe_to_hours(timeframe) clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10)) hits = _match_clusters(clusters) out = [] for c in hits: item = { "cluster_id": c.get("cluster_id"), "headline": c.get("headline"), "summary": c.get("summary"), "entities": c.get("entities", []), "sentiment": c.get("sentiment", "neutral"), "importance": c.get("importance", 0.0), "sources": c.get("sources", []), "timestamp": c.get("timestamp"), } if include_articles: arts = c.get("articles", []) or [] item["articles"] = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] out.append(item) return out @mcp.tool(description="Return entities most commonly associated with the subject in recent clusters, optionally blended with Google Trends suggestions.") async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True): limit = max(1, min(int(limit), 25)) hours = _parse_timeframe_to_hours(timeframe) include_trends_bool = str(include_trends).strip().lower() not in {"false", "0", "no"} store = SQLiteClusterStore(DB_PATH) result = related_recent_entities( store=store, subject=subject, timeframe_hours=hours, limit=limit, include_trends=include_trends_bool, ) return result @mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.") async def get_event_summary(event_id: str, include_articles: bool = False): store = SQLiteClusterStore(DB_PATH) # Summary cache: reuse if present within TTL. cached_summary = store.get_cluster_summary( cluster_id=event_id, ttl_hours=DEFAULT_LOOKBACK_HOURS, ) if cached_summary: out = { "event_id": event_id, "headline": cached_summary.get("headline"), "mergedSummary": cached_summary.get("mergedSummary"), "keyFacts": cached_summary.get("keyFacts", []), "sources": cached_summary.get("sources", []), } if include_articles: cluster = store.get_cluster_by_id(event_id) arts = (cluster or {}).get("articles", []) or [] out["articles"] = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] return out cluster = store.get_cluster_by_id(event_id) if not cluster: return { "event_id": event_id, "error": "NOT_FOUND", } articles_out = None if include_articles: arts = cluster.get("articles", []) or [] articles_out = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] summary = await summarize_cluster_llm(cluster) store.upsert_cluster_summary(event_id, summary) out = { "event_id": event_id, "headline": summary.get("headline"), "mergedSummary": summary.get("mergedSummary"), "keyFacts": summary.get("keyFacts", []), "sources": summary.get("sources", []), } if include_articles: out["articles"] = articles_out or [] return out @mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters. " "Use timeframe to control the lookback window, topic to scope to a category, and around to find what's emerging near a specific entity.") async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic: str | None = None, around: str | None = None): """Surface entities and phrases that are accelerating in recent clusters. Args: limit: max results to return (1-20, default 10). timeframe: lookback window like "4h", "24h", "3d" (default "24h"). topic: optional coarse topic filter ("crypto", "macro", "regulation", "ai", "other"). around: optional entity — only return entities that co-occur with this entity in the recent window (e.g. "Bitcoin" to find what's emerging in Bitcoin's neighborhood). """ limit = max(1, min(int(limit), 20)) hours = _parse_timeframe_to_hours(timeframe) half_hours = hours / 2.0 store = SQLiteClusterStore(DB_PATH) # Fetch more clusters than needed so velocity stats are meaningful even for short windows. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500) # --- optional topic filter --- if topic: topic_norm = normalize_query(topic).strip().lower() if topic_norm: clusters = [c for c in clusters if (c.get("topic") or "other").strip().lower() == topic_norm] # --- resolve the 'around' entity --- around_terms: set[str] = set() if around: around_norm = normalize_query(around).strip().lower() if around_norm: resolved = resolve_entity_via_trends(around_norm) around_terms = { around_norm, str(resolved.get("normalized") or "").strip().lower(), str(resolved.get("canonical_label") or "").strip().lower(), } around_terms.discard("") # split clusters into first-half vs second-half by timestamp # clusters are already sorted most-recent-first from the store now = datetime.now(timezone.utc) def _cluster_age_hours(c: dict) -> float: """Return the cluster's age in hours (approximate, from now).""" ts = c.get("timestamp") or c.get("last_updated") if not ts: return 0.0 # treat un-dated as fresh try: s = str(ts).replace("Z", "+00:00") dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0) except Exception: try: dt = parsedate_to_datetime(str(ts)) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0) except Exception: return 0.0 # Generic entity filter _generic_tokens = {"news", "latest", "breaking", "update", "updates", "report", "reports"} def _is_generic_entity(ent: str) -> bool: e = str(ent).strip().lower() if not e or len(e) < 4: return True if e in _generic_tokens: return True return False # --- accumulate signals --- # recent = second half of timeframe (newer), prior = first half (older) entity_counts_recent = Counter() entity_counts_prior = Counter() entity_importance_recent = Counter() entity_sources: dict[str, set] = {} # ent -> set of source names entity_buckets: dict[str, set] = {} # ent -> set of time-bucket indices (for sustained-spike detection) entity_cooccur: dict[str, Counter] = {} phrase_counts_recent = Counter() bucket_size_hours = max(1.0, hours / 6.0) # split window into ~6 buckets for c in clusters: ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)] ents_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()] age_h = _cluster_age_hours(c) is_recent = age_h <= half_hours bucket_idx = int(age_h / bucket_size_hours) # --- around filter: only count clusters that mention the target entity --- if around_terms: haystack = set(ents_norm) for res in c.get("entityResolutions", []) or []: if isinstance(res, dict): for key in ("normalized", "canonical_label"): val = res.get(key) if val: haystack.add(str(val).strip().lower()) if not (haystack & around_terms): continue counts = entity_counts_recent if is_recent else entity_counts_prior imp_acc = entity_importance_recent if is_recent else None # only importance from recent window for ent in ents_norm: if _is_generic_entity(ent): continue counts[ent] += 1 if ent not in entity_sources: entity_sources[ent] = set() src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else "" if src: entity_sources[ent].add(str(src)) if ent not in entity_buckets: entity_buckets[ent] = set() entity_buckets[ent].add(bucket_idx) if imp_acc is not None: try: imp_acc[ent] += float(c.get("importance", 0.0) or 0.0) except Exception: pass # co-occurrence (only for clusters matching the around filter, if any) for i in range(len(ents_norm)): a = ents_norm[i] if _is_generic_entity(a): continue if a not in entity_cooccur: entity_cooccur[a] = Counter() for j in range(len(ents_norm)): if i == j: continue b = ents_norm[j] if _is_generic_entity(b): continue entity_cooccur[a][b] += 1 # bigram phrases (recent only) if is_recent: text = f"{c.get('headline', '')} {c.get('summary', '')}" words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower()) for i in range(len(words) - 1): phrase = f"{words[i]} {words[i+1]}" if len(phrase) > 6: phrase_counts_recent[phrase] += 1 # --- score entities --- all_entities = set(entity_counts_recent.keys()) | set(entity_counts_prior.keys()) scored = [] for ent in all_entities: recent_n = entity_counts_recent.get(ent, 0) prior_n = entity_counts_prior.get(ent, 0) total_n = recent_n + prior_n if total_n < 1: continue # velocity: ratio of recent vs prior (smoothed to avoid division noise) # 0 prior → velocity = recent_n (pure emergence) # equal → velocity = 1.0 (steady) velocity = (recent_n + 0.5) / (prior_n + 0.5) # recency weight: what fraction of total hits are in the recent window recency_ratio = recent_n / total_n # source diversity: how many distinct outlets n_sources = len(entity_sources.get(ent, set())) # sustained: how many distinct time buckets did it appear in (max ~6) n_buckets = len(entity_buckets.get(ent, set())) # average importance (recent window only) avg_imp = (entity_importance_recent.get(ent, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0 composed_score = ( 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) + # velocity (0..1, 4x = max) 0.25 * recency_ratio + # recency concentration 0.15 * min(1.0, n_sources / 5.0) + # source diversity 0.10 * min(1.0, n_buckets / 4.0) + # sustained (>1 bucket) 0.15 * min(1.0, avg_imp) # importance ) related = [] if ent in entity_cooccur: for other, _cnt in entity_cooccur[ent].most_common(5): if other != ent: related.append(other) scored.append({ "topic": ent, "trend_score": min(0.99, round(composed_score, 3)), "related_entities": related[:3] if related else [ent], "velocity": round(velocity, 2), "recent_count": recent_n, "prior_count": prior_n, "source_count": n_sources, "avg_importance": round(avg_imp, 3), "signal_type": "entity", }) # sort by composed score descending scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"])) # --- add phrase signals (only from recent window) --- emerging = list(scored) # start with entities for phrase, count in phrase_counts_recent.most_common(limit * 2): if any(item["topic"] == phrase for item in emerging): continue emerging.append({ "topic": phrase.title(), "trend_score": min(0.99, round(0.30 + 0.15 * min(count, 5), 2)), "related_entities": [], "velocity": None, "recent_count": count, "prior_count": 0, "source_count": 0, "avg_importance": 0.0, "signal_type": "phrase", }) if len(emerging) >= limit: break return emerging[:limit] @mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.") async def get_news_sentiment(entity: str, timeframe: str = "24h"): store = SQLiteClusterStore(DB_PATH) ent = normalize_query(entity).strip().lower() resolved = resolve_entity_via_trends(ent) query_terms = { ent, str(resolved.get("normalized") or "").strip().lower(), str(resolved.get("canonical_label") or "").strip().lower(), str(resolved.get("mid") or "").strip().lower(), } query_terms = {q for q in query_terms if q} if not ent: return { "entity": entity, "sentiment": "neutral", "score": 0.0, "cluster_count": 0, } # timeframe: accept '24h' or '24' tf = str(timeframe).strip().lower() try: hours = int(tf[:-1]) if tf.endswith("h") else int(tf) except Exception: hours = 24 hours = max(1, min(int(hours), 168)) clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500) matched = [] for c in clusters: haystack = _cluster_entity_haystack(c) if any(any(term in item for item in haystack) for term in query_terms): matched.append(c) if not matched: return { "entity": entity, "sentiment": "neutral", "score": 0.0, "cluster_count": 0, } scores = [] for c in matched: s = c.get("sentimentScore") if s is not None: try: scores.append(float(s)) except Exception: pass avg_score = sum(scores) / len(scores) if scores else 0.0 # Keep the label aligned with the numeric score. # Small magnitudes are treated as neutral to avoid noisy label flips. if avg_score >= 0.15: sentiment = "positive" elif avg_score <= -0.15: sentiment = "negative" else: sentiment = "neutral" return { "entity": entity, "sentiment": sentiment, "score": round(avg_score, 3), "cluster_count": len(matched), } @mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.") async def get_capabilities(): return { "server": { "name": "news-mcp", "purpose": "Recent news clusters, entity drill-down, sentiment, emerging topics, and related-entity expansion.", "output_conventions": { "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.", "sources": "Always preserve and display sources when summarizing a cluster or entity result.", "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.", }, }, "tools": NEWS_TOOL_CARDS, "recipes": NEWS_COMPOSITION_RECIPES, "example_chains": NEWS_EXAMPLE_CHAINS, "agent_tips": NEWS_AGENT_TIPS, "guidance": [ "Use get_latest_events for a tail, get_events_for_entity for entity deep dives, and get_related_recent_entities for neighborhood expansion.", "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.", "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.", "For emerging topics, use detect_emerging_topics with timeframe and around parameters to scope your query. High velocity + high source_count = strong emerging signal.", ], } def _parse_timeframe_to_hours(timeframe: str) -> int: tf = str(timeframe).strip().lower() try: if tf.endswith("d"): days = int(tf[:-1]) return max(1, days * 24) if tf.endswith("h"): return max(1, int(tf[:-1])) return max(1, int(tf)) except Exception: return 24 from contextlib import asynccontextmanager @asynccontextmanager async def _lifespan(app: FastAPI): asyncio.ensure_future(_background_refresh_loop()) yield app = FastAPI(title="News MCP Server", lifespan=_lifespan) logger = logging.getLogger("news_mcp.startup") app.mount("/mcp", mcp.sse_app()) # Shared store — single connection pool _shared_store = SQLiteClusterStore(DB_PATH) _refresh_lock = asyncio.Lock() _refresh_started = False async def _background_refresh_loop(): """Non-blocking background refresher: prune then poll. Protected by an async lock so a second event-loop wake-up cannot start a parallel ingestion cycle. """ global _refresh_started async with _refresh_lock: if _refresh_started: return _refresh_started = True logger.info("news-mcp llm config: %s", active_llm_config()) # Prune off-thread so we do not block the event loop prune_result = await asyncio.to_thread( _shared_store.prune_if_due, NEWS_PRUNING_ENABLED, NEWS_RETENTION_DAYS, NEWS_PRUNE_INTERVAL_HOURS, ) logger.info("startup prune_result=%s", prune_result) if not NEWS_BACKGROUND_REFRESH_ENABLED: return async def _loop(): if not NEWS_BACKGROUND_REFRESH_ON_START: logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS) await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS)) while True: try: logger.info("background refresh tick start") await refresh_clusters(topic=None, limit=200) logger.info("background refresh tick complete") except Exception: logger.exception("background refresh tick failed") await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS)) asyncio.create_task(_loop()) @app.get("/") def root(): return { "status": "ok", "transport": "fastmcp+sse", "mount": "/mcp", "tools": [ "get_latest_events", "get_events_for_entity", "get_event_summary", "detect_emerging_topics", "get_news_sentiment", "get_related_recent_entities", "get_capabilities", ], "refresh": { "enabled": NEWS_BACKGROUND_REFRESH_ENABLED, "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS, }, "retention": { "lookback_hours": DEFAULT_LOOKBACK_HOURS, "retention_days": NEWS_RETENTION_DAYS, }, "pruning": { "enabled": NEWS_PRUNING_ENABLED, "interval_hours": NEWS_PRUNE_INTERVAL_HOURS, }, } # ------------------------------------------------------------------ # Dashboard REST API endpoints # ------------------------------------------------------------------ from fastapi.staticfiles import StaticFiles from fastapi.responses import JSONResponse app.mount("/dashboard", StaticFiles(directory="dashboard", html=True), name="dashboard") import logging as _log API_LOG = _log.getLogger("news_mcp.api") def _api_ok(data: dict) -> dict: return data def _api_err(exc: Exception, ctx: str) -> JSONResponse: API_LOG.exception(f"API error in {ctx}") return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx}) @app.get("/api/v1/health") def api_health(): """Extended health + dashboard stats.""" try: store = DashboardStore(_shared_store) stats = store.get_dashboard_stats() stats["version"] = _VERSION_HASH return stats except Exception as e: return _api_err(e, "health") @app.get("/api/v1/clusters") def api_clusters( topic: str | None = None, hours: int = 24, limit: int = 50, offset: int = 0, ): """Paginated cluster listing.""" try: store = DashboardStore(_shared_store) clusters = store.get_clusters_page(topic=topic, hours=hours, limit=limit, offset=offset) with store._store._conn() as conn: if topic and topic != "all": count_row = conn.execute( "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours') AND topic = ?", (-hours, topic), ).fetchone() else: count_row = conn.execute( "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours')", (-hours,), ).fetchone() total = count_row[0] if count_row else 0 return {"clusters": clusters, "total": total, "topic": topic or "all", "hours": hours} except Exception as e: return _api_err(e, f"clusters(topic={topic},hours={hours})") @app.get("/api/v1/sentiment-series") def api_sentiment_series( topic: str | None = None, hours: int = 24, bucket_hours: float = 1.0, ): """Sentiment time-series for Chart.js.""" try: store = DashboardStore(_shared_store) series = store.get_sentiment_series(topic=topic, hours=hours, bucket_hours=bucket_hours) return {"series": series, "topic": topic or "all"} except Exception as e: return _api_err(e, f"sentiment(topic={topic})") @app.get("/api/v1/entities") def api_entities( hours: int = 24, limit: int = 30, ): """Top entity frequencies.""" try: store = DashboardStore(_shared_store) entities = store.get_entity_frequencies(hours=hours, limit=limit) return {"entities": entities, "hours": hours} except Exception as e: return _api_err(e, f"entities(hours={hours})") @app.get("/api/v1/cluster/{cluster_id}") def api_cluster_detail(cluster_id: str): """Full cluster detail for drill-down.""" try: store = DashboardStore(_shared_store) detail = store.get_cluster_detail(cluster_id) if not detail: return JSONResponse(status_code=404, content={"error": "Cluster not found", "id": cluster_id}) return detail except Exception as e: return _api_err(e, f"detail({cluster_id})") # ------------------------------------------------------------------ # Feed management endpoints (toggle on/off from dashboard) # ------------------------------------------------------------------ @app.get("/api/v1/feeds") def api_feeds(): """List all configured feeds with enabled/disabled status.""" try: store = SQLiteClusterStore(DB_PATH) feed_list = store.get_feed_state_list() configured = _configured_feed_urls() return { "feeds": feed_list, "configured_urls": configured, } except Exception as e: return _api_err(e, "feeds") @app.post("/api/v1/feeds/toggle") async def api_feed_toggle(feed_url: str = Form(), enabled: bool = Form()): """Toggle a feed's enabled state.""" try: store = SQLiteClusterStore(DB_PATH) ok = store.set_feed_enabled(feed_url.strip(), enabled) if not ok: return JSONResponse( status_code=404, content={"error": f"Feed not found: {feed_url}"}, ) return {"ok": True, "feed_url": feed_url.strip(), "enabled": enabled} except Exception as e: return _api_err(e, f"toggle({feed_url})") @app.get("/health") def health(): return { "status": "ok", "uptime": round(time.monotonic() - _PROCESS_STARTED_AT, 3), "version": _VERSION_HASH, }