from __future__ import annotations import asyncio import hashlib import json import logging import math import re import time from collections import Counter from datetime import datetime, timezone from pathlib import Path from fastapi import FastAPI, Form from mcp.server.fastmcp import FastMCP from mcp.server.transport_security import TransportSecuritySettings from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH from news_mcp.config import ( NEWS_FEED_URL, NEWS_FEED_URLS, NEWS_PRUNE_INTERVAL_HOURS, NEWS_PRUNING_ENABLED, NEWS_REFRESH_INTERVAL_SECONDS, NEWS_BACKGROUND_REFRESH_ENABLED, NEWS_BACKGROUND_REFRESH_ON_START, NEWS_RETENTION_DAYS, ) from news_mcp.jobs.poller import refresh_clusters from news_mcp.storage.sqlite_store import SQLiteClusterStore from news_mcp.enrichment.llm_enrich import summarize_cluster_llm from news_mcp.trends_resolution import resolve_entity_via_trends from news_mcp.llm import active_llm_config from news_mcp.entity_normalize import normalize_query from news_mcp.related_entities import related_recent_entities logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) _PROCESS_STARTED_AT = time.monotonic() _PACKAGE_DIR = Path(__file__).resolve().parent def _compute_version_hash() -> str: """SHA-256 of all .py files under news_mcp/, sorted by relative path. Deterministic across machines and environments — no git dependency. Works identically in Docker and native runs. 
""" hasher = hashlib.sha256() for f in sorted(_PACKAGE_DIR.rglob("*.py")): try: hasher.update(f.read_bytes()) except OSError: continue return hasher.hexdigest()[:9] _VERSION_HASH = _compute_version_hash() mcp = FastMCP( "news-mcp", transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False), ) def _cluster_entity_haystack(cluster: dict) -> list[str]: """Collect the normalized entity + keyword clues attached to a cluster.""" values: list[str] = [] for ent in cluster.get("entities", []) or []: values.append(str(ent).strip().lower()) for res in cluster.get("entityResolutions", []) or []: if not isinstance(res, dict): continue for key in ("normalized", "canonical_label", "mid"): val = res.get(key) if val: values.append(str(val).strip().lower()) # Keywords are LLM-curated thematic descriptors — include them in the # searchable haystack so entity/theme queries match on subject-matter # signals, not just named entities. for kw in cluster.get("keywords", []) or []: values.append(str(kw).strip().lower()) return [v for v in values if v] def _parse_cluster_timestamp(value) -> datetime: """Parse a stored cluster timestamp. payload.timestamp is guaranteed ISO 8601 UTC (YYYY-MM-DDTHH:MM:SS+00:00) at write time. Only datetime.fromisoformat is needed — no RFC 2822 fallback. 
""" if not value: return datetime.min.replace(tzinfo=timezone.utc) text = str(value).strip() if not text: return datetime.min.replace(tzinfo=timezone.utc) try: dt = datetime.fromisoformat(text) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) except Exception: return datetime.min.replace(tzinfo=timezone.utc) def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]: return sorted( clusters, key=lambda c: ( _parse_cluster_timestamp(c.get("timestamp")), float(c.get("importance", 0.0) or 0.0), ), reverse=True, ) def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict: return { "name": name, "description": description, "inputs": inputs, "outputs": outputs, "notes": notes or [], } NEWS_TOOL_CARDS = [ _tool_card( "get_feeds", "List all configured RSS feeds with their enabled/disabled status.", [], ["feeds[]: {feed_key, enabled, last_hash, last_item_count, updated_at}"], ["Use this to see which feeds are currently active or disabled."], ), _tool_card( "toggle_feed", "Enable or disable a specific RSS feed by URL.", [ {"name": "feed_url", "type": "string", "meaning": "the feed URL to toggle"}, {"name": "enabled", "type": "boolean", "meaning": "true to enable, false to disable"}, ], ["ok", "feed_key", "enabled"], ["Changes take effect on the next refresh cycle."], ), _tool_card( "get_latest_events", "Get the newest deduplicated clusters for a topic or resolved entity-like query.", [ {"name": "topic", "type": "string", "default": "all topics", "meaning": "coarse category (crypto, macro, regulation, ai, other), entity-like topic, or omit for all topics"}, {"name": "limit", "type": "integer", "default": 5, "range": "1-20"}, {"name": "include_articles", "type": "boolean", "default": False}, ], ["headline", "summary", "entities", "keywords", "sentiment", "importance", "sources", "timestamp", "articles?"], ["Use when you want the freshest clusters. 
Each cluster includes both named entities and LLM-curated thematic keywords describing what the story is about."], ), _tool_card( "get_events_for_entity", "Search recent clusters for a person, place, company, theme, or keyword by matching entities and thematic keywords.", [ {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to search for"}, {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]}, {"name": "limit", "type": "integer", "default": 10, "range": "1-30"}, {"name": "include_articles", "type": "boolean", "default": False}, ], ["headline", "summary", "entities", "keywords", "sentiment", "importance", "sources", "timestamp", "articles?"], ["Matches against both named entities and thematic keywords. Use this for an entity-centered or theme-centered deep dive."], ), _tool_card( "get_event_summary", "Produce a concise LLM-written explanation for one cluster and key facts.", [ {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"}, {"name": "include_articles", "type": "boolean", "default": True}, ], ["headline", "mergedSummary", "keyFacts", "sources", "entities", "keywords", "related_entities", "related_keywords", "topic", "sentiment", "importance", "articles"], ["Rich cluster drill-down. Returns LLM summary + cluster metadata + articles. Defaults to include articles."], ), _tool_card( "detect_emerging_topics", "Surface emerging entities, thematic keywords, and phrases that are accelerating in the recent window.", [ {"name": "limit", "type": "integer", "default": 10, "range": "1-20"}, {"name": "timeframe", "type": "string", "default": "24h", "examples": ["4h", "24h", "3d"]}, {"name": "topic", "type": "string", "default": "all topics", "examples": ["crypto", "macro", "regulation", "ai", "other"]}, {"name": "around", "type": "string", "default": "none", "meaning": "entity to scope results to its neighborhood (e.g. 
\"Bitcoin\")"}, ], ["topic", "trend_score", "velocity", "recent_count", "prior_count", "source_count", "related_entities", "related_keywords", "signal_type"], ["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity. Signal types: entity (named entity), keyword (thematic descriptor), phrase (headline bigram). Check velocity and source_count to distinguish real spikes from noise."], ), _tool_card( "get_news_sentiment", "Estimate sentiment around an entity or keyword over a lookback window.", [ {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to analyze"}, {"name": "timeframe", "type": "string", "default": "24h"}, ], ["entity", "sentiment", "score", "cluster_count"], ["Matches clusters by entities and keywords. Use after locating a cluster set or entity neighborhood."], ), _tool_card( "get_related_recent_entities", "Find entities and thematic keywords commonly co-occurring with a subject in recent clusters, optionally blended with Google Trends suggestions.", [ {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"}, {"name": "timeframe", "type": "string", "default": "72h"}, {"name": "limit", "type": "integer", "default": 10, "range": "1-25"}, {"name": "include_trends", "type": "boolean", "default": True}, ], ["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"], ["Use this to drill from a subject into related entities and themes, then feed results into get_events_for_entity."], ), ] NEWS_COMPOSITION_RECIPES = [ { "name": "fresh-news-tail", "steps": [ "get_latest_events(topic=...)", "optionally get_event_summary(event_id=...) for the strongest cluster", ], "notes": ["Best for a quick tail of what is happening now. 
Omit topic for all topics, or pass crypto/macro/regulation/ai/other to filter."] }, { "name": "entity-deep-dive", "steps": [ "get_events_for_entity(entity=...)", "get_event_summary(event_id=...)", "get_news_sentiment(entity=..., timeframe=...)", ], "notes": ["Prefer canonical entity labels when you have them; the server normalizes for you."], }, { "name": "subject-neighborhood", "steps": [ "get_related_recent_entities(subject=...)", "for each strong related entity, call get_events_for_entity(entity=...)", ], "notes": ["Use this when you want a graph-like expansion around a subject."] }, { "name": "emerging-signal", "steps": [ "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...)", "choose a topic/entity from the results", "get_events_for_entity(entity=...)", "get_news_sentiment(entity=...)", ], "notes": ["Use timeframe to control lookback (e.g. \"4h\" for what's hot right now, \"3d\" for weekly trends), topic to scope to a category, around to find what's emerging near a specific entity. Check velocity and source_count to distinguish real spikes from noise."], }, ] NEWS_AGENT_TIPS = [ "If you need a fast answer, start with get_latest_events, then summarize the strongest cluster with get_event_summary.", "If a user asks about a person/place/company/theme, use get_events_for_entity before broadening to get_related_recent_entities.", "Treat cluster_id as an internal cursor, not user-facing output; use it only for follow-up tool calls.", "When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.", "Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.", "For tricky names, rely on the server's resolver instead of inventing alias rules in the client.", "Use detect_emerging_topics with timeframe=\"4h\" for what's hot right now, timeframe=\"3d\" for weekly trends. 
Use topic= to scope to a category, around= to find what's emerging near a specific entity. Check velocity to distinguish accelerating signals from steady-state ones. Filter by signal_type to focus on entities, keywords, or phrases. Each result also includes related_keywords for thematic context.", "get_event_summary returns a rich result: headline, mergedSummary, keyFacts, entities, keywords, related_entities, related_keywords, topic, sentiment, importance, and articles (included by default). Use it for full cluster drill-down.", "Each cluster contains both entities (named entities with identity resolution) and keywords (thematic descriptors). Use keywords to understand what a story is about beyond the named entities.", "Use detect_emerging_topics with multiple timeframes (e.g. 4h vs 3d) and compare results to distinguish what's hot right now vs what's persistently trending. related_keywords help identify thematic neighborhoods.", ] NEWS_EXAMPLE_CHAINS = [ { "task": "What is happening now?", "chain": [ "get_latest_events(topic=...)", "get_event_summary(event_id=...) if one cluster looks important", ], }, { "task": "Deep dive on an entity", "chain": [ "get_events_for_entity(entity=..., timeframe=...)", "get_news_sentiment(entity=..., timeframe=...)", "get_event_summary(event_id=...) for the strongest cluster", ], }, { "task": "Broaden from a subject", "chain": [ "get_related_recent_entities(subject=..., include_trends=true)", "get_events_for_entity(entity=...) for the strongest related entities", ], }, { "task": "Find what is emerging", "chain": [ "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...) with optional scoping", "get_events_for_entity(entity=...) on one or two emerging terms", ], }, { "task": "What's heating up around a specific entity", "chain": [ "detect_emerging_topics(around=\"\", timeframe=\"4h\")", "get_events_for_entity(entity=...) 
on the top emerging neighbor", ], }, { "task": "Full investigation pipeline", "chain": [ "detect_emerging_topics(limit=20, timeframe=\"3d\")", "pick an emerging entity/keyword and note its related_entities and related_keywords", "get_event_summary(event_id=...) on the top cluster for full context including articles", "get_news_sentiment(entity=...) to gauge tone around the emerging topic", "detect_emerging_topics(around=, timeframe=\"4h\") to scout its neighborhood", ], }, ] def _configured_feed_urls() -> list[str]: """Return the configured feed URLs from environment variables.""" urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()] if not urls: urls = [NEWS_FEED_URL] return urls @mcp.tool(description="List all configured RSS feeds with their current enabled/disabled status.") async def get_feeds() -> list[dict]: """Return each feed URL with its enabled flag, last fetch stats, and timestamps.""" store = SQLiteClusterStore(DB_PATH) return store.get_feed_state_list() @mcp.tool(description="Enable or disable a specific RSS feed by URL.") async def toggle_feed(feed_url: str, enabled: bool) -> dict: """Toggle a feed's active/inactive state. Changes take effect on the next background refresh cycle. Returns the updated feed state. """ store = SQLiteClusterStore(DB_PATH) store.set_feed_enabled(feed_url.strip(), enabled) updated = store.get_feed_state(feed_url.strip()) return {"ok": True, "feed_key": feed_url.strip(), "enabled": enabled, "details": updated} @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters with entities and thematic keywords, sorted by recency.") async def get_latest_events(topic: str | None = None, limit: int = 5, include_articles: bool = False): limit = max(1, min(int(limit), 20)) # When topic is omitted, search across all topics (no topic filter). # When topic is provided and matches a known topic, filter by that topic. # Otherwise treat the value as an entity-like query. 
topic_norm = normalize_query(topic).lower() if topic else "" resolved = resolve_entity_via_trends(topic_norm) if topic_norm else {} allowed = {t.lower() for t in DEFAULT_TOPICS} is_topic = topic_norm in allowed is_all_topics = not topic_norm query_terms = { topic_norm, str(resolved.get("normalized") or "").strip().lower(), str(resolved.get("canonical_label") or "").strip().lower(), str(resolved.get("mid") or "").strip().lower(), } query_terms = {q for q in query_terms if q} store = SQLiteClusterStore(DB_PATH) if is_all_topics: # No topic specified: return freshest clusters across all topics. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit) elif is_topic: # Cache-first: only refresh if we currently have no fresh clusters for this topic. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit) if not clusters: await refresh_clusters(topic=topic_norm, limit=200) clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit) else: # Entity-aware mode: search recent clusters across all topics and match by # raw entity, canonical label, or MID using SQL-level junction table search. clusters = store.get_clusters_by_entity_or_keyword( query_terms=query_terms, hours=DEFAULT_LOOKBACK_HOURS, limit=limit ) out = [] for c in _sort_clusters_by_recency(clusters): item = { "cluster_id": c.get("cluster_id"), "headline": c.get("headline"), "summary": c.get("summary"), "entities": c.get("entities", []), "keywords": c.get("keywords", []), "sentiment": c.get("sentiment", "neutral"), "importance": c.get("importance", 0.0), "sources": c.get("sources", []), "timestamp": c.get("timestamp"), } if include_articles: # Return minimal article fields to keep responses compact. 
arts = c.get("articles", []) or [] item["articles"] = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] out.append(item) return out @mcp.tool(description="Investigate a person, company, place, theme, or keyword by matching entities and thematic keywords within a time window.") async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False): limit = max(1, min(int(limit), 30)) query = normalize_query(entity).strip().lower() if not query: return [] resolved = resolve_entity_via_trends(query) query_terms = { query, str(resolved.get("normalized") or "").strip().lower(), str(resolved.get("canonical_label") or "").strip().lower(), str(resolved.get("mid") or "").strip().lower(), } query_terms = {q for q in query_terms if q} store = SQLiteClusterStore(DB_PATH) hours = _parse_timeframe_to_hours(timeframe) hits = store.get_clusters_by_entity_or_keyword(query_terms=query_terms, hours=hours, limit=limit) out = [] for c in hits: item = { "cluster_id": c.get("cluster_id"), "headline": c.get("headline"), "summary": c.get("summary"), "entities": c.get("entities", []), "keywords": c.get("keywords", []), "sentiment": c.get("sentiment", "neutral"), "importance": c.get("importance", 0.0), "sources": c.get("sources", []), "timestamp": c.get("timestamp"), } if include_articles: arts = c.get("articles", []) or [] item["articles"] = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] out.append(item) return out @mcp.tool(description="Return entities and thematic keywords commonly co-occurring with the subject in recent clusters, optionally blended with Google Trends suggestions.") async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True): limit = max(1, min(int(limit), 25)) 
hours = _parse_timeframe_to_hours(timeframe) include_trends_bool = str(include_trends).strip().lower() not in {"false", "0", "no"} store = SQLiteClusterStore(DB_PATH) result = related_recent_entities( store=store, subject=subject, timeframe_hours=hours, limit=limit, include_trends=include_trends_bool, ) return result @mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts, " "entities, keywords, related entities and keywords, sentiment, importance, and articles.") async def get_event_summary(event_id: str, include_articles: bool = True): store = SQLiteClusterStore(DB_PATH) # Summary cache: reuse if present within TTL. cluster = store.get_cluster_by_id(event_id) if not cluster: return { "event_id": event_id, "error": "NOT_FOUND", } cached_summary = store.get_cluster_summary( cluster_id=event_id, ttl_hours=DEFAULT_LOOKBACK_HOURS, ) def _enrich(base: dict, src_cluster: dict) -> dict: base["entities"] = src_cluster.get("entities", []) base["keywords"] = src_cluster.get("keywords", []) base["topic"] = src_cluster.get("topic", "other") base["sentiment"] = src_cluster.get("sentiment", "neutral") base["sentimentScore"] = src_cluster.get("sentimentScore") base["importance"] = src_cluster.get("importance", 0.0) # Related entities: from co-occurrence in this cluster's article set resolved = src_cluster.get("entityResolutions", []) or [] related_ents = [] seen_ents = {str(e).strip().lower() for e in (src_cluster.get("entities", []) or [])} for res in resolved: if isinstance(res, dict): label = str(res.get("canonical_label") or res.get("normalized") or "").strip() if label and label.lower() not in seen_ents: related_ents.append(label) seen_ents.add(label.lower()) base["related_entities"] = related_ents[:10] # Related keywords: from the cluster's own keywords (thematic descriptors) # plus any co-occurring keywords from recent related clusters base["related_keywords"] = _fetch_related_keywords(store, src_cluster, 
event_id) if include_articles: arts = src_cluster.get("articles", []) or [] base["articles"] = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] return base if cached_summary: out = { "event_id": event_id, "headline": cached_summary.get("headline"), "mergedSummary": cached_summary.get("mergedSummary"), "keyFacts": cached_summary.get("keyFacts", []), "sources": cached_summary.get("sources", []), } out = _enrich(out, cluster) return out summary = await summarize_cluster_llm(cluster) store.upsert_cluster_summary(event_id, summary) out = { "event_id": event_id, "headline": summary.get("headline"), "mergedSummary": summary.get("mergedSummary"), "keyFacts": summary.get("keyFacts", []), "sources": summary.get("sources", []), } out = _enrich(out, cluster) return out def _fetch_related_keywords(store: SQLiteClusterStore, cluster: dict, event_id: str) -> list[str]: """Find keywords that co-occur with this cluster's entities in recent clusters. This gives agents thematic context: what else was being discussed alongside the entities in this cluster during the same window. """ entities = cluster.get("entities", []) or [] if not entities: return [] # Build a set of entity terms to search with entity_terms = set() for e in entities: entity_terms.add(str(e).strip().lower()) for res in (cluster.get("entityResolutions", []) or []): if isinstance(res, dict): for key in ("normalized", "canonical_label"): val = res.get(key) if val: entity_terms.add(str(val).strip().lower()) entity_terms.discard("") if not entity_terms: return [] # Find recent clusters that share any entity, collect their keywords # Use payload_ts lookback of 48h for co-occurrence window from datetime import timedelta cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat() placeholders = ",".join("?" 
for _ in entity_terms) try: rows = store._conn().execute( f"SELECT DISTINCT c.payload FROM clusters c " f"JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id " f"WHERE c.payload_ts >= ? AND c.cluster_id != ? " f"AND ce.entity IN ({placeholders}) " f"ORDER BY c.payload_ts DESC LIMIT 20", (cutoff, event_id, *entity_terms), ).fetchall() except Exception: return [] kw_counter: dict[str, int] = {} cluster_kws = {str(k).strip().lower() for k in (cluster.get("keywords", []) or []) if str(k).strip()} for (payload_text,) in rows: try: c = json.loads(payload_text) except Exception: continue for kw in (c.get("keywords", []) or []): kw_norm = str(kw).strip() if not kw_norm: continue kw_key = kw_norm.lower() # Skip keywords that already appear in this cluster if kw_key in cluster_kws: continue kw_counter[kw_norm] = kw_counter.get(kw_norm, 0) + 1 # Return top keywords by co-occurrence count sorted_kws = sorted(kw_counter.items(), key=lambda x: -x[1]) return [kw for kw, _ in sorted_kws[:10]] @mcp.tool(description="Explore what is starting to matter: surface emerging entities, thematic keywords, and phrases from recent clusters. " "Use timeframe to control the lookback window, topic to scope to a category, and around to find what's emerging near a specific entity. " "Results include signal_type (entity / keyword / phrase) for downstream filtering.") async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic: str | None = None, around: str | None = None): """Surface entities and phrases that are accelerating in recent clusters. Args: limit: max results to return (1-20, default 10). timeframe: lookback window like "4h", "24h", "3d" (default "24h"). topic: optional coarse topic filter ("crypto", "macro", "regulation", "ai", "other"). around: optional entity — only return entities that co-occur with this entity in the recent window (e.g. "Bitcoin" to find what's emerging in Bitcoin's neighborhood). 
""" limit = max(1, min(int(limit), 20)) hours = _parse_timeframe_to_hours(timeframe) half_hours = hours / 2.0 store = SQLiteClusterStore(DB_PATH) # Fetch more clusters than needed so velocity stats are meaningful even for short windows. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500) # --- optional topic filter --- if topic: topic_norm = normalize_query(topic).strip().lower() if topic_norm: clusters = [c for c in clusters if (c.get("topic") or "other").strip().lower() == topic_norm] # --- resolve the 'around' entity --- around_terms: set[str] = set() if around: around_norm = normalize_query(around).strip().lower() if around_norm: resolved = resolve_entity_via_trends(around_norm) around_terms = { around_norm, str(resolved.get("normalized") or "").strip().lower(), str(resolved.get("canonical_label") or "").strip().lower(), } around_terms.discard("") # split clusters into first-half vs second-half by timestamp # clusters are already sorted most-recent-first from the store now = datetime.now(timezone.utc) def _cluster_age_hours(c: dict) -> float: """Return the cluster's age in hours. 
payload.timestamp is ISO 8601 UTC guaranteed.""" ts = c.get("timestamp") or c.get("last_updated") if not ts: return 0.0 try: dt = datetime.fromisoformat(str(ts).strip()) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0) except Exception: return 0.0 # Generic entity filter _generic_tokens = {"news", "latest", "breaking", "update", "updates", "report", "reports"} def _is_generic_entity(ent: str) -> bool: e = str(ent).strip().lower() if not e or len(e) < 4: return True if e in _generic_tokens: return True return False # --- accumulate signals --- # recent = second half of timeframe (newer), prior = first half (older) entity_counts_recent = Counter() entity_counts_prior = Counter() entity_importance_recent = Counter() entity_sources: dict[str, set] = {} # ent -> set of source names entity_buckets: dict[str, set] = {} # ent -> set of time-bucket indices (for sustained-spike detection) entity_cooccur: dict[str, Counter] = {} phrase_counts_recent = Counter() # Keyword accumulators — same scoring pipeline as entities, but tracking # LLM-curated thematic descriptors instead of named entities. kw_counts_recent = Counter() kw_counts_prior = Counter() kw_importance_recent = Counter() kw_sources: dict[str, set] = {} kw_buckets: dict[str, set] = {} kw_cooccur: dict[str, Counter] = {} entity_kw_cooccur: dict[str, Counter] = {} # entity -> Counter of co-occurring keywords bucket_size_hours = max(1.0, hours / 6.0) # split window into ~6 buckets for c in clusters: ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)] ents_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()] # Keywords: deduplicate per cluster so a cluster with the same keyword # listed twice doesn't inflate counts. 
kws_in_cluster = list(dict.fromkeys( str(k).strip().lower() for k in (c.get("keywords", []) or []) if str(k).strip() and not _is_generic_entity(k) )) age_h = _cluster_age_hours(c) is_recent = age_h <= half_hours bucket_idx = int(age_h / bucket_size_hours) # --- around filter: only count clusters that mention the target entity --- if around_terms: haystack = set(ents_norm) for res in c.get("entityResolutions", []) or []: if isinstance(res, dict): for key in ("normalized", "canonical_label"): val = res.get(key) if val: haystack.add(str(val).strip().lower()) if not (haystack & around_terms): continue counts = entity_counts_recent if is_recent else entity_counts_prior imp_acc = entity_importance_recent if is_recent else None # only importance from recent window for ent in ents_norm: if _is_generic_entity(ent): continue counts[ent] += 1 if ent not in entity_sources: entity_sources[ent] = set() src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else "" if src: entity_sources[ent].add(str(src)) if ent not in entity_buckets: entity_buckets[ent] = set() entity_buckets[ent].add(bucket_idx) if imp_acc is not None: try: imp_acc[ent] += float(c.get("importance", 0.0) or 0.0) except Exception: pass # --- keyword counting (same recent/prior split as entities) --- kw_counts = kw_counts_recent if is_recent else kw_counts_prior kw_imp_acc = kw_importance_recent if is_recent else None for kw in kws_in_cluster: kw_counts[kw] += 1 if kw not in kw_sources: kw_sources[kw] = set() src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else "" if src: kw_sources[kw].add(str(src)) if kw not in kw_buckets: kw_buckets[kw] = set() kw_buckets[kw].add(bucket_idx) if kw_imp_acc is not None: try: kw_imp_acc[kw] += float(c.get("importance", 0.0) or 0.0) # type: ignore[assignment] except Exception: pass # co-occurrence (only for clusters matching the around filter, if any) for i in range(len(ents_norm)): a = ents_norm[i] if 
_is_generic_entity(a): continue if a not in entity_cooccur: entity_cooccur[a] = Counter() for j in range(len(ents_norm)): if i == j: continue b = ents_norm[j] if _is_generic_entity(b): continue entity_cooccur[a][b] += 1 # keyword co-occurrence: which keywords appear together in the same clusters for i in range(len(kws_in_cluster)): ka = kws_in_cluster[i] if ka not in kw_cooccur: kw_cooccur[ka] = Counter() for j in range(len(kws_in_cluster)): if i == j: continue kb = kws_in_cluster[j] kw_cooccur[ka][kb] += 1 # also track entity<->keyword co-occurrence (bidirectional) for ent in ents_norm: if _is_generic_entity(ent): continue kw_cooccur[ka][ent] += 1 # and the reverse: entity -> keyword if ent not in entity_kw_cooccur: entity_kw_cooccur[ent] = Counter() entity_kw_cooccur[ent][ka] += 1 # bigram phrases (recent only) if is_recent: text = f"{c.get('headline', '')} {c.get('summary', '')}" words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower()) for i in range(len(words) - 1): phrase = f"{words[i]} {words[i+1]}" if len(phrase) > 6: phrase_counts_recent[phrase] += 1 # --- score entities --- all_entities = set(entity_counts_recent.keys()) | set(entity_counts_prior.keys()) scored = [] for ent in all_entities: recent_n = entity_counts_recent.get(ent, 0) prior_n = entity_counts_prior.get(ent, 0) total_n = recent_n + prior_n if total_n < 1: continue # velocity: ratio of recent vs prior (smoothed to avoid division noise) # 0 prior → velocity = recent_n (pure emergence) # equal → velocity = 1.0 (steady) velocity = (recent_n + 0.5) / (prior_n + 0.5) # recency weight: what fraction of total hits are in the recent window recency_ratio = recent_n / total_n # source diversity: how many distinct outlets n_sources = len(entity_sources.get(ent, set())) # sustained: how many distinct time buckets did it appear in (max ~6) n_buckets = len(entity_buckets.get(ent, set())) # average importance (recent window only) avg_imp = (entity_importance_recent.get(ent, 0.0) / max(1, recent_n)) if 
recent_n > 0 else 0.0 composed_score = ( 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) + # velocity (0..1, 4x = max) 0.25 * recency_ratio + # recency concentration 0.15 * min(1.0, n_sources / 5.0) + # source diversity 0.10 * min(1.0, n_buckets / 4.0) + # sustained (>1 bucket) 0.15 * min(1.0, avg_imp) # importance ) related = [] if ent in entity_cooccur: for other, _cnt in entity_cooccur[ent].most_common(10): if other != ent: related.append(other) related_kws = [] if ent in entity_kw_cooccur: # Build a set of related entity names (lowercased) to deduplicate # keywords that are already represented in related_entities related_ent_names = {e.strip().lower() for e in related} # Also include the entity itself and its common aliases related_ent_names.add(ent.strip().lower()) for kw, _cnt in entity_kw_cooccur[ent].most_common(10): kw_lower = kw.strip().lower() # Skip keywords that are just a related entity name (substring match) if any(kw_lower in ent_name or ent_name in kw_lower for ent_name in related_ent_names): continue related_kws.append(kw) if len(related_kws) >= 5: break scored.append({ "topic": ent, "trend_score": min(0.99, round(composed_score, 3)), "related_entities": related[:5] if related else [ent], "related_keywords": related_kws[:5], "velocity": round(velocity, 2), "recent_count": recent_n, "prior_count": prior_n, "source_count": n_sources, "avg_importance": round(avg_imp, 3), "signal_type": "entity", }) # --- score keywords (same velocity/recency/source/sustained/importance formula) --- all_keywords = set(kw_counts_recent.keys()) | set(kw_counts_prior.keys()) kw_scored = [] for kw in all_keywords: # Skip keywords that are already scored as entities — entity signal is # higher quality (proper nouns, resolved identities). 
if kw in all_entities: continue recent_n = kw_counts_recent.get(kw, 0) prior_n = kw_counts_prior.get(kw, 0) total_n = recent_n + prior_n if total_n < 1: continue velocity = (recent_n + 0.5) / (prior_n + 0.5) recency_ratio = recent_n / total_n n_sources = len(kw_sources.get(kw, set())) n_buckets = len(kw_buckets.get(kw, set())) avg_imp = (kw_importance_recent.get(kw, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0 composed_score = ( 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) + 0.25 * recency_ratio + 0.15 * min(1.0, n_sources / 5.0) + 0.10 * min(1.0, n_buckets / 4.0) + 0.15 * min(1.0, avg_imp) ) kw_related_kws = [] kw_related_ents = [] if kw in kw_cooccur: for other, _cnt in kw_cooccur[kw].most_common(10): if other == kw: continue # If this co-occurring term is a known entity, route to related_entities if other in all_entities: kw_related_ents.append(other) else: kw_related_kws.append(other) if len(kw_related_kws) >= 5 and len(kw_related_ents) >= 5: break kw_scored.append({ "topic": kw, "trend_score": min(0.99, round(composed_score, 3)), "related_entities": kw_related_ents[:5], "related_keywords": kw_related_kws[:5], "velocity": round(velocity, 2), "recent_count": recent_n, "prior_count": prior_n, "source_count": n_sources, "avg_importance": round(avg_imp, 3), "signal_type": "keyword", }) # sort keywords by score descending kw_scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"])) # sort by composed score descending scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"])) # --- merge: entities first, then keywords, then phrases --- emerging = list(scored) # start with entities seen_topics = {item["topic"] for item in emerging} for kw_item in kw_scored: if kw_item["topic"] not in seen_topics: emerging.append(kw_item) seen_topics.add(kw_item["topic"]) # --- add phrase signals (only from recent window) --- for phrase, count in phrase_counts_recent.most_common(limit * 2): if phrase in seen_topics: continue 
@mcp.tool(description="Investigate whether sentiment around an entity or keyword is positive, negative, or neutral over a chosen lookback window. "
          "Matches clusters by both named entities and thematic keywords.")
async def get_news_sentiment(entity: str, timeframe: str = "24h"):
    """Average the stored sentiment of clusters matching an entity or keyword.

    Args:
        entity: Free-text entity or keyword. It is passed through
            ``normalize_query`` and lower-cased before matching.
        timeframe: Lookback window such as ``"24h"``, ``"24"`` or ``"3d"``.
            Parsed by ``_parse_timeframe_to_hours`` (unparseable values fall
            back to 24 hours) and capped at 168 hours.

    Returns:
        A dict with the caller's original ``entity``, a ``sentiment`` label
        (``"positive"``, ``"negative"`` or ``"neutral"``), the mean ``score``
        rounded to three decimals, and ``cluster_count`` (number of matched
        clusters, including those without a usable score).
    """

    def _result(label: str, score: float, count: int) -> dict:
        return {
            "entity": entity,
            "sentiment": label,
            "score": score,
            "cluster_count": count,
        }

    ent = normalize_query(entity).strip().lower()
    if not ent:
        # Nothing to look up: return before alias resolution and before a
        # store is opened, neither of which can contribute to the answer.
        return _result("neutral", 0.0, 0)

    resolved = resolve_entity_via_trends(ent)
    if not isinstance(resolved, dict):
        # Defensive: an unresolved entity must not crash the tool.
        resolved = {}
    query_terms = {ent}
    for key in ("normalized", "canonical_label", "mid"):
        query_terms.add(str(resolved.get(key) or "").strip().lower())
    query_terms.discard("")

    # Reuse the module's shared parser so "3d" means three days here too,
    # instead of silently degrading to the 24h fallback. Cap at one week.
    hours = min(_parse_timeframe_to_hours(timeframe), 168)

    store = SQLiteClusterStore(DB_PATH)
    matched = store.get_clusters_by_entity_or_keyword(
        query_terms=query_terms, hours=hours, limit=500
    )
    if not matched:
        return _result("neutral", 0.0, 0)

    scores = []
    for cluster in matched:
        raw = cluster.get("sentimentScore")
        if raw is None:
            continue
        try:
            value = float(raw)
        except (TypeError, ValueError):
            # Non-numeric score: ignore this cluster for the average only.
            continue
        # A single NaN/inf would otherwise turn the whole mean into NaN/inf.
        if math.isfinite(value):
            scores.append(value)

    avg_score = sum(scores) / len(scores) if scores else 0.0

    # Keep the label aligned with the numeric score.
    # Small magnitudes are treated as neutral to avoid noisy label flips.
    if avg_score >= 0.15:
        sentiment = "positive"
    elif avg_score <= -0.15:
        sentiment = "negative"
    else:
        sentiment = "neutral"

    return _result(sentiment, round(avg_score, 3), len(matched))
@mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.")
async def get_capabilities():
    """Return the static self-description of this server.

    The payload combines server metadata and output conventions with the
    module-level tool cards, recipes, example chains and agent tips, plus a
    short list of usage guidance strings.
    """
    # Presentation rules downstream agents should follow.
    output_conventions = {
        "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.",
        "sources": "Always preserve and display sources when summarizing a cluster or entity result.",
        "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.",
        "clusters": "Each cluster includes entities (named entities with optional MID/canonical_label) and keywords (thematic descriptors). Both are searchable; entities are higher-signal, keywords capture subject-matter themes.",
    }

    server_info = {
        "name": "news-mcp",
        "purpose": "Recent news clusters with entities and thematic keywords, entity/keyword drill-down, sentiment, emerging topics, and related-entity expansion.",
        "output_conventions": output_conventions,
    }

    # Free-form hints on which tool to pick for which question.
    guidance = [
        "Use get_latest_events for a tail, get_events_for_entity for entity/keyword deep dives, and get_related_recent_entities for neighborhood expansion.",
        "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.",
        "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.",
        "For emerging topics, use detect_emerging_topics with timeframe and around parameters. Signal types: entity (named entity, highest quality), keyword (thematic descriptor), phrase (headline bigram). High velocity + high source_count = strong signal.",
        "get_events_for_entity and get_news_sentiment match both entities and thematic keywords — use keywords when the subject is a theme rather than a named entity.",
    ]

    capabilities = {
        "server": server_info,
        "tools": NEWS_TOOL_CARDS,
        "recipes": NEWS_COMPOSITION_RECIPES,
        "example_chains": NEWS_EXAMPLE_CHAINS,
        "agent_tips": NEWS_AGENT_TIPS,
        "guidance": guidance,
    }
    return capabilities
High velocity + high source_count = strong signal.", "get_events_for_entity and get_news_sentiment match both entities and thematic keywords — use keywords when the subject is a theme rather than a named entity.", ], } def _parse_timeframe_to_hours(timeframe: str) -> int: tf = str(timeframe).strip().lower() try: if tf.endswith("d"): days = int(tf[:-1]) return max(1, days * 24) if tf.endswith("h"): return max(1, int(tf[:-1])) return max(1, int(tf)) except Exception: return 24 from contextlib import asynccontextmanager @asynccontextmanager async def _lifespan(app: FastAPI): asyncio.ensure_future(_background_refresh_loop()) yield app = FastAPI(title="News MCP Server", lifespan=_lifespan) logger = logging.getLogger("news_mcp.startup") app.mount("/mcp", mcp.sse_app()) # Shared store — single connection pool _shared_store = SQLiteClusterStore(DB_PATH) _refresh_lock = asyncio.Lock() _refresh_started = False async def _background_refresh_loop(): """Non-blocking background refresher: prune then poll. Protected by an async lock so a second event-loop wake-up cannot start a parallel ingestion cycle. 
from contextlib import asynccontextmanager

