| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258 |
- from __future__ import annotations
- import asyncio
- import hashlib
- import logging
- import math
- import re
- import time
- from collections import Counter
- from datetime import datetime, timezone
- from pathlib import Path
- from fastapi import FastAPI, Form
- from mcp.server.fastmcp import FastMCP
- from mcp.server.transport_security import TransportSecuritySettings
- from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
- from news_mcp.config import (
- NEWS_FEED_URL,
- NEWS_FEED_URLS,
- NEWS_PRUNE_INTERVAL_HOURS,
- NEWS_PRUNING_ENABLED,
- NEWS_REFRESH_INTERVAL_SECONDS,
- NEWS_BACKGROUND_REFRESH_ENABLED,
- NEWS_BACKGROUND_REFRESH_ON_START,
- NEWS_RETENTION_DAYS,
- )
- from news_mcp.jobs.poller import refresh_clusters
- from news_mcp.storage.sqlite_store import SQLiteClusterStore
- from news_mcp.dashboard.dashboard_store import DashboardStore
- from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
- from news_mcp.trends_resolution import resolve_entity_via_trends
- from news_mcp.llm import active_llm_config
- from news_mcp.entity_normalize import normalize_query
- from news_mcp.related_entities import related_recent_entities
# Process-wide logging: INFO and above, each record stamped with time, level and logger name.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    level=logging.INFO,
)

# Monotonic clock reading taken when this module is imported.
_PROCESS_STARTED_AT = time.monotonic()

# Directory holding this module (fully resolved, so symlinks do not matter).
_PACKAGE_DIR = Path(__file__).resolve().parents[0]
def _compute_version_hash() -> str:
    """Fingerprint the Python sources that live under ``_PACKAGE_DIR``.

    Every ``*.py`` file found recursively below the package directory is read
    in sorted path order and its raw bytes are fed into a single SHA-256
    digest. Only file contents are hashed (not file names) and no git
    metadata is consulted. Files that cannot be read are skipped.

    Returns:
        The first nine hexadecimal characters of the digest.
    """
    digest = hashlib.sha256()
    for source_file in sorted(_PACKAGE_DIR.rglob("*.py")):
        try:
            payload = source_file.read_bytes()
        except OSError:
            # Best effort: an unreadable file must not block start-up.
            continue
        digest.update(payload)
    return digest.hexdigest()[:9]
# Source fingerprint, computed once when the module is imported.
_VERSION_HASH = _compute_version_hash()

# The MCP server instance that every @mcp.tool() handler in this module registers on.
# NOTE(review): DNS-rebinding protection is explicitly switched off here —
# confirm the server is only reachable from trusted networks.
mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(
        enable_dns_rebinding_protection=False,
    ),
)
def _cluster_entity_haystack(cluster: dict) -> list[str]:
    """Collect the lower-cased entity and keyword clues attached to a cluster.

    The haystack is built, in this order, from:

    * every item of ``cluster["entities"]``;
    * the ``normalized``, ``canonical_label`` and ``mid`` fields of each dict
      in ``cluster["entityResolutions"]`` (non-dict entries are ignored);
    * every item of ``cluster["keywords"]`` — thematic descriptors are
      included so that theme queries match on subject matter, not only on
      named entities.

    Values are stringified, stripped and lower-cased. ``None`` entries are
    skipped instead of being stringified to the literal ``"none"``, which
    would otherwise pollute the haystack and match a query for "none".

    Args:
        cluster: a cluster dict; any of the three keys may be missing or None.

    Returns:
        The non-empty normalized strings (duplicates are kept).
    """

    def _clean(raw) -> str:
        # None must not turn into the searchable token "none".
        return "" if raw is None else str(raw).strip().lower()

    values: list[str] = [_clean(ent) for ent in cluster.get("entities") or []]

    for res in cluster.get("entityResolutions") or []:
        if not isinstance(res, dict):
            continue
        for key in ("normalized", "canonical_label", "mid"):
            val = res.get(key)
            if val:
                values.append(_clean(val))

    values.extend(_clean(kw) for kw in cluster.get("keywords") or [])

    return [v for v in values if v]
def _parse_cluster_timestamp(value) -> datetime:
    """Parse a stored cluster timestamp into an aware UTC ``datetime``.

    Stored timestamps are expected to be ISO 8601 strings such as
    ``YYYY-MM-DDTHH:MM:SS+00:00``, so ``datetime.fromisoformat`` is the only
    parser used (no RFC 2822 fallback). Two tolerances are applied:

    * a trailing ``Z``/``z`` is rewritten to ``+00:00`` because
      ``fromisoformat`` rejects it before Python 3.11;
    * a naive result is assumed to already be in UTC.

    Args:
        value: an ISO 8601 string, a ``datetime``, or anything falsy.

    Returns:
        The timestamp converted to UTC. Missing, blank, unparseable or
        out-of-range values yield ``datetime.min`` (UTC) so that such clusters
        sort as the oldest.
    """
    fallback = datetime.min.replace(tzinfo=timezone.utc)

    if isinstance(value, datetime):
        parsed = value
    else:
        text = str(value).strip() if value else ""
        if not text:
            return fallback
        if text[-1] in "zZ":
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return fallback

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    try:
        return parsed.astimezone(timezone.utc)
    except (OverflowError, ValueError):
        # Offsets applied near datetime.min/max can leave the supported range.
        return fallback
def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
    """Return a new list of clusters ordered newest first.

    The primary key is the parsed ``timestamp``; clusters with equal
    timestamps are ordered by ``importance`` (missing or falsy counts as 0.0),
    highest first. The input list is not modified.
    """

    def _rank(cluster: dict) -> tuple[datetime, float]:
        when = _parse_cluster_timestamp(cluster.get("timestamp"))
        weight = float(cluster.get("importance", 0.0) or 0.0)
        return when, weight

    ordered = list(clusters)
    ordered.sort(key=_rank, reverse=True)
    return ordered
def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict:
    """Assemble one tool-catalogue entry as a plain dict.

    Args:
        name: tool name.
        description: one-line description of the tool.
        inputs: parameter descriptors (dicts).
        outputs: names of the fields the tool returns.
        notes: optional usage hints; ``None`` or an empty list becomes ``[]``.

    Returns:
        A dict with the keys ``name``, ``description``, ``inputs``,
        ``outputs`` and ``notes``, in that order.
    """
    card: dict = dict(name=name, description=description, inputs=inputs, outputs=outputs)
    card["notes"] = notes if notes else []
    return card
