| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214 |
- from __future__ import annotations
- import asyncio
- import logging
- import subprocess
- import math
- import re
- import time
- from collections import Counter
- from datetime import datetime, timezone
- from pathlib import Path
- from fastapi import FastAPI, Form
- from mcp.server.fastmcp import FastMCP
- from mcp.server.transport_security import TransportSecuritySettings
- from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
- from news_mcp.config import (
- NEWS_FEED_URL,
- NEWS_FEED_URLS,
- NEWS_PRUNE_INTERVAL_HOURS,
- NEWS_PRUNING_ENABLED,
- NEWS_REFRESH_INTERVAL_SECONDS,
- NEWS_BACKGROUND_REFRESH_ENABLED,
- NEWS_BACKGROUND_REFRESH_ON_START,
- NEWS_RETENTION_DAYS,
- )
- from news_mcp.jobs.poller import refresh_clusters
- from news_mcp.storage.sqlite_store import SQLiteClusterStore
- from news_mcp.dashboard.dashboard_store import DashboardStore
- from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
- from news_mcp.trends_resolution import resolve_entity_via_trends
- from news_mcp.llm import active_llm_config
- from news_mcp.entity_normalize import normalize_query
- from news_mcp.related_entities import related_recent_entities
# Root logging is configured once at import so every module logger shares the
# same timestamped line format.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    level=logging.INFO,
)

# Monotonic reading captured at import time.
_PROCESS_STARTED_AT = time.monotonic()

# Directory containing this file; used as the working directory for git below.
_REPO_ROOT = Path(__file__).resolve().parent

# Best-effort build identifier: the short (9 character) hash of HEAD.
# Any failure (git missing, not a checkout, ...) degrades to "unknown".
try:
    _git_head = subprocess.check_output(
        ["git", "rev-parse", "--short=9", "HEAD"],
        cwd=str(_REPO_ROOT),
        stderr=subprocess.DEVNULL,
    )
    _VERSION_HASH = _git_head.decode().strip()
except Exception:
    _VERSION_HASH = "unknown"

# The MCP server instance that the @mcp.tool decorators in this file register on.
# DNS-rebinding protection is explicitly switched off in the transport settings.
mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(
        enable_dns_rebinding_protection=False,
    ),
)
- def _cluster_entity_haystack(cluster: dict) -> list[str]:
- """Collect the normalized entity + keyword clues attached to a cluster."""
- values: list[str] = []
- for ent in cluster.get("entities", []) or []:
- values.append(str(ent).strip().lower())
- for res in cluster.get("entityResolutions", []) or []:
- if not isinstance(res, dict):
- continue
- for key in ("normalized", "canonical_label", "mid"):
- val = res.get(key)
- if val:
- values.append(str(val).strip().lower())
- # Keywords are LLM-curated thematic descriptors — include them in the
- # searchable haystack so entity/theme queries match on subject-matter
- # signals, not just named entities.
- for kw in cluster.get("keywords", []) or []:
- values.append(str(kw).strip().lower())
- return [v for v in values if v]
- def _parse_cluster_timestamp(value) -> datetime:
- """Parse a stored cluster timestamp.
- payload.timestamp is guaranteed ISO 8601 UTC (YYYY-MM-DDTHH:MM:SS+00:00)
- at write time. Only datetime.fromisoformat is needed — no RFC 2822 fallback.
- """
- if not value:
- return datetime.min.replace(tzinfo=timezone.utc)
- text = str(value).strip()
- if not text:
- return datetime.min.replace(tzinfo=timezone.utc)
- try:
- dt = datetime.fromisoformat(text)
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.astimezone(timezone.utc)
- except Exception:
- return datetime.min.replace(tzinfo=timezone.utc)
def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
    """Return a new list of clusters ordered newest first.

    The sort key is (parsed ``timestamp``, ``importance`` as float), applied
    in descending order, so importance breaks ties between equal timestamps.
    A missing or falsy importance counts as 0.0. The input is not mutated.
    """

    def _recency_key(cluster: dict) -> tuple:
        when = _parse_cluster_timestamp(cluster.get("timestamp"))
        weight = float(cluster.get("importance", 0.0) or 0.0)
        return (when, weight)

    ordered = list(clusters)
    ordered.sort(key=_recency_key, reverse=True)
    return ordered