# Strong references to fire-and-forget tasks. The event loop itself keeps only
# weak references, so a task whose handle is dropped may be garbage-collected
# before it finishes.
_background_tasks: set[asyncio.Task] = set()


def _spawn_background_task(coro) -> asyncio.Task:
    """Schedule *coro* on the running loop and keep it alive until it is done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


@asynccontextmanager
async def _lifespan(app: FastAPI):
    """FastAPI lifespan hook: kick off the background refresher at startup."""
    _spawn_background_task(_background_refresh_loop())
    yield


app = FastAPI(title="News MCP Server", lifespan=_lifespan)
logger = logging.getLogger("news_mcp.startup")
app.mount("/mcp", mcp.sse_app())

# Shared store — single connection pool
_shared_store = SQLiteClusterStore(DB_PATH)

_refresh_lock = asyncio.Lock()
_refresh_started = False


async def _background_refresh_loop():
    """Non-blocking background refresher: prune then poll.

    Only the first call does any work; the ``_refresh_started`` flag, checked
    under ``_refresh_lock``, makes every later call return immediately so a
    second wake-up cannot start a parallel ingestion cycle.

    The first call logs the active LLM config, runs ``prune_if_due`` on the
    shared store in a worker thread, and, when background refresh is enabled,
    starts a long-lived task that calls ``refresh_clusters`` every
    ``NEWS_REFRESH_INTERVAL_SECONDS`` seconds.
    """
    global _refresh_started

    async with _refresh_lock:
        if _refresh_started:
            return
        _refresh_started = True

    logger.info("news-mcp llm config: %s", active_llm_config())

    # Prune off-thread so we do not block the event loop
    prune_result = await asyncio.to_thread(
        _shared_store.prune_if_due,
        NEWS_PRUNING_ENABLED,
        NEWS_RETENTION_DAYS,
        NEWS_PRUNE_INTERVAL_HOURS,
    )
    logger.info("startup prune_result=%s", prune_result)

    if not NEWS_BACKGROUND_REFRESH_ENABLED:
        return

    async def _loop():
        if not NEWS_BACKGROUND_REFRESH_ON_START:
            logger.info(
                "background refresh delayed start interval_seconds=%s",
                NEWS_REFRESH_INTERVAL_SECONDS,
            )
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
        while True:
            try:
                logger.info("background refresh tick start")
                await refresh_clusters(topic=None, limit=200)
                logger.info("background refresh tick complete")
            except Exception:
                # One failed tick must not kill the refresher; log and retry
                # after the normal interval.
                logger.exception("background refresh tick failed")
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))

    # Keep a strong reference; a bare create_task() result can be collected
    # while the loop is still sleeping between ticks.
    _spawn_background_task(_loop())


# Service banner: transport, mount point, tool names and the effective
# refresh / retention / pruning configuration.
@app.get("/")
def root():
    return {
        "status": "ok",
        "transport": "fastmcp+sse",
        "mount": "/mcp",
        "tools": [
            "get_latest_events",
            "get_events_for_entity",
            "get_event_summary",
            "detect_emerging_topics",
            "get_news_sentiment",
            "get_related_recent_entities",
            "get_capabilities",
        ],
        "refresh": {
            "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
            "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
        },
        "retention": {
            "lookback_hours": DEFAULT_LOOKBACK_HOURS,
            "retention_days": NEWS_RETENTION_DAYS,
        },
        "pruning": {
            "enabled": NEWS_PRUNING_ENABLED,
            "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
        },
    }


# ------------------------------------------------------------------
# Dashboard REST API endpoints
# ------------------------------------------------------------------

from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse

# NOTE(review): "dashboard" is resolved relative to the process working
# directory, not this package — confirm the server is always started from
# the directory that contains it.
app.mount("/dashboard", StaticFiles(directory="dashboard", html=True), name="dashboard")

import logging as _log
API_LOG = _log.getLogger("news_mcp.api")
def _api_ok(data: dict) -> dict:
    # Pass-through for successful payloads; returns its argument unchanged.
    return data


def _api_err(exc: Exception, ctx: str) -> JSONResponse:
    # Log the active exception with traceback, then report it as an HTTP 500
    # whose body carries the exception text and the caller-supplied context.
    API_LOG.exception(f"API error in {ctx}")
    body = {"error": str(exc), "ctx": ctx}
    return JSONResponse(status_code=500, content=body)


@app.get("/api/v1/health")
def api_health():
    """Extended health + dashboard stats."""
    try:
        payload = _shared_store.get_dashboard_stats()
        payload["version"] = _VERSION_HASH
    except Exception as exc:
        return _api_err(exc, "health")
    return payload


@app.get("/api/v1/clusters")
def api_clusters(
    topic: str | None = None,
    hours: int = 24,
    limit: int = 50,
    offset: int = 0,
):
    """Paginated cluster listing."""
    try:
        page = _shared_store.get_clusters_page(
            topic=topic,
            hours=hours,
            limit=limit,
            offset=offset,
        )
        # Echo the effective filter back so the client can label the view.
        return {
            "clusters": page["clusters"],
            "total": page["total"],
            "topic": topic or "all",
            "hours": hours,
        }
    except Exception as exc:
        return _api_err(exc, f"clusters(topic={topic},hours={hours})")


@app.get("/api/v1/sentiment-series")
def api_sentiment_series(
    topic: str | None = None,
    hours: int = 24,
    bucket_hours: float = 1.0,
):
    """Sentiment time-series for Chart.js."""
    try:
        points = _shared_store.get_sentiment_series(
            topic=topic,
            hours=hours,
            bucket_hours=bucket_hours,
        )
    except Exception as exc:
        return _api_err(exc, f"sentiment(topic={topic})")
    return {"series": points, "topic": topic or "all"}


@app.get("/api/v1/entities")
def api_entities(
    hours: int = 24,
    limit: int = 30,
):
    """Top entity frequencies."""
    try:
        ranked = _shared_store.get_entity_frequencies(hours=hours, limit=limit)
    except Exception as exc:
        return _api_err(exc, f"entities(hours={hours})")
    return {"entities": ranked, "hours": hours}


@app.get("/api/v1/keywords")
def api_keywords(
    hours: int = 24,
    limit: int = 30,
):
    """Top keyword frequencies (thematic descriptors, excluding terms already counted as entities)."""
    try:
        ranked = _shared_store.get_keyword_frequencies(hours=hours, limit=limit)
    except Exception as exc:
        return _api_err(exc, f"keywords(hours={hours})")
    return {"keywords": ranked, "hours": hours}
@app.get("/api/v1/clusters/by-entity")
def api_clusters_by_entity(
    entity: str,
    hours: int = 168,
    limit: int = 50,
    offset: int = 0,
):
    """Return clusters matching an entity, filtered by event time via SQL junction table."""
    try:
        # The store is queried with the trimmed, lower-cased entity.
        return _shared_store.get_clusters_by_entity(
            entity=entity.strip().lower(),
            hours=hours,
            limit=limit,
            offset=offset,
        )
    except Exception as e:
        return _api_err(e, f"by-entity(entity={entity},hours={hours})")


@app.get("/api/v1/clusters/by-keyword")
def api_clusters_by_keyword(
    keyword: str,
    hours: int = 168,
    limit: int = 50,
    offset: int = 0,
):
    """Return clusters matching a keyword, filtered by event time via SQL junction table."""
    try:
        # The store is queried with the trimmed, lower-cased keyword.
        return _shared_store.get_clusters_by_keyword(
            keyword=keyword.strip().lower(),
            hours=hours,
            limit=limit,
            offset=offset,
        )
    except Exception as e:
        return _api_err(e, f"by-keyword(keyword={keyword},hours={hours})")


@app.get("/api/v1/cluster/{cluster_id}")
def api_cluster_detail(cluster_id: str):
    """Full cluster detail for drill-down."""
    try:
        detail = _shared_store.get_cluster_detail(cluster_id)
        if not detail:
            # A falsy result from the store is reported as "not found".
            return JSONResponse(
                status_code=404,
                content={"error": "Cluster not found", "id": cluster_id},
            )
        return detail
    except Exception as e:
        return _api_err(e, f"detail({cluster_id})")


# ------------------------------------------------------------------
# Feed management endpoints (toggle on/off from dashboard)
# ------------------------------------------------------------------

@app.get("/api/v1/feeds")
def api_feeds():
    """List all configured feeds with enabled/disabled status."""
    try:
        # Use the module-wide store like every other dashboard endpoint
        # instead of constructing a fresh one per request.
        feed_list = _shared_store.get_feed_state_list()
        configured = _configured_feed_urls()
        return {
            "feeds": feed_list,
            "configured_urls": configured,
        }
    except Exception as e:
        return _api_err(e, "feeds")


@app.post("/api/v1/feeds/toggle")
async def api_feed_toggle(feed_url: str = Form(), enabled: bool = Form()):
    """Toggle a feed's enabled state."""
    url = feed_url.strip()
    try:
        # This handler is a coroutine, so it runs on the event loop; push the
        # blocking store call to a worker thread to keep the loop responsive.
        ok = await asyncio.to_thread(_shared_store.set_feed_enabled, url, enabled)
        if not ok:
            return JSONResponse(
                status_code=404,
                content={"error": f"Feed not found: {feed_url}"},
            )
        return {"ok": True, "feed_url": url, "enabled": enabled}
    except Exception as e:
        return _api_err(e, f"toggle({feed_url})")
# Lightweight liveness probe: fixed status, seconds elapsed since the module
# recorded _PROCESS_STARTED_AT, and the code version hash.
@app.get("/health")
def health():
    uptime_seconds = time.monotonic() - _PROCESS_STARTED_AT
    report = {
        "status": "ok",
        "uptime": round(uptime_seconds, 3),
        "version": _VERSION_HASH,
    }
    return report