# Catalogue describing each news tool: its inputs, the fields it returns and usage notes.
NEWS_TOOL_CARDS = [
    _tool_card(
        name="get_feeds",
        description="List all configured RSS feeds with their enabled/disabled status.",
        inputs=[],
        outputs=["feeds[]: {feed_key, enabled, last_hash, last_item_count, updated_at}"],
        notes=["Use this to see which feeds are currently active or disabled."],
    ),
    _tool_card(
        name="toggle_feed",
        description="Enable or disable a specific RSS feed by URL.",
        inputs=[
            {"name": "feed_url", "type": "string", "meaning": "the feed URL to toggle"},
            {"name": "enabled", "type": "boolean", "meaning": "true to enable, false to disable"},
        ],
        outputs=["ok", "feed_key", "enabled"],
        notes=["Changes take effect on the next refresh cycle."],
    ),
    _tool_card(
        name="get_latest_events",
        description="Get the newest deduplicated clusters for a topic or resolved entity-like query.",
        inputs=[
            {"name": "topic", "type": "string", "default": "all topics", "meaning": "coarse category (crypto, macro, regulation, ai, other), entity-like topic, or omit for all topics"},
            {"name": "limit", "type": "integer", "default": 5, "range": "1-20"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        outputs=["headline", "summary", "entities", "keywords", "sentiment", "importance", "sources", "timestamp", "articles?"],
        notes=["Use when you want the freshest clusters. Each cluster includes both named entities and LLM-curated thematic keywords describing what the story is about."],
    ),
    _tool_card(
        name="get_events_for_entity",
        description="Search recent clusters for a person, place, company, theme, or keyword by matching entities and thematic keywords.",
        inputs=[
            {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to search for"},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-30"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        outputs=["headline", "summary", "entities", "keywords", "sentiment", "importance", "sources", "timestamp", "articles?"],
        notes=["Matches against both named entities and thematic keywords. Use this for an entity-centered or theme-centered deep dive."],
    ),
    _tool_card(
        name="get_event_summary",
        description="Produce a concise LLM-written explanation for one cluster and key facts.",
        inputs=[
            {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        outputs=["headline", "mergedSummary", "keyFacts", "sources", "articles?"],
        notes=["Prefer this after you have already chosen a specific cluster to explain."],
    ),
    _tool_card(
        name="detect_emerging_topics",
        description="Surface emerging entities, thematic keywords, and phrases that are accelerating in the recent window.",
        inputs=[
            {"name": "limit", "type": "integer", "default": 10, "range": "1-20"},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["4h", "24h", "3d"]},
            {"name": "topic", "type": "string", "default": "all topics", "examples": ["crypto", "macro", "regulation", "ai", "other"]},
            {"name": "around", "type": "string", "default": "none", "meaning": "entity to scope results to its neighborhood (e.g. \"Bitcoin\")"},
        ],
        outputs=["topic", "trend_score", "velocity", "recent_count", "prior_count", "source_count", "related_entities", "signal_type"],
        notes=["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity. Signal types: entity (named entity), keyword (thematic descriptor), phrase (headline bigram). Check velocity and source_count to distinguish real spikes from noise."],
    ),
    _tool_card(
        name="get_news_sentiment",
        description="Estimate sentiment around an entity or keyword over a lookback window.",
        inputs=[
            {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to analyze"},
            {"name": "timeframe", "type": "string", "default": "24h"},
        ],
        outputs=["entity", "sentiment", "score", "cluster_count"],
        notes=["Matches clusters by entities and keywords. Use after locating a cluster set or entity neighborhood."],
    ),
    _tool_card(
        name="get_related_recent_entities",
        description="Find entities and thematic keywords commonly co-occurring with a subject in recent clusters, optionally blended with Google Trends suggestions.",
        inputs=[
            {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"},
            {"name": "timeframe", "type": "string", "default": "72h"},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-25"},
            {"name": "include_trends", "type": "boolean", "default": True},
        ],
        outputs=["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"],
        notes=["Use this to drill from a subject into related entities and themes, then feed results into get_events_for_entity."],
    ),
]
# Named multi-step recipes: each entry lists the tool calls to chain and a usage note.
NEWS_COMPOSITION_RECIPES = [
    {
        "name": "fresh-news-tail",
        "steps": [
            "get_latest_events(topic=...)",
            "optionally get_event_summary(event_id=...) for the strongest cluster",
        ],
        "notes": [
            "Best for a quick tail of what is happening now. Omit topic for all topics, or pass crypto/macro/regulation/ai/other to filter.",
        ],
    },
    {
        "name": "entity-deep-dive",
        "steps": [
            "get_events_for_entity(entity=...)",
            "get_event_summary(event_id=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
        ],
        "notes": [
            "Prefer canonical entity labels when you have them; the server normalizes for you.",
        ],
    },
    {
        "name": "subject-neighborhood",
        "steps": [
            "get_related_recent_entities(subject=...)",
            "for each strong related entity, call get_events_for_entity(entity=...)",
        ],
        "notes": [
            "Use this when you want a graph-like expansion around a subject.",
        ],
    },
    {
        "name": "emerging-signal",
        "steps": [
            "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...)",
            "choose a topic/entity from the results",
            "get_events_for_entity(entity=...)",
            "get_news_sentiment(entity=...)",
        ],
        "notes": [
            "Use timeframe to control lookback (e.g. \"4h\" for what's hot right now, \"3d\" for weekly trends), topic to scope to a category, around to find what's emerging near a specific entity. Check velocity and source_count to distinguish real spikes from noise.",
        ],
    },
]
# Free-form guidance strings for agents on how to chain the news tools.
NEWS_AGENT_TIPS = [
    # Quick answers.
    "If you need a fast answer, start with get_latest_events, then summarize the strongest cluster with get_event_summary.",
    # Entity questions.
    "If a user asks about a person/place/company/theme, use get_events_for_entity before broadening to get_related_recent_entities.",
    # Presentation rules.
    "Treat cluster_id as an internal cursor, not user-facing output; use it only for follow-up tool calls.",
    "When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.",
    # Call strategy.
    "Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.",
    "For tricky names, rely on the server's resolver instead of inventing alias rules in the client.",
    # Trend detection.
    "Use detect_emerging_topics with timeframe=\"4h\" for what's hot right now, timeframe=\"3d\" for weekly trends. Use topic= to scope to a category, around= to find what's emerging near a specific entity. Check velocity to distinguish accelerating signals from steady-state ones. Filter by signal_type to focus on entities, keywords, or phrases.",
    # Cluster anatomy.
    "Each cluster contains both entities (named entities with identity resolution) and keywords (thematic descriptors). Use keywords to understand what a story is about beyond the named entities.",
]
# Worked examples: a user-style task paired with the ordered tool calls that answer it.
NEWS_EXAMPLE_CHAINS = [
    {
        "task": "What is happening now?",
        "chain": [
            "get_latest_events(topic=...)",
            "get_event_summary(event_id=...) if one cluster looks important",
        ],
    },
    {
        "task": "Deep dive on an entity",
        "chain": [
            "get_events_for_entity(entity=..., timeframe=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
            "get_event_summary(event_id=...) for the strongest cluster",
        ],
    },
    {
        "task": "Broaden from a subject",
        "chain": [
            "get_related_recent_entities(subject=..., include_trends=true)",
            "get_events_for_entity(entity=...) for the strongest related entities",
        ],
    },
    {
        "task": "Find what is emerging",
        "chain": [
            "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...) with optional scoping",
            "get_events_for_entity(entity=...) on one or two emerging terms",
        ],
    },
    {
        "task": "What's heating up around a specific entity",
        "chain": [
            "detect_emerging_topics(around=\"<entity>\", timeframe=\"4h\")",
            "get_events_for_entity(entity=...) on the top emerging neighbor",
        ],
    },
]
def _configured_feed_urls() -> list[str]:
    """Return the feed URLs taken from the module's configuration constants.

    ``NEWS_FEED_URLS`` is read as a comma-separated list; entries are stripped
    and blanks are dropped. If nothing remains, the single ``NEWS_FEED_URL``
    is returned as a one-element list.
    """
    stripped = (part.strip() for part in NEWS_FEED_URLS.split(","))
    feeds = [url for url in stripped if url]
    return feeds if feeds else [NEWS_FEED_URL]
@mcp.tool(description="List all configured RSS feeds with their current enabled/disabled status.")
async def get_feeds() -> list[dict]:
    """Return the feed-state records held by the cluster store.

    Opens the SQLite store at ``DB_PATH`` and passes through whatever
    ``get_feed_state_list()`` reports for the known feeds.
    """
    feed_states = SQLiteClusterStore(DB_PATH).get_feed_state_list()
    return feed_states
@mcp.tool(description="Enable or disable a specific RSS feed by URL.")
async def toggle_feed(feed_url: str, enabled: bool) -> dict:
    """Switch one feed on or off and report its stored state.

    The URL is stripped of surrounding whitespace and used as the feed key.
    Per the original note, the change takes effect on the next background
    refresh cycle.

    Args:
        feed_url: URL identifying the feed.
        enabled: True to enable the feed, False to disable it.

    Returns:
        ``{"ok": True, "feed_key": ..., "enabled": ..., "details": ...}`` where
        ``details`` is the state read back from the store after the update.
    """
    store = SQLiteClusterStore(DB_PATH)
    feed_key = feed_url.strip()
    store.set_feed_enabled(feed_key, enabled)
    return {
        "ok": True,
        "feed_key": feed_key,
        "enabled": enabled,
        "details": store.get_feed_state(feed_key),
    }
@mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters with entities and thematic keywords, sorted by recency.")
async def get_latest_events(topic: str | None = None, limit: int = 5, include_articles: bool = False):
    """Return the freshest clusters for all topics, one coarse topic, or an entity-like query.

    Three modes, chosen from the normalized ``topic``:

    * empty / omitted — newest clusters across all topics;
    * a member of ``DEFAULT_TOPICS`` — newest clusters for that topic,
      cache-first: a refresh is triggered only when the store has no fresh
      clusters for it;
    * anything else — entity mode: the query is resolved via trends and
      recent clusters of all topics are kept when any query term is a
      substring of an item in the cluster's entity/keyword haystack.

    Entity resolution runs only in entity mode; its result is not needed for
    the other two modes, so the lookup is skipped there.

    Args:
        topic: coarse topic, entity-like query, or None for all topics.
        limit: number of clusters to return, clamped to 1-20.
        include_articles: when true, attach a compact ``articles`` list
            (title, url, source, timestamp) to each cluster.

    Returns:
        A list of cluster dicts ordered newest first.
    """

    def _compact_articles(cluster: dict) -> list[dict]:
        # Minimal article fields only, to keep responses small.
        return [
            {
                "title": a.get("title"),
                "url": a.get("url"),
                "source": a.get("source"),
                "timestamp": a.get("timestamp"),
            }
            for a in cluster.get("articles", []) or []
            if isinstance(a, dict)
        ]

    limit = max(1, min(int(limit), 20))
    topic_norm = normalize_query(topic).lower() if topic else ""
    known_topics = {t.lower() for t in DEFAULT_TOPICS}
    store = SQLiteClusterStore(DB_PATH)

    if not topic_norm:
        # No topic specified: freshest clusters across all topics.
        clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
    elif topic_norm in known_topics:
        # Cache-first: only refresh when nothing fresh is stored for this topic.
        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
        if not clusters:
            await refresh_clusters(topic=topic_norm, limit=200)
            clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
    else:
        # Entity mode: match on the raw query, normalized form, canonical label or MID.
        resolved = resolve_entity_via_trends(topic_norm) or {}
        query_terms = {topic_norm}
        for key in ("normalized", "canonical_label", "mid"):
            term = str(resolved.get(key) or "").strip().lower()
            if term:
                query_terms.add(term)

        # Over-fetch so that enough clusters survive the entity filter.
        candidates = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
        clusters = []
        for c in candidates:
            haystack = _cluster_entity_haystack(c)
            if any(term in item for term in query_terms for item in haystack):
                clusters.append(c)
                if len(clusters) >= limit:
                    break

    out = []
    for c in _sort_clusters_by_recency(clusters):
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "keywords": c.get("keywords", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            item["articles"] = _compact_articles(c)
        out.append(item)
    return out
@mcp.tool(description="Investigate a person, company, place, theme, or keyword by matching entities and thematic keywords within a time window.")
async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
    """Find recent clusters whose entities or keywords mention the query.

    The query is normalized and resolved via trends; a cluster matches when
    the raw query, its normalized form, canonical label or MID is a substring
    of any item in the cluster's entity/keyword haystack. Candidates are
    scanned newest first and the scan stops once ``limit`` hits are found.

    Args:
        entity: entity label, phrase or keyword to look for.
        limit: number of clusters to return, clamped to 1-30.
        timeframe: lookback window, converted to hours by ``_parse_timeframe_to_hours``.
        include_articles: when true, attach a compact ``articles`` list to each hit.

    Returns:
        A list of cluster dicts, newest first; empty if the normalized query is blank.
    """
    limit = max(1, min(int(limit), 30))
    query = normalize_query(entity).strip().lower()
    if not query:
        return []

    resolved = resolve_entity_via_trends(query)
    aliases = [
        str(resolved.get(field) or "").strip().lower()
        for field in ("normalized", "canonical_label", "mid")
    ]
    query_terms = {term for term in [query, *aliases] if term}

    store = SQLiteClusterStore(DB_PATH)
    hours = _parse_timeframe_to_hours(timeframe)
    recent = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))

    hits: list[dict] = []
    for cluster in _sort_clusters_by_recency(recent):
        haystack = _cluster_entity_haystack(cluster)
        if not any(term in item for term in query_terms for item in haystack):
            continue
        hits.append(cluster)
        if len(hits) >= limit:
            break

    out = []
    for cluster in hits:
        item = {
            "cluster_id": cluster.get("cluster_id"),
            "headline": cluster.get("headline"),
            "summary": cluster.get("summary"),
            "entities": cluster.get("entities", []),
            "keywords": cluster.get("keywords", []),
            "sentiment": cluster.get("sentiment", "neutral"),
            "importance": cluster.get("importance", 0.0),
            "sources": cluster.get("sources", []),
            "timestamp": cluster.get("timestamp"),
        }
        if include_articles:
            item["articles"] = [
                {
                    "title": art.get("title"),
                    "url": art.get("url"),
                    "source": art.get("source"),
                    "timestamp": art.get("timestamp"),
                }
                for art in cluster.get("articles", []) or []
                if isinstance(art, dict)
            ]
        out.append(item)
    return out
@mcp.tool(description="Return entities and thematic keywords commonly co-occurring with the subject in recent clusters, optionally blended with Google Trends suggestions.")
async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True):
    """Delegate to ``related_recent_entities`` with sanitized arguments.

    Args:
        subject: entity or subject phrase, passed through unchanged.
        timeframe: lookback window, converted to hours by ``_parse_timeframe_to_hours``.
        limit: maximum number of related items, clamped to 1-25.
        include_trends: treated as false only when its string form is
            ``"false"``, ``"0"`` or ``"no"`` (case-insensitive), so string
            inputs are tolerated as well as real booleans.

    Returns:
        Whatever ``related_recent_entities`` returns.
    """
    limit = max(1, min(int(limit), 25))
    hours = _parse_timeframe_to_hours(timeframe)
    flag_text = str(include_trends).strip().lower()
    want_trends = flag_text not in {"false", "0", "no"}
    return related_recent_entities(
        store=SQLiteClusterStore(DB_PATH),
        subject=subject,
        timeframe_hours=hours,
        limit=limit,
        include_trends=want_trends,
    )
@mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
async def get_event_summary(event_id: str, include_articles: bool = False):
    """Return an LLM-written summary for one cluster, using the stored summary when available.

    A summary cached within ``DEFAULT_LOOKBACK_HOURS`` is returned as is.
    Otherwise the cluster is loaded, summarized with
    ``summarize_cluster_llm`` and the result is written back to the store.

    Args:
        event_id: the cluster id to explain.
        include_articles: when true, attach a compact ``articles`` list
            (title, url, source, timestamp).

    Returns:
        A dict with ``event_id``, ``headline``, ``mergedSummary``,
        ``keyFacts``, ``sources`` and optionally ``articles``; or
        ``{"event_id": ..., "error": "NOT_FOUND"}`` when there is neither a
        cached summary nor a stored cluster.
    """

    def _compact_articles(cluster_row) -> list[dict]:
        # Tolerates a missing cluster and keeps only the minimal article fields.
        return [
            {
                "title": art.get("title"),
                "url": art.get("url"),
                "source": art.get("source"),
                "timestamp": art.get("timestamp"),
            }
            for art in (cluster_row or {}).get("articles", []) or []
            if isinstance(art, dict)
        ]

    def _as_response(summary_row: dict) -> dict:
        return {
            "event_id": event_id,
            "headline": summary_row.get("headline"),
            "mergedSummary": summary_row.get("mergedSummary"),
            "keyFacts": summary_row.get("keyFacts", []),
            "sources": summary_row.get("sources", []),
        }

    store = SQLiteClusterStore(DB_PATH)

    # Summary cache: reuse if present within TTL.
    cached = store.get_cluster_summary(cluster_id=event_id, ttl_hours=DEFAULT_LOOKBACK_HOURS)
    if cached:
        response = _as_response(cached)
        if include_articles:
            response["articles"] = _compact_articles(store.get_cluster_by_id(event_id))
        return response

    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        return {"event_id": event_id, "error": "NOT_FOUND"}

    # Project the articles before the LLM call, as the original ordering did.
    articles = _compact_articles(cluster) if include_articles else None

    summary = await summarize_cluster_llm(cluster)
    store.upsert_cluster_summary(event_id, summary)

    response = _as_response(summary)
    if include_articles:
        response["articles"] = articles or []
    return response
- @mcp.tool(description="Explore what is starting to matter: surface emerging entities, thematic keywords, and phrases from recent clusters. "
- "Use timeframe to control the lookback window, topic to scope to a category, and around to find what's emerging near a specific entity. "
- "Results include signal_type (entity / keyword / phrase) for downstream filtering.")
- async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic: str | None = None, around: str | None = None):
- """Surface entities and phrases that are accelerating in recent clusters.
- Args:
- limit: max results to return (1-20, default 10).
- timeframe: lookback window like "4h", "24h", "3d" (default "24h").
- topic: optional coarse topic filter ("crypto", "macro", "regulation", "ai", "other").
- around: optional entity — only return entities that co-occur with this entity
- in the recent window (e.g. "Bitcoin" to find what's emerging in Bitcoin's neighborhood).
- """
- limit = max(1, min(int(limit), 20))
- hours = _parse_timeframe_to_hours(timeframe)
- half_hours = hours / 2.0
- store = SQLiteClusterStore(DB_PATH)
- # Fetch more clusters than needed so velocity stats are meaningful even for short windows.
- clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
- # --- optional topic filter ---
- if topic:
- topic_norm = normalize_query(topic).strip().lower()
- if topic_norm:
- clusters = [c for c in clusters if (c.get("topic") or "other").strip().lower() == topic_norm]
- # --- resolve the 'around' entity ---
- around_terms: set[str] = set()
- if around:
- around_norm = normalize_query(around).strip().lower()
- if around_norm:
- resolved = resolve_entity_via_trends(around_norm)
- around_terms = {
- around_norm,
- str(resolved.get("normalized") or "").strip().lower(),
- str(resolved.get("canonical_label") or "").strip().lower(),
- }
- around_terms.discard("")
- # split clusters into first-half vs second-half by timestamp
- # clusters are already sorted most-recent-first from the store
- now = datetime.now(timezone.utc)
- def _cluster_age_hours(c: dict) -> float:
- """Return the cluster's age in hours. payload.timestamp is ISO 8601 UTC guaranteed."""
- ts = c.get("timestamp") or c.get("last_updated")
- if not ts:
- return 0.0
- try:
- dt = datetime.fromisoformat(str(ts).strip())
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0)
- except Exception:
- return 0.0
- # Generic entity filter
- _generic_tokens = {"news", "latest", "breaking", "update", "updates", "report", "reports"}
- def _is_generic_entity(ent: str) -> bool:
- e = str(ent).strip().lower()
- if not e or len(e) < 4:
- return True
- if e in _generic_tokens:
- return True
- return False
- # --- accumulate signals ---
- # recent = second half of timeframe (newer), prior = first half (older)
- entity_counts_recent = Counter()
- entity_counts_prior = Counter()
- entity_importance_recent = Counter()
- entity_sources: dict[str, set] = {} # ent -> set of source names
- entity_buckets: dict[str, set] = {} # ent -> set of time-bucket indices (for sustained-spike detection)
- entity_cooccur: dict[str, Counter] = {}
- phrase_counts_recent = Counter()
- # Keyword accumulators — same scoring pipeline as entities, but tracking
- # LLM-curated thematic descriptors instead of named entities.
- kw_counts_recent = Counter()
- kw_counts_prior = Counter()
- kw_importance_recent = Counter()
- kw_sources: dict[str, set] = {}
- kw_buckets: dict[str, set] = {}
- bucket_size_hours = max(1.0, hours / 6.0) # split window into ~6 buckets
- for c in clusters:
- ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
- ents_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
- # Keywords: deduplicate per cluster so a cluster with the same keyword
- # listed twice doesn't inflate counts.
- kws_in_cluster = list(dict.fromkeys(
- str(k).strip().lower()
- for k in (c.get("keywords", []) or [])
- if str(k).strip() and not _is_generic_entity(k)
- ))
- age_h = _cluster_age_hours(c)
- is_recent = age_h <= half_hours
- bucket_idx = int(age_h / bucket_size_hours)
- # --- around filter: only count clusters that mention the target entity ---
- if around_terms:
- haystack = set(ents_norm)
- for res in c.get("entityResolutions", []) or []:
- if isinstance(res, dict):
- for key in ("normalized", "canonical_label"):
- val = res.get(key)
- if val:
- haystack.add(str(val).strip().lower())
- if not (haystack & around_terms):
- continue
- counts = entity_counts_recent if is_recent else entity_counts_prior
- imp_acc = entity_importance_recent if is_recent else None # only importance from recent window
- for ent in ents_norm:
- if _is_generic_entity(ent):
- continue
- counts[ent] += 1
- if ent not in entity_sources:
- entity_sources[ent] = set()
- src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else ""
- if src:
- entity_sources[ent].add(str(src))
- if ent not in entity_buckets:
- entity_buckets[ent] = set()
- entity_buckets[ent].add(bucket_idx)
- if imp_acc is not None:
- try:
- imp_acc[ent] += float(c.get("importance", 0.0) or 0.0)
- except Exception:
- pass
- # --- keyword counting (same recent/prior split as entities) ---
- kw_counts = kw_counts_recent if is_recent else kw_counts_prior
- kw_imp_acc = kw_importance_recent if is_recent else None
- for kw in kws_in_cluster:
- kw_counts[kw] += 1
- if kw not in kw_sources:
- kw_sources[kw] = set()
- src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else ""
- if src:
- kw_sources[kw].add(str(src))
- if kw not in kw_buckets:
- kw_buckets[kw] = set()
- kw_buckets[kw].add(bucket_idx)
- if kw_imp_acc is not None:
- try:
- kw_imp_acc[kw] += float(c.get("importance", 0.0) or 0.0) # type: ignore[assignment]
- except Exception:
- pass
- # co-occurrence (only for clusters matching the around filter, if any)
- for i in range(len(ents_norm)):
- a = ents_norm[i]
- if _is_generic_entity(a):
- continue
- if a not in entity_cooccur:
- entity_cooccur[a] = Counter()
- for j in range(len(ents_norm)):
- if i == j:
- continue
- b = ents_norm[j]
- if _is_generic_entity(b):
- continue
- entity_cooccur[a][b] += 1
- # bigram phrases (recent only)
- if is_recent:
- text = f"{c.get('headline', '')} {c.get('summary', '')}"
- words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
- for i in range(len(words) - 1):
- phrase = f"{words[i]} {words[i+1]}"
- if len(phrase) > 6:
- phrase_counts_recent[phrase] += 1
- # --- score entities ---
- all_entities = set(entity_counts_recent.keys()) | set(entity_counts_prior.keys())
- scored = []
- for ent in all_entities:
- recent_n = entity_counts_recent.get(ent, 0)
- prior_n = entity_counts_prior.get(ent, 0)
- total_n = recent_n + prior_n
- if total_n < 1:
- continue
- # velocity: ratio of recent vs prior (smoothed to avoid division noise)
- # 0 prior → velocity = recent_n (pure emergence)
- # equal → velocity = 1.0 (steady)
- velocity = (recent_n + 0.5) / (prior_n + 0.5)
- # recency weight: what fraction of total hits are in the recent window
- recency_ratio = recent_n / total_n
- # source diversity: how many distinct outlets
- n_sources = len(entity_sources.get(ent, set()))
- # sustained: how many distinct time buckets did it appear in (max ~6)
- n_buckets = len(entity_buckets.get(ent, set()))
- # average importance (recent window only)
- avg_imp = (entity_importance_recent.get(ent, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0
- composed_score = (
- 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) + # velocity (0..1, 4x = max)
- 0.25 * recency_ratio + # recency concentration
- 0.15 * min(1.0, n_sources / 5.0) + # source diversity
- 0.10 * min(1.0, n_buckets / 4.0) + # sustained (>1 bucket)
- 0.15 * min(1.0, avg_imp) # importance
- )
- related = []
- if ent in entity_cooccur:
- for other, _cnt in entity_cooccur[ent].most_common(5):
- if other != ent:
- related.append(other)
- scored.append({
- "topic": ent,
- "trend_score": min(0.99, round(composed_score, 3)),
- "related_entities": related[:3] if related else [ent],
- "velocity": round(velocity, 2),
- "recent_count": recent_n,
- "prior_count": prior_n,
- "source_count": n_sources,
- "avg_importance": round(avg_imp, 3),
- "signal_type": "entity",
- })
- # --- score keywords (same velocity/recency/source/sustained/importance formula) ---
- all_keywords = set(kw_counts_recent.keys()) | set(kw_counts_prior.keys())
- kw_scored = []
- for kw in all_keywords:
- # Skip keywords that are already scored as entities — entity signal is
- # higher quality (proper nouns, resolved identities).
- if kw in all_entities:
- continue
- recent_n = kw_counts_recent.get(kw, 0)
- prior_n = kw_counts_prior.get(kw, 0)
- total_n = recent_n + prior_n
- if total_n < 1:
- continue
- velocity = (recent_n + 0.5) / (prior_n + 0.5)
- recency_ratio = recent_n / total_n
- n_sources = len(kw_sources.get(kw, set()))
- n_buckets = len(kw_buckets.get(kw, set()))
- avg_imp = (kw_importance_recent.get(kw, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0
- composed_score = (
- 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) +
- 0.25 * recency_ratio +
- 0.15 * min(1.0, n_sources / 5.0) +
- 0.10 * min(1.0, n_buckets / 4.0) +
- 0.15 * min(1.0, avg_imp)
- )
- kw_scored.append({
- "topic": kw,
- "trend_score": min(0.99, round(composed_score, 3)),
- "related_entities": [],
- "velocity": round(velocity, 2),
- "recent_count": recent_n,
- "prior_count": prior_n,
- "source_count": n_sources,
- "avg_importance": round(avg_imp, 3),
- "signal_type": "keyword",
- })
- # sort keywords by score descending
- kw_scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"]))
- # sort by composed score descending
- scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"]))
- # --- merge: entities first, then keywords, then phrases ---
- emerging = list(scored) # start with entities
- seen_topics = {item["topic"] for item in emerging}
- for kw_item in kw_scored:
- if kw_item["topic"] not in seen_topics:
- emerging.append(kw_item)
- seen_topics.add(kw_item["topic"])
- # --- add phrase signals (only from recent window) ---
- for phrase, count in phrase_counts_recent.most_common(limit * 2):
- if phrase in seen_topics:
- continue
- emerging.append({
- "topic": phrase.title(),
- "trend_score": min(0.99, round(0.30 + 0.15 * min(count, 5), 2)),
- "related_entities": [],
- "velocity": None,
- "recent_count": count,
- "prior_count": 0,
- "source_count": 0,
- "avg_importance": 0.0,
- "signal_type": "phrase",
- })
- seen_topics.add(phrase)
- if len(emerging) >= limit:
- break
- return emerging[:limit]
@mcp.tool(description="Investigate whether sentiment around an entity or keyword is positive, negative, or neutral over a chosen lookback window. "
                      "Matches clusters by both named entities and thematic keywords.")
async def get_news_sentiment(entity: str, timeframe: str = "24h"):
    """Average the sentiment of recent clusters that mention ``entity``.

    Args:
        entity: Entity name or thematic keyword. It is normalized with
            ``normalize_query`` and lower-cased, then expanded with the
            ``normalized`` / ``canonical_label`` / ``mid`` values returned by
            ``resolve_entity_via_trends``.
        timeframe: Lookback window parsed by ``_parse_timeframe_to_hours``
            (e.g. ``"24h"``, ``"24"`` or ``"2d"``); the result is clamped to
            1..168 hours. Unparseable values fall back to 24 hours.

    Returns:
        A dict with ``entity`` (the caller's original string), ``sentiment``
        (``"positive"`` / ``"negative"`` / ``"neutral"``), ``score`` (mean
        ``sentimentScore`` rounded to 3 places) and ``cluster_count`` (number
        of matched clusters, including those without a usable score).
    """

    def _neutral() -> dict:
        # Shared payload for "nothing to report" outcomes.
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }

    ent = normalize_query(entity).strip().lower()
    # Bail out before entity resolution or opening the store: an empty query
    # can never match anything.
    if not ent:
        return _neutral()

    resolved = resolve_entity_via_trends(ent)
    candidates = (
        ent,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    )
    query_terms = {term for term in candidates if term}

    # Reuse the module's timeframe parser so day suffixes ("2d") are honoured
    # here as well, then cap the window at one week.
    hours = max(1, min(_parse_timeframe_to_hours(timeframe), 168))

    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)

    matched = []
    for c in clusters:
        haystack = _cluster_entity_haystack(c)
        # Substring match of any query term against any haystack item.
        if any(term in item for term in query_terms for item in haystack):
            matched.append(c)
    if not matched:
        return _neutral()

    scores = []
    for c in matched:
        raw = c.get("sentimentScore")
        if raw is None:
            continue
        try:
            scores.append(float(raw))
        except (TypeError, ValueError):
            # Best-effort: ignore scores that are not numeric.
            continue
    avg_score = sum(scores) / len(scores) if scores else 0.0

    # Keep the label aligned with the numeric score.
    # Small magnitudes are treated as neutral to avoid noisy label flips.
    if avg_score >= 0.15:
        sentiment = "positive"
    elif avg_score <= -0.15:
        sentiment = "negative"
    else:
        sentiment = "neutral"
    return {
        "entity": entity,
        "sentiment": sentiment,
        "score": round(avg_score, 3),
        "cluster_count": len(matched),
    }
@mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.")
async def get_capabilities():
    """Return a static description of this server for downstream agents.

    The payload combines server metadata and output conventions with the
    module-level tool cards, composition recipes, example chains, agent tips
    and a short list of usage guidance strings.
    """
    conventions = {
        "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.",
        "sources": "Always preserve and display sources when summarizing a cluster or entity result.",
        "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.",
        "clusters": "Each cluster includes entities (named entities with optional MID/canonical_label) and keywords (thematic descriptors). Both are searchable; entities are higher-signal, keywords capture subject-matter themes.",
    }
    server_info = {
        "name": "news-mcp",
        "purpose": "Recent news clusters with entities and thematic keywords, entity/keyword drill-down, sentiment, emerging topics, and related-entity expansion.",
        "output_conventions": conventions,
    }
    guidance = [
        "Use get_latest_events for a tail, get_events_for_entity for entity/keyword deep dives, and get_related_recent_entities for neighborhood expansion.",
        "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.",
        "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.",
        "For emerging topics, use detect_emerging_topics with timeframe and around parameters. Signal types: entity (named entity, highest quality), keyword (thematic descriptor), phrase (headline bigram). High velocity + high source_count = strong signal.",
        "get_events_for_entity and get_news_sentiment match both entities and thematic keywords — use keywords when the subject is a theme rather than a named entity.",
    ]
    # Key order is kept stable so the serialized payload is unchanged.
    capabilities = {"server": server_info}
    capabilities["tools"] = NEWS_TOOL_CARDS
    capabilities["recipes"] = NEWS_COMPOSITION_RECIPES
    capabilities["example_chains"] = NEWS_EXAMPLE_CHAINS
    capabilities["agent_tips"] = NEWS_AGENT_TIPS
    capabilities["guidance"] = guidance
    return capabilities
- def _parse_timeframe_to_hours(timeframe: str) -> int:
- tf = str(timeframe).strip().lower()
- try:
- if tf.endswith("d"):
- days = int(tf[:-1])
- return max(1, days * 24)
- if tf.endswith("h"):
- return max(1, int(tf[:-1]))
- return max(1, int(tf))
- except Exception:
- return 24
- from contextlib import asynccontextmanager
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """FastAPI lifespan hook: start the background refresher on startup.

    The startup coroutine is scheduled as a task rather than awaited, so the
    application starts serving without waiting for it to finish.
    """
    # Keep a strong reference for the lifetime of the app: the event loop only
    # holds weak references to tasks, so a discarded task object may be
    # garbage-collected before it finishes.
    startup_task = asyncio.ensure_future(_background_refresh_loop())
    try:
        yield
    finally:
        # On shutdown, stop the startup work if it is still in flight.
        if not startup_task.done():
            startup_task.cancel()
# ASGI application; startup work is wired in through the _lifespan context manager.
app = FastAPI(title="News MCP Server", lifespan=_lifespan)
# Logger used by the startup / background-refresh code in this module.
logger = logging.getLogger("news_mcp.startup")
# Mount the MCP server's SSE app under the /mcp prefix.
app.mount("/mcp", mcp.sse_app())
# Store instance shared by the dashboard endpoints and the startup prune.
# NOTE(review): the original comment called this a "single connection pool" —
# confirm against SQLiteClusterStore before relying on that.
_shared_store = SQLiteClusterStore(DB_PATH)
# Lock + flag pair used by _background_refresh_loop so that only its first
# invocation proceeds.
_refresh_lock = asyncio.Lock()
_refresh_started = False
async def _background_refresh_loop():
    """Non-blocking background refresher: prune then poll.

    Only the first invocation does any work: ``_refresh_started`` is checked
    and set under ``_refresh_lock``, so a later call returns without starting
    a parallel ingestion cycle.

    The first invocation logs the active LLM config, runs
    ``_shared_store.prune_if_due`` in a worker thread and, when
    ``NEWS_BACKGROUND_REFRESH_ENABLED`` is truthy, spawns a polling task that
    calls ``refresh_clusters`` every ``NEWS_REFRESH_INTERVAL_SECONDS`` seconds.
    """
    global _refresh_started, _refresh_task
    async with _refresh_lock:
        if _refresh_started:
            return
        _refresh_started = True

    logger.info("news-mcp llm config: %s", active_llm_config())
    # Prune off-thread so we do not block the event loop
    prune_result = await asyncio.to_thread(
        _shared_store.prune_if_due,
        NEWS_PRUNING_ENABLED,
        NEWS_RETENTION_DAYS,
        NEWS_PRUNE_INTERVAL_HOURS,
    )
    logger.info("startup prune_result=%s", prune_result)
    if not NEWS_BACKGROUND_REFRESH_ENABLED:
        return

    async def _loop():
        # Optionally wait one full interval before the first refresh.
        if not NEWS_BACKGROUND_REFRESH_ON_START:
            logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS)
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
        while True:
            try:
                logger.info("background refresh tick start")
                await refresh_clusters(topic=None, limit=200)
                logger.info("background refresh tick complete")
            except Exception:
                # A failed tick must not kill the poller; log and keep going.
                logger.exception("background refresh tick failed")
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))

    # The event loop keeps only weak references to tasks. Bind the poller to a
    # module-level name so it cannot be garbage-collected mid-execution.
    _refresh_task = asyncio.create_task(_loop())