- def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict:
- return {
- "name": name,
- "description": description,
- "inputs": inputs,
- "outputs": outputs,
- "notes": notes or [],
- }
# Descriptive cards, one per tool: name, description, inputs, outputs and
# usage notes, each built through _tool_card.
NEWS_TOOL_CARDS = [
    # --- feed management ---
    _tool_card(
        name="get_feeds",
        description="List all configured RSS feeds with their enabled/disabled status.",
        inputs=[],
        outputs=["feeds[]: {feed_key, enabled, last_hash, last_item_count, updated_at}"],
        notes=["Use this to see which feeds are currently active or disabled."],
    ),
    _tool_card(
        name="toggle_feed",
        description="Enable or disable a specific RSS feed by URL.",
        inputs=[
            {"name": "feed_url", "type": "string", "meaning": "the feed URL to toggle"},
            {"name": "enabled", "type": "boolean", "meaning": "true to enable, false to disable"},
        ],
        outputs=["ok", "feed_key", "enabled"],
        notes=["Changes take effect on the next refresh cycle."],
    ),
    # --- cluster retrieval ---
    _tool_card(
        name="get_latest_events",
        description="Get the newest deduplicated clusters for a topic or resolved entity-like query.",
        inputs=[
            {
                "name": "topic",
                "type": "string",
                "default": "all topics",
                "meaning": "coarse category (crypto, macro, regulation, ai, other), entity-like topic, or omit for all topics",
            },
            {"name": "limit", "type": "integer", "default": 5, "range": "1-20"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        outputs=[
            "headline",
            "summary",
            "entities",
            "keywords",
            "sentiment",
            "importance",
            "sources",
            "timestamp",
            "articles?",
        ],
        notes=["Use when you want the freshest clusters. Each cluster includes both named entities and LLM-curated thematic keywords describing what the story is about."],
    ),
    _tool_card(
        name="get_events_for_entity",
        description="Search recent clusters for a person, place, company, theme, or keyword by matching entities and thematic keywords.",
        inputs=[
            {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to search for"},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-30"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        outputs=[
            "headline",
            "summary",
            "entities",
            "keywords",
            "sentiment",
            "importance",
            "sources",
            "timestamp",
            "articles?",
        ],
        notes=["Matches against both named entities and thematic keywords. Use this for an entity-centered or theme-centered deep dive."],
    ),
    _tool_card(
        name="get_event_summary",
        description="Produce a concise LLM-written explanation for one cluster and key facts.",
        inputs=[
            {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        outputs=["headline", "mergedSummary", "keyFacts", "sources", "articles?"],
        notes=["Prefer this after you have already chosen a specific cluster to explain."],
    ),
    # --- trend / sentiment / neighborhood analysis ---
    _tool_card(
        name="detect_emerging_topics",
        description="Surface emerging entities, thematic keywords, and phrases that are accelerating in the recent window.",
        inputs=[
            {"name": "limit", "type": "integer", "default": 10, "range": "1-20"},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["4h", "24h", "3d"]},
            {
                "name": "topic",
                "type": "string",
                "default": "all topics",
                "examples": ["crypto", "macro", "regulation", "ai", "other"],
            },
            {
                "name": "around",
                "type": "string",
                "default": "none",
                "meaning": "entity to scope results to its neighborhood (e.g. \"Bitcoin\")",
            },
        ],
        outputs=[
            "topic",
            "trend_score",
            "velocity",
            "recent_count",
            "prior_count",
            "source_count",
            "related_entities",
            "signal_type",
        ],
        notes=["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity. Signal types: entity (named entity), keyword (thematic descriptor), phrase (headline bigram). Check velocity and source_count to distinguish real spikes from noise."],
    ),
    _tool_card(
        name="get_news_sentiment",
        description="Estimate sentiment around an entity or keyword over a lookback window.",
        inputs=[
            {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to analyze"},
            {"name": "timeframe", "type": "string", "default": "24h"},
        ],
        outputs=["entity", "sentiment", "score", "cluster_count"],
        notes=["Matches clusters by entities and keywords. Use after locating a cluster set or entity neighborhood."],
    ),
    _tool_card(
        name="get_related_recent_entities",
        description="Find entities and thematic keywords commonly co-occurring with a subject in recent clusters, optionally blended with Google Trends suggestions.",
        inputs=[
            {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"},
            {"name": "timeframe", "type": "string", "default": "72h"},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-25"},
            {"name": "include_trends", "type": "boolean", "default": True},
        ],
        outputs=[
            "subject",
            "related[].normalized",
            "related[].canonical_label",
            "related[].mid",
            "related[].sources",
            "related[].scores",
        ],
        notes=["Use this to drill from a subject into related entities and themes, then feed results into get_events_for_entity."],
    ),
]
# Named multi-tool workflows: each recipe lists the tool calls to chain
# ("steps") plus guidance on when to use it ("notes").
NEWS_COMPOSITION_RECIPES = [
    dict(
        name="fresh-news-tail",
        steps=[
            "get_latest_events(topic=...)",
            "optionally get_event_summary(event_id=...) for the strongest cluster",
        ],
        notes=["Best for a quick tail of what is happening now. Omit topic for all topics, or pass crypto/macro/regulation/ai/other to filter."],
    ),
    dict(
        name="entity-deep-dive",
        steps=[
            "get_events_for_entity(entity=...)",
            "get_event_summary(event_id=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
        ],
        notes=["Prefer canonical entity labels when you have them; the server normalizes for you."],
    ),
    dict(
        name="subject-neighborhood",
        steps=[
            "get_related_recent_entities(subject=...)",
            "for each strong related entity, call get_events_for_entity(entity=...)",
        ],
        notes=["Use this when you want a graph-like expansion around a subject."],
    ),
    dict(
        name="emerging-signal",
        steps=[
            "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...)",
            "choose a topic/entity from the results",
            "get_events_for_entity(entity=...)",
            "get_news_sentiment(entity=...)",
        ],
        notes=["Use timeframe to control lookback (e.g. \"4h\" for what's hot right now, \"3d\" for weekly trends), topic to scope to a category, around to find what's emerging near a specific entity. Check velocity and source_count to distinguish real spikes from noise."],
    ),
]
# Free-text usage guidance for client agents, one tip per entry.
NEWS_AGENT_TIPS = [
    # Choosing a starting tool.
    "If you need a fast answer, start with get_latest_events, then summarize the strongest cluster with get_event_summary.",
    "If a user asks about a person/place/company/theme, use get_events_for_entity before broadening to get_related_recent_entities.",
    # Presentation and call discipline.
    "Treat cluster_id as an internal cursor, not user-facing output; use it only for follow-up tool calls.",
    "When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.",
    "Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.",
    "For tricky names, rely on the server's resolver instead of inventing alias rules in the client.",
    # Trend detection and cluster anatomy.
    "Use detect_emerging_topics with timeframe=\"4h\" for what's hot right now, timeframe=\"3d\" for weekly trends. Use topic= to scope to a category, around= to find what's emerging near a specific entity. Check velocity to distinguish accelerating signals from steady-state ones. Filter by signal_type to focus on entities, keywords, or phrases.",
    "Each cluster contains both entities (named entities with identity resolution) and keywords (thematic descriptors). Use keywords to understand what a story is about beyond the named entities.",
]
# Worked examples: a user-style task paired with the tool-call chain that
# answers it.
NEWS_EXAMPLE_CHAINS = [
    dict(
        task="What is happening now?",
        chain=[
            "get_latest_events(topic=...)",
            "get_event_summary(event_id=...) if one cluster looks important",
        ],
    ),
    dict(
        task="Deep dive on an entity",
        chain=[
            "get_events_for_entity(entity=..., timeframe=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
            "get_event_summary(event_id=...) for the strongest cluster",
        ],
    ),
    dict(
        task="Broaden from a subject",
        chain=[
            "get_related_recent_entities(subject=..., include_trends=true)",
            "get_events_for_entity(entity=...) for the strongest related entities",
        ],
    ),
    dict(
        task="Find what is emerging",
        chain=[
            "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...) with optional scoping",
            "get_events_for_entity(entity=...) on one or two emerging terms",
        ],
    ),
    dict(
        task="What's heating up around a specific entity",
        chain=[
            "detect_emerging_topics(around=\"<entity>\", timeframe=\"4h\")",
            "get_events_for_entity(entity=...) on the top emerging neighbor",
        ],
    ),
]
def _configured_feed_urls() -> list[str]:
    """Return the feed URLs taken from the imported configuration values.

    ``NEWS_FEED_URLS`` is split on commas and blank entries are discarded;
    when nothing remains, the single ``NEWS_FEED_URL`` is used instead.
    """
    pieces = (piece.strip() for piece in NEWS_FEED_URLS.split(","))
    urls = [url for url in pieces if url]
    return urls or [NEWS_FEED_URL]
@mcp.tool(description="List all configured RSS feeds with their current enabled/disabled status.")
async def get_feeds() -> list[dict]:
    """Return the feed-state list reported by the cluster store.

    Opens a ``SQLiteClusterStore`` on ``DB_PATH`` and hands back the result of
    its ``get_feed_state_list()`` unchanged.
    """
    return SQLiteClusterStore(DB_PATH).get_feed_state_list()
@mcp.tool(description="Enable or disable a specific RSS feed by URL.")
async def toggle_feed(feed_url: str, enabled: bool) -> dict:
    """Set a feed's enabled flag and report the resulting state.

    The URL is stripped of surrounding whitespace before it is used as the
    feed key. Changes take effect on the next background refresh cycle.

    Returns:
        ``{"ok": True, "feed_key": <stripped url>, "enabled": <flag>,
        "details": <store.get_feed_state(...) result>}``.
    """
    store = SQLiteClusterStore(DB_PATH)
    feed_key = feed_url.strip()

    store.set_feed_enabled(feed_key, enabled)
    details = store.get_feed_state(feed_key)

    return {
        "ok": True,
        "feed_key": feed_key,
        "enabled": enabled,
        "details": details,
    }
@mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters with entities and thematic keywords, sorted by recency.")
async def get_latest_events(topic: str | None = None, limit: int = 5, include_articles: bool = False):
    """Return the freshest clusters, optionally narrowed by topic or entity.

    The mode is chosen from ``topic``:

    * omitted / blank: newest clusters across all topics;
    * a member of ``DEFAULT_TOPICS`` (case-insensitive): that topic's
      clusters, read cache-first; a refresh is triggered only when the store
      has no fresh clusters for the topic;
    * anything else: treated as an entity-like query. The query is resolved
      via ``resolve_entity_via_trends`` and matched by substring against each
      cluster's entities, entity resolutions and keywords.

    Args:
        topic: coarse topic, entity-like query, or ``None`` for all topics.
        limit: maximum number of clusters, clamped to 1-20.
        include_articles: when true, attach a compact ``articles`` list
            (title, url, source, timestamp) to every cluster.

    Returns:
        A list of cluster dicts sorted by ``_sort_clusters_by_recency``.
    """
    limit = max(1, min(int(limit), 20))
    topic_norm = normalize_query(topic).lower() if topic else ""
    known_topics = {t.lower() for t in DEFAULT_TOPICS}

    store = SQLiteClusterStore(DB_PATH)

    if not topic_norm:
        # No topic specified: return freshest clusters across all topics.
        clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
    elif topic_norm in known_topics:
        # Cache-first: only refresh if we currently have no fresh clusters for this topic.
        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
        if not clusters:
            await refresh_clusters(topic=topic_norm, limit=200)
            clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
    else:
        # Entity-aware mode. The resolver is consulted only here: its result
        # feeds nothing but the match terms below, so calling it for the
        # all-topics or coarse-topic modes would be wasted work.
        resolved = resolve_entity_via_trends(topic_norm)
        query_terms = {topic_norm}
        for field in ("normalized", "canonical_label", "mid"):
            alias = str(resolved.get(field) or "").strip().lower()
            if alias:
                query_terms.add(alias)

        # Over-fetch (8x) so that enough candidates survive the entity filter.
        candidates = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
        clusters = []
        for candidate in candidates:
            haystack = _cluster_entity_haystack(candidate)
            if any(term in item for term in query_terms for item in haystack):
                clusters.append(candidate)
                if len(clusters) >= limit:
                    break

    def _compact_articles(cluster: dict) -> list[dict]:
        # Minimal article fields only, to keep responses compact.
        return [
            {
                "title": article.get("title"),
                "url": article.get("url"),
                "source": article.get("source"),
                "timestamp": article.get("timestamp"),
            }
            for article in (cluster.get("articles", []) or [])
            if isinstance(article, dict)
        ]

    out = []
    for c in _sort_clusters_by_recency(clusters):
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "keywords": c.get("keywords", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            item["articles"] = _compact_articles(c)
        out.append(item)
    return out
@mcp.tool(description="Investigate a person, company, place, theme, or keyword by matching entities and thematic keywords within a time window.")
async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
    """Find recent clusters whose entities or keywords mention ``entity``.

    The normalized query and the aliases returned by
    ``resolve_entity_via_trends`` (``normalized``, ``canonical_label``,
    ``mid``) form the match terms. A cluster matches when any term is a
    substring of any entry in its entity/keyword haystack. Candidates are
    walked newest first and collection stops at ``limit`` hits.

    Args:
        entity: entity label, phrase, or keyword to search for.
        limit: maximum number of clusters, clamped to 1-30.
        timeframe: lookback window, converted to hours by
            ``_parse_timeframe_to_hours``.
        include_articles: when true, attach a compact ``articles`` list.

    Returns:
        A list of cluster dicts; empty when the normalized query is blank.
    """
    limit = max(1, min(int(limit), 30))

    query = normalize_query(entity).strip().lower()
    if not query:
        return []

    resolved = resolve_entity_via_trends(query)
    query_terms = {query}
    for field in ("normalized", "canonical_label", "mid"):
        alias = str(resolved.get(field) or "").strip().lower()
        if alias:
            query_terms.add(alias)

    store = SQLiteClusterStore(DB_PATH)
    hours = _parse_timeframe_to_hours(timeframe)
    # Fetch a generous candidate pool (at least 200) before filtering.
    candidates = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))

    matches: list[dict] = []
    for candidate in _sort_clusters_by_recency(candidates):
        haystack = _cluster_entity_haystack(candidate)
        if any(term in item for term in query_terms for item in haystack):
            matches.append(candidate)
            if len(matches) >= limit:
                break

    results = []
    for cluster in matches:
        entry = {
            "cluster_id": cluster.get("cluster_id"),
            "headline": cluster.get("headline"),
            "summary": cluster.get("summary"),
            "entities": cluster.get("entities", []),
            "keywords": cluster.get("keywords", []),
            "sentiment": cluster.get("sentiment", "neutral"),
            "importance": cluster.get("importance", 0.0),
            "sources": cluster.get("sources", []),
            "timestamp": cluster.get("timestamp"),
        }
        if include_articles:
            entry["articles"] = [
                {
                    "title": article.get("title"),
                    "url": article.get("url"),
                    "source": article.get("source"),
                    "timestamp": article.get("timestamp"),
                }
                for article in (cluster.get("articles", []) or [])
                if isinstance(article, dict)
            ]
        results.append(entry)
    return results
@mcp.tool(description="Return entities and thematic keywords commonly co-occurring with the subject in recent clusters, optionally blended with Google Trends suggestions.")
async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True):
    """Delegate to ``related_recent_entities`` with sanitized arguments.

    Args:
        subject: passed through unchanged.
        timeframe: lookback window, converted to hours by
            ``_parse_timeframe_to_hours``.
        limit: maximum number of results, clamped to 1-25.
        include_trends: coerced through its string form, so the values
            "false", "0" and "no" (any case, surrounding whitespace ignored)
            count as false and everything else as true.

    Returns:
        Whatever ``related_recent_entities`` returns.
    """
    limit = max(1, min(int(limit), 25))
    window_hours = _parse_timeframe_to_hours(timeframe)

    # Tolerate string-typed booleans such as "false" or "0".
    falsy_spellings = {"false", "0", "no"}
    wants_trends = str(include_trends).strip().lower() not in falsy_spellings

    return related_recent_entities(
        store=SQLiteClusterStore(DB_PATH),
        subject=subject,
        timeframe_hours=window_hours,
        limit=limit,
        include_trends=wants_trends,
    )
@mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
async def get_event_summary(event_id: str, include_articles: bool = False):
    """Return an LLM-written summary for one cluster, using the cache first.

    A summary cached within ``DEFAULT_LOOKBACK_HOURS`` is returned directly.
    Otherwise the cluster is loaded, summarized with
    ``summarize_cluster_llm``, stored via ``upsert_cluster_summary`` and
    returned.

    Args:
        event_id: the cluster id to summarize.
        include_articles: when true, attach a compact ``articles`` list
            (title, url, source, timestamp).

    Returns:
        A dict with ``event_id``, ``headline``, ``mergedSummary``,
        ``keyFacts``, ``sources`` and optionally ``articles``; or
        ``{"event_id": ..., "error": "NOT_FOUND"}`` when there is no cached
        summary and the cluster does not exist.
    """

    def _compact_articles(source_cluster) -> list[dict]:
        # Tolerates a missing cluster and a missing/None article list.
        return [
            {
                "title": article.get("title"),
                "url": article.get("url"),
                "source": article.get("source"),
                "timestamp": article.get("timestamp"),
            }
            for article in ((source_cluster or {}).get("articles", []) or [])
            if isinstance(article, dict)
        ]

    def _envelope(summary_doc: dict) -> dict:
        return {
            "event_id": event_id,
            "headline": summary_doc.get("headline"),
            "mergedSummary": summary_doc.get("mergedSummary"),
            "keyFacts": summary_doc.get("keyFacts", []),
            "sources": summary_doc.get("sources", []),
        }

    store = SQLiteClusterStore(DB_PATH)

    # Summary cache: reuse if present within TTL.
    cached = store.get_cluster_summary(
        cluster_id=event_id,
        ttl_hours=DEFAULT_LOOKBACK_HOURS,
    )
    if cached:
        response = _envelope(cached)
        if include_articles:
            # The cluster is only loaded when articles were requested.
            response["articles"] = _compact_articles(store.get_cluster_by_id(event_id))
        return response

    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        return {"event_id": event_id, "error": "NOT_FOUND"}

    # Articles are captured before the summarizer call, as in the original order.
    articles = _compact_articles(cluster) if include_articles else None

    fresh = await summarize_cluster_llm(cluster)
    store.upsert_cluster_summary(event_id, fresh)

    response = _envelope(fresh)
    if include_articles:
        response["articles"] = articles or []
    return response
- @mcp.tool(description="Explore what is starting to matter: surface emerging entities, thematic keywords, and phrases from recent clusters. "
- "Use timeframe to control the lookback window, topic to scope to a category, and around to find what's emerging near a specific entity. "
- "Results include signal_type (entity / keyword / phrase) for downstream filtering.")
- async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic: str | None = None, around: str | None = None):
- """Surface entities and phrases that are accelerating in recent clusters.
- Args:
- limit: max results to return (1-20, default 10).
- timeframe: lookback window like "4h", "24h", "3d" (default "24h").
- topic: optional coarse topic filter ("crypto", "macro", "regulation", "ai", "other").
- around: optional entity — only return entities that co-occur with this entity
- in the recent window (e.g. "Bitcoin" to find what's emerging in Bitcoin's neighborhood).
- """
- limit = max(1, min(int(limit), 20))
- hours = _parse_timeframe_to_hours(timeframe)
- half_hours = hours / 2.0
- store = SQLiteClusterStore(DB_PATH)
- # Fetch more clusters than needed so velocity stats are meaningful even for short windows.
- clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
- # --- optional topic filter ---
- if topic:
- topic_norm = normalize_query(topic).strip().lower()
- if topic_norm:
- clusters = [c for c in clusters if (c.get("topic") or "other").strip().lower() == topic_norm]
- # --- resolve the 'around' entity ---
- around_terms: set[str] = set()
- if around:
- around_norm = normalize_query(around).strip().lower()
- if around_norm:
- resolved = resolve_entity_via_trends(around_norm)
- around_terms = {
- around_norm,
- str(resolved.get("normalized") or "").strip().lower(),
- str(resolved.get("canonical_label") or "").strip().lower(),
- }
- around_terms.discard("")
- # split clusters into first-half vs second-half by timestamp
- # clusters are already sorted most-recent-first from the store
- now = datetime.now(timezone.utc)
- def _cluster_age_hours(c: dict) -> float:
- """Return the cluster's age in hours. payload.timestamp is ISO 8601 UTC guaranteed."""
- ts = c.get("timestamp") or c.get("last_updated")
- if not ts:
- return 0.0
- try:
- dt = datetime.fromisoformat(str(ts).strip())
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0)
- except Exception:
- return 0.0
- # Generic entity filter
- _generic_tokens = {"news", "latest", "breaking", "update", "updates", "report", "reports"}
- def _is_generic_entity(ent: str) -> bool:
- e = str(ent).strip().lower()
- if not e or len(e) < 4:
- return True
- if e in _generic_tokens:
- return True
- return False
- # --- accumulate signals ---
- # recent = second half of timeframe (newer), prior = first half (older)
- entity_counts_recent = Counter()
- entity_counts_prior = Counter()
- entity_importance_recent = Counter()
- entity_sources: dict[str, set] = {} # ent -> set of source names
- entity_buckets: dict[str, set] = {} # ent -> set of time-bucket indices (for sustained-spike detection)
- entity_cooccur: dict[str, Counter] = {}
- phrase_counts_recent = Counter()
- # Keyword accumulators — same scoring pipeline as entities, but tracking
- # LLM-curated thematic descriptors instead of named entities.
- kw_counts_recent = Counter()
- kw_counts_prior = Counter()
- kw_importance_recent = Counter()
- kw_sources: dict[str, set] = {}
- kw_buckets: dict[str, set] = {}
- bucket_size_hours = max(1.0, hours / 6.0) # split window into ~6 buckets
- for c in clusters:
- ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
- ents_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
- # Keywords: deduplicate per cluster so a cluster with the same keyword
- # listed twice doesn't inflate counts.
- kws_in_cluster = list(dict.fromkeys(
- str(k).strip().lower()
- for k in (c.get("keywords", []) or [])
- if str(k).strip() and not _is_generic_entity(k)
- ))
- age_h = _cluster_age_hours(c)
- is_recent = age_h <= half_hours
- bucket_idx = int(age_h / bucket_size_hours)
- # --- around filter: only count clusters that mention the target entity ---
- if around_terms:
- haystack = set(ents_norm)
- for res in c.get("entityResolutions", []) or []:
- if isinstance(res, dict):
- for key in ("normalized", "canonical_label"):
- val = res.get(key)
- if val:
- haystack.add(str(val).strip().lower())
- if not (haystack & around_terms):
- continue
- counts = entity_counts_recent if is_recent else entity_counts_prior
- imp_acc = entity_importance_recent if is_recent else None # only importance from recent window
- for ent in ents_norm:
- if _is_generic_entity(ent):
- continue
- counts[ent] += 1
- if ent not in entity_sources:
- entity_sources[ent] = set()
- src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else ""
- if src:
- entity_sources[ent].add(str(src))
- if ent not in entity_buckets:
- entity_buckets[ent] = set()
- entity_buckets[ent].add(bucket_idx)
- if imp_acc is not None:
- try:
- imp_acc[ent] += float(c.get("importance", 0.0) or 0.0)
- except Exception:
- pass
- # --- keyword counting (same recent/prior split as entities) ---
- kw_counts = kw_counts_recent if is_recent else kw_counts_prior
- kw_imp_acc = kw_importance_recent if is_recent else None
- for kw in kws_in_cluster:
- kw_counts[kw] += 1
- if kw not in kw_sources:
- kw_sources[kw] = set()
- src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else ""
- if src:
- kw_sources[kw].add(str(src))
- if kw not in kw_buckets:
- kw_buckets[kw] = set()
- kw_buckets[kw].add(bucket_idx)
- if kw_imp_acc is not None:
- try:
- kw_imp_acc[kw] += float(c.get("importance", 0.0) or 0.0) # type: ignore[assignment]
- except Exception:
- pass
- # co-occurrence (only for clusters matching the around filter, if any)
- for i in range(len(ents_norm)):
- a = ents_norm[i]
- if _is_generic_entity(a):
- continue
- if a not in entity_cooccur:
- entity_cooccur[a] = Counter()
- for j in range(len(ents_norm)):
- if i == j:
- continue
- b = ents_norm[j]
- if _is_generic_entity(b):
- continue
- entity_cooccur[a][b] += 1
- # bigram phrases (recent only)
- if is_recent:
- text = f"{c.get('headline', '')} {c.get('summary', '')}"
- words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
- for i in range(len(words) - 1):
- phrase = f"{words[i]} {words[i+1]}"
- if len(phrase) > 6:
- phrase_counts_recent[phrase] += 1
- # --- score entities ---
- all_entities = set(entity_counts_recent.keys()) | set(entity_counts_prior.keys())
- scored = []
- for ent in all_entities:
- recent_n = entity_counts_recent.get(ent, 0)
- prior_n = entity_counts_prior.get(ent, 0)
- total_n = recent_n + prior_n
- if total_n < 1:
- continue
- # velocity: ratio of recent vs prior (smoothed to avoid division noise)
- # 0 prior → velocity = recent_n (pure emergence)
- # equal → velocity = 1.0 (steady)
- velocity = (recent_n + 0.5) / (prior_n + 0.5)
- # recency weight: what fraction of total hits are in the recent window
- recency_ratio = recent_n / total_n
- # source diversity: how many distinct outlets
- n_sources = len(entity_sources.get(ent, set()))
- # sustained: how many distinct time buckets did it appear in (max ~6)
- n_buckets = len(entity_buckets.get(ent, set()))
- # average importance (recent window only)
- avg_imp = (entity_importance_recent.get(ent, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0
- composed_score = (
- 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) + # velocity (0..1, 4x = max)
- 0.25 * recency_ratio + # recency concentration
- 0.15 * min(1.0, n_sources / 5.0) + # source diversity
- 0.10 * min(1.0, n_buckets / 4.0) + # sustained (>1 bucket)
- 0.15 * min(1.0, avg_imp) # importance
- )
- related = []
- if ent in entity_cooccur:
- for other, _cnt in entity_cooccur[ent].most_common(5):
- if other != ent:
- related.append(other)
- scored.append({
- "topic": ent,
- "trend_score": min(0.99, round(composed_score, 3)),
- "related_entities": related[:3] if related else [ent],
- "velocity": round(velocity, 2),
- "recent_count": recent_n,
- "prior_count": prior_n,
- "source_count": n_sources,
- "avg_importance": round(avg_imp, 3),
- "signal_type": "entity",
- })
- # --- score keywords (same velocity/recency/source/sustained/importance formula) ---
- all_keywords = set(kw_counts_recent.keys()) | set(kw_counts_prior.keys())
- kw_scored = []
- for kw in all_keywords:
- # Skip keywords that are already scored as entities — entity signal is
- # higher quality (proper nouns, resolved identities).
- if kw in all_entities:
- continue
- recent_n = kw_counts_recent.get(kw, 0)
- prior_n = kw_counts_prior.get(kw, 0)
- total_n = recent_n + prior_n
- if total_n < 1:
- continue
- velocity = (recent_n + 0.5) / (prior_n + 0.5)
- recency_ratio = recent_n / total_n
- n_sources = len(kw_sources.get(kw, set()))
- n_buckets = len(kw_buckets.get(kw, set()))
- avg_imp = (kw_importance_recent.get(kw, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0
- composed_score = (
- 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) +
- 0.25 * recency_ratio +
- 0.15 * min(1.0, n_sources / 5.0) +
- 0.10 * min(1.0, n_buckets / 4.0) +
- 0.15 * min(1.0, avg_imp)
- )
- kw_scored.append({
- "topic": kw,
- "trend_score": min(0.99, round(composed_score, 3)),
- "related_entities": [],
- "velocity": round(velocity, 2),
- "recent_count": recent_n,
- "prior_count": prior_n,
- "source_count": n_sources,
- "avg_importance": round(avg_imp, 3),
- "signal_type": "keyword",
- })
- # sort keywords by score descending
- kw_scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"]))
- # sort by composed score descending
- scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"]))
- # --- merge: entities first, then keywords, then phrases ---
- emerging = list(scored) # start with entities
- seen_topics = {item["topic"] for item in emerging}
- for kw_item in kw_scored:
- if kw_item["topic"] not in seen_topics:
- emerging.append(kw_item)
- seen_topics.add(kw_item["topic"])
- # --- add phrase signals (only from recent window) ---
- for phrase, count in phrase_counts_recent.most_common(limit * 2):
- if phrase in seen_topics:
- continue
- emerging.append({
- "topic": phrase.title(),
- "trend_score": min(0.99, round(0.30 + 0.15 * min(count, 5), 2)),
- "related_entities": [],
- "velocity": None,
- "recent_count": count,
- "prior_count": 0,
- "source_count": 0,
- "avg_importance": 0.0,
- "signal_type": "phrase",
- })
- seen_topics.add(phrase)
- if len(emerging) >= limit:
- break
- return emerging[:limit]
@mcp.tool(description="Investigate whether sentiment around an entity or keyword is positive, negative, or neutral over a chosen lookback window. "
          "Matches clusters by both named entities and thematic keywords.")
async def get_news_sentiment(entity: str, timeframe: str = "24h"):
    """Average the sentiment score of recent clusters that mention ``entity``.

    Args:
        entity: Entity or keyword to look for. It is normalised with
            ``normalize_query`` and expanded with the normalized form,
            canonical label and MID returned by ``resolve_entity_via_trends``.
        timeframe: Lookback window such as ``"24h"``, ``"24"`` or ``"3d"``.
            Unparseable values fall back to 24 hours; the window is clamped
            to the range 1..168 hours.

    Returns:
        A dict with ``entity`` (the caller's original string), ``sentiment``
        (``"positive"``, ``"negative"`` or ``"neutral"``), ``score`` (mean of
        the usable ``sentimentScore`` values, rounded to 3 places) and
        ``cluster_count`` (number of matched clusters, including those that
        carried no usable score).
    """

    def _result(sentiment: str, score: float, cluster_count: int) -> dict:
        # Single place that defines the response shape.
        return {
            "entity": entity,
            "sentiment": sentiment,
            "score": score,
            "cluster_count": cluster_count,
        }

    ent = normalize_query(entity).strip().lower()
    if not ent:
        # Nothing to search for: answer before calling the resolver or
        # opening the store.
        return _result("neutral", 0.0, 0)

    resolved = resolve_entity_via_trends(ent)
    candidates = (
        ent,
        resolved.get("normalized"),
        resolved.get("canonical_label"),
        resolved.get("mid"),
    )
    query_terms = {str(value or "").strip().lower() for value in candidates}
    query_terms.discard("")

    # Same parser as the rest of the file (handles "24h", "24" and "3d"),
    # capped at one week.
    hours = min(_parse_timeframe_to_hours(timeframe), 168)

    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)

    matched = []
    for cluster in clusters:
        haystack = _cluster_entity_haystack(cluster)
        # Substring match: a term matches when it occurs inside any haystack item.
        if any(term in item for term in query_terms for item in haystack):
            matched.append(cluster)
    if not matched:
        return _result("neutral", 0.0, 0)

    scores = []
    for cluster in matched:
        raw_score = cluster.get("sentimentScore")
        if raw_score is None:
            continue
        try:
            scores.append(float(raw_score))
        except (TypeError, ValueError, OverflowError):
            # Best effort: a non-numeric score is left out of the average,
            # but the cluster still counts towards cluster_count.
            continue
    avg_score = sum(scores) / len(scores) if scores else 0.0

    # Keep the label aligned with the numeric score.
    # Small magnitudes are treated as neutral to avoid noisy label flips.
    if avg_score >= 0.15:
        sentiment = "positive"
    elif avg_score <= -0.15:
        sentiment = "negative"
    else:
        sentiment = "neutral"
    return _result(sentiment, round(avg_score, 3), len(matched))
@mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.")
async def get_capabilities():
    """Return a static description of this server for downstream agents.

    The payload combines the server identity and output conventions with the
    module-level tool cards, composition recipes, example chains, agent tips
    and a short list of usage guidance strings.
    """
    output_conventions = {
        "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.",
        "sources": "Always preserve and display sources when summarizing a cluster or entity result.",
        "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.",
        "clusters": "Each cluster includes entities (named entities with optional MID/canonical_label) and keywords (thematic descriptors). Both are searchable; entities are higher-signal, keywords capture subject-matter themes.",
    }
    server_info = {
        "name": "news-mcp",
        "purpose": "Recent news clusters with entities and thematic keywords, entity/keyword drill-down, sentiment, emerging topics, and related-entity expansion.",
        "output_conventions": output_conventions,
    }
    guidance = [
        "Use get_latest_events for a tail, get_events_for_entity for entity/keyword deep dives, and get_related_recent_entities for neighborhood expansion.",
        "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.",
        "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.",
        "For emerging topics, use detect_emerging_topics with timeframe and around parameters. Signal types: entity (named entity, highest quality), keyword (thematic descriptor), phrase (headline bigram). High velocity + high source_count = strong signal.",
        "get_events_for_entity and get_news_sentiment match both entities and thematic keywords — use keywords when the subject is a theme rather than a named entity.",
    ]
    return {
        "server": server_info,
        "tools": NEWS_TOOL_CARDS,
        "recipes": NEWS_COMPOSITION_RECIPES,
        "example_chains": NEWS_EXAMPLE_CHAINS,
        "agent_tips": NEWS_AGENT_TIPS,
        "guidance": guidance,
    }
- def _parse_timeframe_to_hours(timeframe: str) -> int:
- tf = str(timeframe).strip().lower()
- try:
- if tf.endswith("d"):
- days = int(tf[:-1])
- return max(1, days * 24)
- if tf.endswith("h"):
- return max(1, int(tf[:-1]))
- return max(1, int(tf))
- except Exception:
- return 24
- from contextlib import asynccontextmanager
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """FastAPI lifespan hook: schedule the background refresher at startup.

    The refresher is scheduled, not awaited, so application startup is not
    delayed by it.
    """
    # The event loop keeps only weak references to tasks, so hold a strong
    # reference for the lifetime of the application.
    startup_task = asyncio.ensure_future(_background_refresh_loop())
    try:
        yield
    finally:
        # If the startup phase is still running at shutdown, stop it cleanly.
        if not startup_task.done():
            startup_task.cancel()
# ASGI application object; `_lifespan` is run by FastAPI around startup/shutdown.
app = FastAPI(title="News MCP Server", lifespan=_lifespan)
# Logger used by the startup / background-refresh code below.
logger = logging.getLogger("news_mcp.startup")
# Expose the MCP tool surface (SSE transport) under the /mcp path.
app.mount("/mcp", mcp.sse_app())
# Shared store — single connection pool
_shared_store = SQLiteClusterStore(DB_PATH)
# Guard state for `_background_refresh_loop`: the lock protects the check and
# update of the flag, and the flag records that the refresher already started.
_refresh_lock = asyncio.Lock()
_refresh_started = False
async def _background_refresh_loop():
    """Non-blocking background refresher: prune then poll.

    Protected by an async lock so a second event-loop wake-up cannot
    start a parallel ingestion cycle.

    Steps:
      1. Mark the refresher as started; later calls return immediately.
      2. Log the active LLM configuration and run ``prune_if_due`` on the
         shared store in a worker thread.
      3. If background refresh is enabled, start a polling task that calls
         ``refresh_clusters`` every ``NEWS_REFRESH_INTERVAL_SECONDS`` seconds.
         A failed tick is logged and the loop keeps running.
    """
    global _refresh_started, _refresh_loop_task
    async with _refresh_lock:
        if _refresh_started:
            return
        _refresh_started = True

    logger.info("news-mcp llm config: %s", active_llm_config())
    # Prune off-thread so we do not block the event loop
    prune_result = await asyncio.to_thread(
        _shared_store.prune_if_due,
        NEWS_PRUNING_ENABLED,
        NEWS_RETENTION_DAYS,
        NEWS_PRUNE_INTERVAL_HOURS,
    )
    logger.info("startup prune_result=%s", prune_result)
    if not NEWS_BACKGROUND_REFRESH_ENABLED:
        return

    async def _loop():
        if not NEWS_BACKGROUND_REFRESH_ON_START:
            # Skip the immediate first tick and wait one full interval instead.
            logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS)
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
        while True:
            try:
                logger.info("background refresh tick start")
                await refresh_clusters(topic=None, limit=200)
                logger.info("background refresh tick complete")
            except Exception:
                # Boundary handler: one bad tick must not end the refresher.
                logger.exception("background refresh tick failed")
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))

    # The event loop keeps only weak references to tasks; without a strong
    # reference the polling task could be garbage-collected while running.
    _refresh_loop_task = asyncio.create_task(_loop())
@app.get("/")
def root():
    """Landing endpoint: transport info, tool names and current settings."""
    tool_names = [
        "get_latest_events",
        "get_events_for_entity",
        "get_event_summary",
        "detect_emerging_topics",
        "get_news_sentiment",
        "get_related_recent_entities",
        "get_capabilities",
    ]
    refresh_settings = {
        "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
        "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
    }
    retention_settings = {
        "lookback_hours": DEFAULT_LOOKBACK_HOURS,
        "retention_days": NEWS_RETENTION_DAYS,
    }
    pruning_settings = {
        "enabled": NEWS_PRUNING_ENABLED,
        "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
    }
    return {
        "status": "ok",
        "transport": "fastmcp+sse",
        "mount": "/mcp",
        "tools": tool_names,
        "refresh": refresh_settings,
        "retention": retention_settings,
        "pruning": pruning_settings,
    }
- # ------------------------------------------------------------------
- # Dashboard REST API endpoints
- # ------------------------------------------------------------------
- from fastapi.staticfiles import StaticFiles
- from fastapi.responses import JSONResponse
# Serve the static dashboard files under /dashboard. The directory is given
# as the relative path "dashboard", so it is resolved against the process
# working directory.
# NOTE(review): html=True is Starlette's HTML mode (index.html handling) — confirm.
app.mount("/dashboard", StaticFiles(directory="dashboard", html=True), name="dashboard")
import logging as _log
# Dedicated logger for the REST endpoints defined below.
API_LOG = _log.getLogger("news_mcp.api")
- def _api_ok(data: dict) -> dict:
- return data
def _api_err(exc: Exception, ctx: str) -> JSONResponse:
    """Log an endpoint failure and build the HTTP 500 response for it.

    Args:
        exc: The exception that was caught by the endpoint.
        ctx: Short label identifying the endpoint and its arguments.

    Returns:
        A ``JSONResponse`` with status 500 and a body holding ``error``
        (``str(exc)``) and ``ctx``.
    """
    # Lazy %-style args keep formatting inside the logging framework, and
    # passing the exception explicitly attaches its traceback even if this
    # helper is ever called outside an ``except`` block.
    API_LOG.error("API error in %s", ctx, exc_info=exc)
    return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx})
@app.get("/api/v1/health")
def api_health():
    """Extended health + dashboard stats.

    Returns the dashboard statistics with the build ``version`` added, or a
    500 JSON error response if gathering them fails.
    """
    try:
        payload = DashboardStore(_shared_store).get_dashboard_stats()
        payload["version"] = _VERSION_HASH
    except Exception as err:
        return _api_err(err, "health")
    return payload
@app.get("/api/v1/clusters")
def api_clusters(
    topic: str | None = None,
    hours: int = 24,
    limit: int = 50,
    offset: int = 0,
):
    """Paginated cluster listing.

    Query parameters are forwarded to ``DashboardStore.get_clusters_page``.
    The response echoes the topic (``"all"`` when none was given) and the
    lookback window alongside the clusters and the total count.
    """
    try:
        page = DashboardStore(_shared_store).get_clusters_page(
            topic=topic,
            hours=hours,
            limit=limit,
            offset=offset,
        )
        return {
            "clusters": page["clusters"],
            "total": page["total"],
            "topic": topic or "all",
            "hours": hours,
        }
    except Exception as err:
        return _api_err(err, f"clusters(topic={topic},hours={hours})")
@app.get("/api/v1/sentiment-series")
def api_sentiment_series(
    topic: str | None = None,
    hours: int = 24,
    bucket_hours: float = 1.0,
):
    """Sentiment time-series for Chart.js.

    Forwards the query parameters to ``DashboardStore.get_sentiment_series``
    and returns the series together with the topic (``"all"`` when none was
    given).
    """
    try:
        points = DashboardStore(_shared_store).get_sentiment_series(
            topic=topic,
            hours=hours,
            bucket_hours=bucket_hours,
        )
        return {"series": points, "topic": topic or "all"}
    except Exception as err:
        return _api_err(err, f"sentiment(topic={topic})")