@app.get("/")
def root():
    """Landing endpoint: report status, transport, tool names and active config."""
    tool_names = [
        "get_latest_events",
        "get_events_for_entity",
        "get_event_summary",
        "detect_emerging_topics",
        "get_news_sentiment",
        "get_related_recent_entities",
        "get_capabilities",
    ]
    refresh_cfg = {
        "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
        "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
    }
    retention_cfg = {
        "lookback_hours": DEFAULT_LOOKBACK_HOURS,
        "retention_days": NEWS_RETENTION_DAYS,
    }
    pruning_cfg = {
        "enabled": NEWS_PRUNING_ENABLED,
        "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
    }
    return {
        "status": "ok",
        "transport": "fastmcp+sse",
        "mount": "/mcp",
        "tools": tool_names,
        "refresh": refresh_cfg,
        "retention": retention_cfg,
        "pruning": pruning_cfg,
    }
- # ------------------------------------------------------------------
- # Dashboard REST API endpoints
- # ------------------------------------------------------------------
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
# Serve the dashboard's static assets under /dashboard.
# NOTE(review): "dashboard" is a relative path, so it presumably resolves
# against the process working directory — confirm for the deployment layout.
app.mount("/dashboard", StaticFiles(directory="dashboard", html=True), name="dashboard")
# NOTE(review): `logging` is already used for `logger` earlier in this module,
# so this alias looks redundant.
import logging as _log
# Logger for the dashboard REST endpoints; _api_err writes to it.
API_LOG = _log.getLogger("news_mcp.api")
- def _api_ok(data: dict) -> dict:
- return data
def _api_err(exc: Exception, ctx: str) -> JSONResponse:
    """Log an endpoint failure and build the HTTP 500 JSON response for it.

    Intended to be called from inside an ``except`` block so that
    ``API_LOG.exception`` can attach the active traceback.

    Args:
        exc: The exception that was caught.
        ctx: Short label identifying the failing endpoint/arguments.

    Returns:
        A ``JSONResponse`` with status 500 and ``{"error": str(exc), "ctx": ctx}``.
    """
    # Pass ctx as a logging argument (lazy %-formatting) instead of an eager
    # f-string; the emitted message text is the same.
    API_LOG.exception("API error in %s", ctx)
    return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx})
@app.get("/api/v1/health")
def api_health():
    """Return dashboard stats from the shared store, plus the version hash.

    Any exception is converted into a 500 JSON response by ``_api_err``.
    """
    try:
        payload = DashboardStore(_shared_store).get_dashboard_stats()
        payload["version"] = _VERSION_HASH
    except Exception as err:
        return _api_err(err, "health")
    return payload
@app.get("/api/v1/clusters")
def api_clusters(
    topic: str | None = None,
    hours: int = 24,
    limit: int = 50,
    offset: int = 0,
):
    """List clusters one page at a time.

    Responds with ``clusters`` and ``total`` from the store's page result,
    the requested ``topic`` (``"all"`` when none was given) and ``hours``.
    Any exception is converted into a 500 JSON response by ``_api_err``.
    """
    try:
        page = DashboardStore(_shared_store).get_clusters_page(
            topic=topic,
            hours=hours,
            limit=limit,
            offset=offset,
        )
        body = {
            "clusters": page["clusters"],
            "total": page["total"],
            "topic": topic or "all",
            "hours": hours,
        }
    except Exception as err:
        return _api_err(err, f"clusters(topic={topic},hours={hours})")
    return body
@app.get("/api/v1/sentiment-series")
def api_sentiment_series(
    topic: str | None = None,
    hours: int = 24,
    bucket_hours: float = 1.0,
):
    """Return the sentiment time-series used by the dashboard chart.

    Responds with ``series`` from the store and the requested ``topic``
    (``"all"`` when none was given). Any exception is converted into a
    500 JSON response by ``_api_err``.
    """
    try:
        points = DashboardStore(_shared_store).get_sentiment_series(
            topic=topic,
            hours=hours,
            bucket_hours=bucket_hours,
        )
    except Exception as err:
        return _api_err(err, f"sentiment(topic={topic})")
    return {"series": points, "topic": topic or "all"}