@app.get("/api/v1/entities")
def api_entities(
    hours: int = 24,
    limit: int = 30,
):
    """Top entity frequencies.

    Returns the result of ``DashboardStore.get_entity_frequencies`` together
    with the lookback window that was requested.
    """
    try:
        frequencies = DashboardStore(_shared_store).get_entity_frequencies(
            hours=hours,
            limit=limit,
        )
        return {"entities": frequencies, "hours": hours}
    except Exception as err:
        return _api_err(err, f"entities(hours={hours})")
@app.get("/api/v1/keywords")
def api_keywords(
    hours: int = 24,
    limit: int = 30,
):
    """Top keyword frequencies (thematic descriptors, excluding terms already counted as entities).

    Returns the result of ``DashboardStore.get_keyword_frequencies`` together
    with the lookback window that was requested.
    """
    try:
        frequencies = DashboardStore(_shared_store).get_keyword_frequencies(
            hours=hours,
            limit=limit,
        )
        return {"keywords": frequencies, "hours": hours}
    except Exception as err:
        return _api_err(err, f"keywords(hours={hours})")
@app.get("/api/v1/cluster/{cluster_id}")
def api_cluster_detail(cluster_id: str):
    """Full cluster detail for drill-down.

    Responds with the stored detail, a 404 JSON body when the lookup returns
    nothing, or a 500 JSON error response if the lookup raises.
    """
    try:
        detail = DashboardStore(_shared_store).get_cluster_detail(cluster_id)
        if detail:
            return detail
        return JSONResponse(
            status_code=404,
            content={"error": "Cluster not found", "id": cluster_id},
        )
    except Exception as err:
        return _api_err(err, f"detail({cluster_id})")