@app.get("/api/v1/entities")
def api_entities(
    hours: int = 24,
    limit: int = 30,
):
    """Return entity frequencies for the last ``hours`` hours.

    Responds with ``entities`` from the store and the requested ``hours``.
    Any exception is converted into a 500 JSON response by ``_api_err``.
    """
    try:
        frequencies = DashboardStore(_shared_store).get_entity_frequencies(
            hours=hours,
            limit=limit,
        )
    except Exception as err:
        return _api_err(err, f"entities(hours={hours})")
    return {"entities": frequencies, "hours": hours}
@app.get("/api/v1/keywords")
def api_keywords(
    hours: int = 24,
    limit: int = 30,
):
    """Return keyword frequencies for the last ``hours`` hours.

    Keywords are thematic descriptors, excluding terms already counted as
    entities. Responds with ``keywords`` from the store and the requested
    ``hours``. Any exception is converted into a 500 JSON response by
    ``_api_err``.
    """
    try:
        frequencies = DashboardStore(_shared_store).get_keyword_frequencies(
            hours=hours,
            limit=limit,
        )
    except Exception as err:
        return _api_err(err, f"keywords(hours={hours})")
    return {"keywords": frequencies, "hours": hours}
@app.get("/api/v1/clusters/by-entity")
def api_clusters_by_entity(
    entity: str,
    hours: int = 168,
    limit: int = 50,
    offset: int = 0,
):
    """Return the store's result for clusters matching ``entity``.

    The entity is trimmed and lower-cased before the lookup; the store's
    return value is passed through as-is. Any exception is converted into
    a 500 JSON response by ``_api_err``.
    """
    try:
        dashboard = DashboardStore(_shared_store)
        needle = entity.strip().lower()
        return dashboard.get_clusters_by_entity(
            entity=needle,
            hours=hours,
            limit=limit,
            offset=offset,
        )
    except Exception as err:
        return _api_err(err, f"by-entity(entity={entity},hours={hours})")
@app.get("/api/v1/clusters/by-keyword")
def api_clusters_by_keyword(
    keyword: str,
    hours: int = 168,
    limit: int = 50,
    offset: int = 0,
):
    """Return the store's result for clusters matching ``keyword``.

    The keyword is trimmed and lower-cased before the lookup; the store's
    return value is passed through as-is. Any exception is converted into
    a 500 JSON response by ``_api_err``.
    """
    try:
        dashboard = DashboardStore(_shared_store)
        needle = keyword.strip().lower()
        return dashboard.get_clusters_by_keyword(
            keyword=needle,
            hours=hours,
            limit=limit,
            offset=offset,
        )
    except Exception as err:
        return _api_err(err, f"by-keyword(keyword={keyword},hours={hours})")
@app.get("/api/v1/cluster/{cluster_id}")
def api_cluster_detail(cluster_id: str):
    """Return the full detail record for one cluster.

    Responds with a 404 JSON payload when the store returns a falsy result.
    Any exception is converted into a 500 JSON response by ``_api_err``.
    """
    try:
        found = DashboardStore(_shared_store).get_cluster_detail(cluster_id)
        if found:
            return found
        return JSONResponse(
            status_code=404,
            content={"error": "Cluster not found", "id": cluster_id},
        )
    except Exception as err:
        return _api_err(err, f"detail({cluster_id})")