- # ------------------------------------------------------------------
- # Feed management endpoints (toggle on/off from dashboard)
- # ------------------------------------------------------------------
@app.get("/api/v1/feeds")
def api_feeds():
    """List all configured feeds with enabled/disabled status.

    Returns:
        A dict with ``feeds`` (the store's feed-state list) and
        ``configured_urls`` (the result of ``_configured_feed_urls()``), or a
        500 JSON error response if either lookup fails.
    """
    try:
        # Use the module-wide store like the other dashboard endpoints instead
        # of constructing a new SQLiteClusterStore on every request.
        feed_list = _shared_store.get_feed_state_list()
        configured = _configured_feed_urls()
        return {
            "feeds": feed_list,
            "configured_urls": configured,
        }
    except Exception as e:
        return _api_err(e, "feeds")
@app.post("/api/v1/feeds/toggle")
async def api_feed_toggle(feed_url: str = Form(), enabled: bool = Form()):
    """Toggle a feed's enabled state.

    Args:
        feed_url: Feed URL from the form body; surrounding whitespace is
            stripped before the lookup.
        enabled: Desired state for the feed.

    Returns:
        ``{"ok": True, "feed_url": ..., "enabled": ...}`` on success, a 404
        JSON body when the store reports no such feed, or a 500 JSON error
        response if the update raises.
    """
    try:
        url = feed_url.strip()
        # This handler runs on the event loop, so push the blocking store
        # write to a worker thread (as the startup prune does) and use the
        # shared store instead of building a new one per request.
        ok = await asyncio.to_thread(_shared_store.set_feed_enabled, url, enabled)
        if not ok:
            return JSONResponse(
                status_code=404,
                content={"error": f"Feed not found: {feed_url}"},
            )
        return {"ok": True, "feed_url": url, "enabled": enabled}
    except Exception as e:
        return _api_err(e, f"toggle({feed_url})")
@app.get("/health")
def health():
    """Lightweight liveness probe: status, process uptime and build version."""
    # Monotonic clock difference, so wall-clock adjustments do not affect it.
    uptime_seconds = time.monotonic() - _PROCESS_STARTED_AT
    return {
        "status": "ok",
        "uptime": round(uptime_seconds, 3),
        "version": _VERSION_HASH,
    }
|