- # ------------------------------------------------------------------
- # Feed management endpoints (toggle on/off from dashboard)
- # ------------------------------------------------------------------
@app.get("/api/v1/feeds")
def api_feeds():
    """Return the stored feed-state list and the configured feed URLs.

    Any exception is converted into a 500 JSON response by ``_api_err``.
    """
    try:
        # A fresh store is opened per request here (not the shared instance).
        feed_store = SQLiteClusterStore(DB_PATH)
        payload = {
            "feeds": feed_store.get_feed_state_list(),
            "configured_urls": _configured_feed_urls(),
        }
    except Exception as err:
        return _api_err(err, "feeds")
    return payload
@app.post("/api/v1/feeds/toggle")
async def api_feed_toggle(feed_url: str = Form(), enabled: bool = Form()):
    """Enable or disable one feed.

    The URL is whitespace-trimmed before the store update. Responds with a
    404 JSON payload when the store reports failure, otherwise echoes the
    trimmed URL and the new state. Any exception is converted into a 500
    JSON response by ``_api_err``.
    """
    try:
        feed_store = SQLiteClusterStore(DB_PATH)
        target = feed_url.strip()
        if feed_store.set_feed_enabled(target, enabled):
            return {"ok": True, "feed_url": target, "enabled": enabled}
        # The not-found message reports the URL exactly as it was submitted.
        return JSONResponse(
            status_code=404,
            content={"error": f"Feed not found: {feed_url}"},
        )
    except Exception as err:
        return _api_err(err, f"toggle({feed_url})")
@app.get("/health")
def health():
    """Liveness probe: status, seconds since process start, and version hash."""
    uptime_seconds = time.monotonic() - _PROCESS_STARTED_AT
    report = {"status": "ok"}
    report["uptime"] = round(uptime_seconds, 3)
    report["version"] = _VERSION_HASH
    return report
|