mcp_server_fastmcp.py 65 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527
  1. from __future__ import annotations
  2. import asyncio
  3. import hashlib
  4. import json
  5. import logging
  6. import math
  7. import re
  8. import time
  9. from collections import Counter
  10. from datetime import datetime, timezone
  11. from pathlib import Path
  12. from fastapi import FastAPI, Form
  13. from mcp.server.fastmcp import FastMCP
  14. from mcp.server.transport_security import TransportSecuritySettings
  15. from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
  16. from news_mcp.config import (
  17. NEWS_FEED_URL,
  18. NEWS_FEED_URLS,
  19. NEWS_PRUNE_INTERVAL_HOURS,
  20. NEWS_PRUNING_ENABLED,
  21. NEWS_REFRESH_INTERVAL_SECONDS,
  22. NEWS_BACKGROUND_REFRESH_ENABLED,
  23. NEWS_BACKGROUND_REFRESH_ON_START,
  24. NEWS_RETENTION_DAYS,
  25. )
  26. from news_mcp.jobs.poller import refresh_clusters
  27. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  28. from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
  29. from news_mcp.trends_resolution import resolve_entity_via_trends
  30. from news_mcp.llm import active_llm_config
  31. from news_mcp.entity_normalize import normalize_query
  32. from news_mcp.related_entities import related_recent_entities
# Configure the root logger once, at import time: INFO level, with a
# timestamp, level name and logger name in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
# Monotonic clock reading taken when this module is imported.
# NOTE(review): presumably the baseline for uptime reporting — confirm at the
# point of use.
_PROCESS_STARTED_AT = time.monotonic()
# Directory that contains this module file (path fully resolved).
_PACKAGE_DIR = Path(__file__).resolve().parent
  39. def _compute_version_hash() -> str:
  40. """SHA-256 of all .py files under news_mcp/, sorted by relative path.
  41. Deterministic across machines and environments — no git dependency.
  42. Works identically in Docker and native runs.
  43. """
  44. hasher = hashlib.sha256()
  45. for f in sorted(_PACKAGE_DIR.rglob("*.py")):
  46. try:
  47. hasher.update(f.read_bytes())
  48. except OSError:
  49. continue
  50. return hasher.hexdigest()[:9]
# Fingerprint of the package sources, computed once when the module is imported.
_VERSION_HASH = _compute_version_hash()
# FastMCP server instance; the @mcp.tool functions in this module register on it.
# NOTE(review): DNS-rebinding protection is explicitly switched off here —
# confirm the server is only exposed behind a trusted network or proxy.
mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)
  56. def _cluster_entity_haystack(cluster: dict) -> list[str]:
  57. """Collect the normalized entity + keyword clues attached to a cluster."""
  58. values: list[str] = []
  59. for ent in cluster.get("entities", []) or []:
  60. values.append(str(ent).strip().lower())
  61. for res in cluster.get("entityResolutions", []) or []:
  62. if not isinstance(res, dict):
  63. continue
  64. for key in ("normalized", "canonical_label", "mid"):
  65. val = res.get(key)
  66. if val:
  67. values.append(str(val).strip().lower())
  68. # Keywords are LLM-curated thematic descriptors — include them in the
  69. # searchable haystack so entity/theme queries match on subject-matter
  70. # signals, not just named entities.
  71. for kw in cluster.get("keywords", []) or []:
  72. values.append(str(kw).strip().lower())
  73. return [v for v in values if v]
  74. def _parse_cluster_timestamp(value) -> datetime:
  75. """Parse a stored cluster timestamp.
  76. payload.timestamp is guaranteed ISO 8601 UTC (YYYY-MM-DDTHH:MM:SS+00:00)
  77. at write time. Only datetime.fromisoformat is needed — no RFC 2822 fallback.
  78. """
  79. if not value:
  80. return datetime.min.replace(tzinfo=timezone.utc)
  81. text = str(value).strip()
  82. if not text:
  83. return datetime.min.replace(tzinfo=timezone.utc)
  84. try:
  85. dt = datetime.fromisoformat(text)
  86. if dt.tzinfo is None:
  87. dt = dt.replace(tzinfo=timezone.utc)
  88. return dt.astimezone(timezone.utc)
  89. except Exception:
  90. return datetime.min.replace(tzinfo=timezone.utc)
  91. def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
  92. return sorted(
  93. clusters,
  94. key=lambda c: (
  95. _parse_cluster_timestamp(c.get("timestamp")),
  96. float(c.get("importance", 0.0) or 0.0),
  97. ),
  98. reverse=True,
  99. )
  100. def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict:
  101. return {
  102. "name": name,
  103. "description": description,
  104. "inputs": inputs,
  105. "outputs": outputs,
  106. "notes": notes or [],
  107. }
# Descriptor cards for the server's tools, one `_tool_card(...)` entry per
# tool: its name, a prose description, the input parameters (name, type,
# default/range/examples/meaning where given), the output field names, and
# usage notes. This is plain data; a trailing "?" on an output name marks a
# field that is only present in some responses.
NEWS_TOOL_CARDS = [
    _tool_card(
        "get_feeds",
        "List all configured RSS feeds with their current enabled/disabled status, last fetch item count, and timestamps. Use to discover which feeds are active before investigating content.",
        [],
        ["feeds[]: {feed_key, enabled, last_hash, last_item_count, updated_at}"],
        ["Use this to see which feeds are currently active or disabled."],
    ),
    _tool_card(
        "toggle_feed",
        "Enable or disable a specific RSS feed by URL. Changes take effect on the next background refresh cycle. Returns the updated feed state including the new enabled flag.",
        [
            {"name": "feed_url", "type": "string", "meaning": "the feed URL to toggle"},
            {"name": "enabled", "type": "boolean", "meaning": "true to enable, false to disable"},
        ],
        ["ok", "feed_key", "enabled"],
        ["Changes take effect on the next refresh cycle, not immediately."],
    ),
    _tool_card(
        "get_latest_events",
        "Get the newest deduplicated news clusters, optionally filtered by topic. Each cluster is a group of related articles with LLM-extracted entities, thematic keywords, sentiment, importance score, and source list. Use this as the primary entry point for 'what is happening now' queries. Clusters are ordered by recency (freshest first). Set include_articles=true to include the underlying article URLs and titles for attribution.",
        [
            {"name": "topic", "type": "string", "default": "all topics", "meaning": "coarse category: crypto, macro, regulation, ai, other. Omit for all topics."},
            {"name": "limit", "type": "integer", "default": 5, "range": "1-20"},
            {"name": "include_articles", "type": "boolean", "default": False, "meaning": "include article URLs and titles in the response"},
        ],
        ["headline", "summary", "entities", "keywords", "sentiment", "importance", "sources", "timestamp", "articles?"],
        ["Each cluster includes both named entities (people, places, companies with optional MID/canonical_label) and LLM-curated thematic keywords (what the story is about). Use keywords to understand subject-matter themes beyond named entities."],
    ),
    _tool_card(
        "get_events_for_entity",
        "Search recent clusters for a person, place, company, theme, or keyword. Matches against both named entities (e.g. 'Bitcoin', 'Jerome Powell') and thematic keywords (e.g. 'rate cuts', 'AI regulation') using SQL-level junction-table search across the full time window — no row-limit blind spot. Returns full cluster objects with headline, summary, entities, keywords, sentiment, importance, sources, and optional articles. Use this for entity-centered or theme-centered deep dives.",
        [
            {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to search for. Case-insensitive."},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["4h", "24h", "72h", "3d", "7d"], "meaning": "lookback window. Suffix with h (hours) or d (days)."},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-30"},
            {"name": "include_articles", "type": "boolean", "default": False, "meaning": "include article URLs and titles"},
        ],
        ["headline", "summary", "entities", "keywords", "sentiment", "importance", "sources", "timestamp", "articles?"],
        ["Matches both named entities and thematic keywords. Use timeframe to control lookback. Results are ordered by recency."],
    ),
    _tool_card(
        "get_event_summary",
        "Produce a rich, LLM-written narrative for a single cluster by its cluster_id. Returns the headline, merged summary, key facts, entities, keywords, related entities, related keywords, topic, sentiment, importance score, and the full article list (included by default). This is the primary tool for full cluster drill-down. The cluster_id is an internal cursor — do not surface it in user-facing prose unless explicitly requested.",
        [
            {"name": "event_id", "type": "string", "meaning": "cluster_id from a previous tool call. Internal cursor — do not show to users."},
            {"name": "include_articles", "type": "boolean", "default": True, "meaning": "include the underlying articles list (URLs, titles, sources, timestamps)"},
        ],
        ["headline", "mergedSummary", "keyFacts", "sources", "entities", "keywords", "related_entities", "related_keywords", "topic", "sentiment", "importance", "articles"],
        ["Rich cluster drill-down. Returns LLM summary + cluster metadata + articles. Defaults to include articles. Use after get_latest_events or get_events_for_entity to get full context on a specific cluster."],
    ),
    _tool_card(
        "detect_emerging_topics",
        "Surface emerging entities, thematic keywords, and headline phrases that are accelerating in the recent window. Each result includes a trend_score, velocity (acceleration), recent_count, prior_count, source_count, related_entities, related_keywords, and signal_type. Signal types: entity (named entity, highest confidence), keyword (thematic descriptor), phrase (headline bigram). High velocity + high source_count = strong signal. Use timeframe to distinguish what's hot right now (4h) vs persistently trending (3d). Use around= to scope to a specific entity's neighborhood.",
        [
            {"name": "limit", "type": "integer", "default": 10, "range": "1-20"},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["4h", "24h", "3d"], "meaning": "lookback window for velocity calculation"},
            {"name": "topic", "type": "string", "default": "all topics", "examples": ["crypto", "macro", "regulation", "ai", "other"], "meaning": "scope to a specific category"},
            {"name": "around", "type": "string", "default": "none", "meaning": "entity to scope results to its neighborhood (e.g. 'Bitcoin', 'Fed')"},
        ],
        ["topic", "trend_score", "velocity", "recent_count", "prior_count", "source_count", "related_entities", "related_keywords", "signal_type"],
        ["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity. Check velocity and source_count to distinguish real spikes from noise. Compare results at different timeframes (e.g. 4h vs 3d) to distinguish hot-right-now from persistently trending."],
    ),
    _tool_card(
        "get_news_sentiment",
        "Estimate aggregate sentiment around an entity or keyword over a lookback window. Matches clusters via both named entities and thematic keywords using SQL-level search. Returns the sentiment label (positive/negative/neutral), numeric score (-1 to +1), and the number of matching clusters. Use after locating a cluster set or entity neighborhood to gauge overall tone.",
        [
            {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to analyze. Case-insensitive."},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]},
        ],
        ["entity", "sentiment", "score", "cluster_count"],
        ["Matches clusters by entities and keywords. Use after locating a cluster set or entity neighborhood. Score is the average sentimentScore across matching clusters."],
    ),
    _tool_card(
        "get_related_recent_entities",
        "Find entities and thematic keywords commonly co-occurring with a subject in recent clusters, optionally blended with Google Trends suggestions. Returns related entities with normalized labels, canonical labels, MID (Wikidata ID when available), source counts, and co-occurrence scores. Use this to drill from a subject into its neighborhood — then feed the strongest related entities into get_events_for_entity for deeper investigation.",
        [
            {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase (e.g. 'Iran', 'Bitcoin', 'AI regulation')"},
            {"name": "timeframe", "type": "string", "default": "72h", "examples": ["24h", "72h", "3d"]},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-25"},
            {"name": "include_trends", "type": "boolean", "default": True, "meaning": "blend local co-occurrence with Google Trends suggestions for broader coverage"},
        ],
        ["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"],
        ["Use this to drill from a subject into related entities and themes, then feed results into get_events_for_entity. Set include_trends=false for local-only co-occurrence."],
    ),
    _tool_card(
        "debug_dedup",
        "Inspect dedup status for an article URL. Returns whether the article is in seen_articles, its article_key, cluster_id, and (if title provided) similarity signals against the top-10 most similar existing clusters including match decisions and active thresholds.",
        [
            {"name": "url", "type": "string", "meaning": "article URL to inspect"},
            {"name": "title", "type": "string", "default": "none", "meaning": "article title for similarity signal computation"},
        ],
        ["seen", "article_key", "cluster_id", "first_seen", "stored_url", "similarity_signals?", "title_threshold", "jaccard_threshold"],
        ["Diagnostic tool. Use to understand why an article was or was not deduplicated."],
    ),
]
# Named multi-step recipes that combine the tools above. Each recipe is a
# dict with a "name", a one-line "description", the ordered "steps" (written
# as example tool calls), and free-form "notes".
NEWS_COMPOSITION_RECIPES = [
    {
        "name": "fresh-news-tail",
        "description": "Get the latest news clusters for a topic or across all topics.",
        "steps": [
            "get_latest_events(topic=..., limit=5-10)",
            "optionally get_event_summary(event_id=...) for the strongest cluster",
        ],
        "notes": ["Omit topic for all topics. Use include_articles=true when you need source attribution."],
    },
    {
        "name": "entity-deep-dive",
        "description": "Full investigation of an entity: clusters, sentiment, and narrative summary.",
        "steps": [
            "get_events_for_entity(entity=..., timeframe='24h', limit=10)",
            "get_news_sentiment(entity=..., timeframe='24h')",
            "get_event_summary(event_id=...) for the strongest cluster",
        ],
        "notes": ["Prefer canonical entity labels; the server normalizes common aliases. Increase timeframe to '3d' or '7d' for broader coverage."],
    },
    {
        "name": "subject-neighborhood",
        "description": "Expand from a subject to its related entities and themes, then investigate the strongest connections.",
        "steps": [
            "get_related_recent_entities(subject=..., include_trends=true)",
            "for each strong related entity, call get_events_for_entity(entity=...) to get clusters",
        ],
        "notes": ["Use this when you want a graph-like expansion around a subject. Filter related entities by source_count for quality."],
    },
    {
        "name": "emerging-signal",
        "description": "Find what's emerging and investigate the top signals.",
        "steps": [
            "detect_emerging_topics(limit=10, timeframe='24h')",
            "choose a high-velocity entity or keyword from results",
            "get_events_for_entity(entity=..., timeframe='24h')",
            "get_news_sentiment(entity=..., timeframe='24h')",
        ],
        "notes": ["Use timeframe='4h' for what's hot right now, '3d' for weekly trends. Check velocity and source_count to distinguish real spikes from noise. Use around= to scope to a specific entity's neighborhood."],
    },
    {
        "name": "full-investigation",
        "description": "Complete pipeline: emerging topics → entity drill-down → sentiment → neighborhood scouting.",
        "steps": [
            "detect_emerging_topics(limit=20, timeframe='3d')",
            "pick an emerging entity/keyword; note its related_entities and related_keywords",
            "get_event_summary(event_id=...) on the top cluster for full context including articles",
            "get_news_sentiment(entity=...) to gauge tone around the emerging topic",
            "detect_emerging_topics(around=<entity>, timeframe='4h') to scout its neighborhood",
        ],
        "notes": ["This is the most comprehensive recipe. Use when building a full picture of an emerging story."],
    },
]
# Free-form usage guidance for callers of the tools, one sentence-level tip
# per list entry.
NEWS_AGENT_TIPS = [
    "Start with get_latest_events for a quick tail of what's happening. Use get_event_summary for full context on a specific cluster.",
    "For entity/theme questions, use get_events_for_entity first, then broaden with get_related_recent_entities.",
    "Treat cluster_id as an internal cursor for follow-up calls — do not surface it in user-facing prose unless explicitly requested.",
    "Always preserve sources and timestamps when summarizing results so users can assess recency and provenance.",
    "Prefer canonical entity labels when available; the server normalizes common aliases and resolves MIDs.",
    "Use detect_emerging_topics with multiple timeframes (4h vs 3d) to distinguish hot-right-now from persistently trending. Filter by signal_type=entity for highest-confidence signals.",
    "get_event_summary returns the richest single-cluster view: mergedSummary, keyFacts, entities, keywords, related_entities, related_keywords, and articles. Use it for full drill-down.",
    "Each cluster has both entities (named entities with identity resolution) and keywords (thematic descriptors). Keywords capture what a story is about beyond the named entities.",
    "For sentiment analysis, use get_news_sentiment after locating clusters. The score is averaged across all matching clusters in the timeframe.",
    "Use include_articles=true when you need source attribution, article URLs, or timestamps for individual articles within a cluster.",
    "The server detects in-place article content updates (e.g. stubs that get fleshed out) and automatically re-clusters and re-enriches them.",
]
# Worked examples: each entry pairs a user-level "task" with the ordered
# "chain" of tool calls (shown as example call strings) that answers it.
NEWS_EXAMPLE_CHAINS = [
    {
        "task": "What is happening now?",
        "chain": [
            "get_latest_events(topic='macro', limit=5)",
            "get_event_summary(event_id=<cluster_id>) for the most important cluster",
        ],
    },
    {
        "task": "Deep dive on an entity",
        "chain": [
            "get_events_for_entity(entity='Bitcoin', timeframe='24h', limit=10)",
            "get_news_sentiment(entity='Bitcoin', timeframe='24h')",
            "get_event_summary(event_id=<cluster_id>) for the strongest cluster",
        ],
    },
    {
        "task": "Broaden from a subject",
        "chain": [
            "get_related_recent_entities(subject='Iran', include_trends=true)",
            "get_events_for_entity(entity=<top_related_entity>, timeframe='72h')",
        ],
    },
    {
        "task": "Find what is emerging",
        "chain": [
            "detect_emerging_topics(limit=10, timeframe='24h')",
            "get_events_for_entity(entity=<emerging_entity>, timeframe='24h')",
        ],
    },
    {
        "task": "What's heating up around a specific entity",
        "chain": [
            "detect_emerging_topics(around='Fed', timeframe='4h')",
            "get_events_for_entity(entity=<top_emerging_neighbor>, timeframe='24h')",
        ],
    },
    {
        "task": "Full investigation pipeline",
        "chain": [
            "detect_emerging_topics(limit=20, timeframe='3d')",
            "pick an emerging entity; note its related_entities and related_keywords",
            "get_event_summary(event_id=<cluster_id>) for full context including articles",
            "get_news_sentiment(entity=<entity>) to gauge tone",
            "detect_emerging_topics(around=<entity>, timeframe='4h') to scout its neighborhood",
        ],
    },
]
  318. def _configured_feed_urls() -> list[str]:
  319. """Return the configured feed URLs from environment variables."""
  320. urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
  321. if not urls:
  322. urls = [NEWS_FEED_URL]
  323. return urls
  324. @mcp.tool(description="List all configured RSS feeds with their current enabled/disabled status.")
  325. async def get_feeds() -> list[dict]:
  326. """Return each feed URL with its enabled flag, last fetch stats, and timestamps."""
  327. store = SQLiteClusterStore(DB_PATH)
  328. return store.get_feed_state_list()
  329. @mcp.tool(description="Enable or disable a specific RSS feed by URL.")
  330. async def toggle_feed(feed_url: str, enabled: bool) -> dict:
  331. """Toggle a feed's active/inactive state.
  332. Changes take effect on the next background refresh cycle.
  333. Returns the updated feed state.
  334. """
  335. store = SQLiteClusterStore(DB_PATH)
  336. store.set_feed_enabled(feed_url.strip(), enabled)
  337. updated = store.get_feed_state(feed_url.strip())
  338. return {"ok": True, "feed_key": feed_url.strip(), "enabled": enabled, "details": updated}
  339. @mcp.tool(description="Debug dedup: inspect whether an article URL was already processed, which cluster it belongs to, and what similarity signals it would produce against existing clusters.")
  340. async def debug_dedup(url: str, title: str | None = None) -> dict:
  341. """Given an article URL (and optional title), report dedup status.
  342. Returns:
  343. - seen: whether the article_key is in seen_articles
  344. - article_key: the identity key derived from the URL
  345. - cluster_id: which cluster it belongs to (if seen)
  346. - similarity_signals: if title is provided, compute signals against
  347. the top-N most similar existing clusters
  348. """
  349. from news_mcp.article_identity import article_key, article_content_hash
  350. from news_mcp.dedup.cluster import _title_similarity, _normalize_title, _signals, _is_match
  351. from news_mcp.config import NEWS_EMBEDDINGS_ENABLED
  352. art = {"url": url, "title": title or ""}
  353. akey = article_key(art)
  354. result = {"url": url, "article_key": akey}
  355. store = SQLiteClusterStore(DB_PATH)
  356. with store._conn() as conn:
  357. # Check seen_articles
  358. row = conn.execute(
  359. "SELECT cluster_id, first_seen, url FROM seen_articles WHERE article_key=?",
  360. (akey,),
  361. ).fetchone()
  362. if row:
  363. result["seen"] = True
  364. result["cluster_id"] = row[0]
  365. result["first_seen"] = row[1]
  366. result["stored_url"] = row[2]
  367. else:
  368. result["seen"] = False
  369. # If title provided, compute similarity against top clusters
  370. if title:
  371. # Get recent clusters for comparison
  372. recent = store.get_latest_clusters_all_topics(ttl_hours=24, limit=20)
  373. signals_list = []
  374. for c in recent:
  375. c_title = c.get("headline", "")
  376. sigs = _signals(art, c)
  377. matched, signal_name, signal_value = _is_match(
  378. sigs, embeddings_enabled=NEWS_EMBEDDINGS_ENABLED,
  379. )
  380. signals_list.append({
  381. "cluster_id": c.get("cluster_id", "")[:12],
  382. "headline": c_title[:60],
  383. "title_sim": round(sigs["title"], 3),
  384. "jaccard": round(sigs["jaccard"], 3),
  385. "cosine": round(sigs["cosine"], 3) if sigs["cosine"] else None,
  386. "matched": matched,
  387. "match_signal": signal_name,
  388. "match_value": round(signal_value, 3) if signal_value else None,
  389. })
  390. # Sort by best title similarity
  391. signals_list.sort(key=lambda x: x["title_sim"], reverse=True)
  392. result["similarity_signals"] = signals_list[:10]
  393. result["title_threshold"] = 0.75 # DEFAULT_TITLE_THRESHOLD
  394. result["jaccard_threshold"] = 0.55 # DEFAULT_JACCARD_THRESHOLD
  395. return result
  396. @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters with entities and thematic keywords, sorted by recency.")
  397. async def get_latest_events(topic: str | None = None, limit: int = 5, include_articles: bool = False):
  398. limit = max(1, min(int(limit), 20))
  399. # When topic is omitted, search across all topics (no topic filter).
  400. # When topic is provided and matches a known topic, filter by that topic.
  401. # Otherwise treat the value as an entity-like query.
  402. topic_norm = normalize_query(topic).lower() if topic else ""
  403. resolved = resolve_entity_via_trends(topic_norm) if topic_norm else {}
  404. allowed = {t.lower() for t in DEFAULT_TOPICS}
  405. is_topic = topic_norm in allowed
  406. is_all_topics = not topic_norm
  407. query_terms = {
  408. topic_norm,
  409. str(resolved.get("normalized") or "").strip().lower(),
  410. str(resolved.get("canonical_label") or "").strip().lower(),
  411. str(resolved.get("mid") or "").strip().lower(),
  412. }
  413. query_terms = {q for q in query_terms if q}
  414. store = SQLiteClusterStore(DB_PATH)
  415. if is_all_topics:
  416. # No topic specified: return freshest clusters across all topics.
  417. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  418. elif is_topic:
  419. # Cache-first: only refresh if we currently have no fresh clusters for this topic.
  420. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  421. if not clusters:
  422. await refresh_clusters(topic=topic_norm, limit=200)
  423. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  424. else:
  425. # Entity-aware mode: search recent clusters across all topics and match by
  426. # raw entity, canonical label, or MID using SQL-level junction table search.
  427. clusters = store.get_clusters_by_entity_or_keyword(
  428. query_terms=query_terms, hours=DEFAULT_LOOKBACK_HOURS, limit=limit
  429. )
  430. out = []
  431. for c in _sort_clusters_by_recency(clusters):
  432. item = {
  433. "cluster_id": c.get("cluster_id"),
  434. "headline": c.get("headline"),
  435. "summary": c.get("summary"),
  436. "entities": c.get("entities", []),
  437. "keywords": c.get("keywords", []),
  438. "sentiment": c.get("sentiment", "neutral"),
  439. "importance": c.get("importance", 0.0),
  440. "sources": c.get("sources", []),
  441. "timestamp": c.get("timestamp"),
  442. }
  443. if include_articles:
  444. # Return minimal article fields to keep responses compact.
  445. arts = c.get("articles", []) or []
  446. item["articles"] = [
  447. {
  448. "title": a.get("title"),
  449. "url": a.get("url"),
  450. "source": a.get("source"),
  451. "timestamp": a.get("timestamp"),
  452. }
  453. for a in arts
  454. if isinstance(a, dict)
  455. ]
  456. out.append(item)
  457. return out
  458. @mcp.tool(description="Investigate a person, company, place, theme, or keyword by matching entities and thematic keywords within a time window.")
  459. async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
  460. limit = max(1, min(int(limit), 30))
  461. query = normalize_query(entity).strip().lower()
  462. if not query:
  463. return []
  464. resolved = resolve_entity_via_trends(query)
  465. query_terms = {
  466. query,
  467. str(resolved.get("normalized") or "").strip().lower(),
  468. str(resolved.get("canonical_label") or "").strip().lower(),
  469. str(resolved.get("mid") or "").strip().lower(),
  470. }
  471. query_terms = {q for q in query_terms if q}
  472. store = SQLiteClusterStore(DB_PATH)
  473. hours = _parse_timeframe_to_hours(timeframe)
  474. hits = store.get_clusters_by_entity_or_keyword(query_terms=query_terms, hours=hours, limit=limit)
  475. out = []
  476. for c in hits:
  477. item = {
  478. "cluster_id": c.get("cluster_id"),
  479. "headline": c.get("headline"),
  480. "summary": c.get("summary"),
  481. "entities": c.get("entities", []),
  482. "keywords": c.get("keywords", []),
  483. "sentiment": c.get("sentiment", "neutral"),
  484. "importance": c.get("importance", 0.0),
  485. "sources": c.get("sources", []),
  486. "timestamp": c.get("timestamp"),
  487. }
  488. if include_articles:
  489. arts = c.get("articles", []) or []
  490. item["articles"] = [
  491. {
  492. "title": a.get("title"),
  493. "url": a.get("url"),
  494. "source": a.get("source"),
  495. "timestamp": a.get("timestamp"),
  496. }
  497. for a in arts
  498. if isinstance(a, dict)
  499. ]
  500. out.append(item)
  501. return out
  502. @mcp.tool(description="Return entities and thematic keywords commonly co-occurring with the subject in recent clusters, optionally blended with Google Trends suggestions.")
  503. async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True):
  504. limit = max(1, min(int(limit), 25))
  505. hours = _parse_timeframe_to_hours(timeframe)
  506. include_trends_bool = str(include_trends).strip().lower() not in {"false", "0", "no"}
  507. store = SQLiteClusterStore(DB_PATH)
  508. result = related_recent_entities(
  509. store=store,
  510. subject=subject,
  511. timeframe_hours=hours,
  512. limit=limit,
  513. include_trends=include_trends_bool,
  514. )
  515. return result
  516. @mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts, "
  517. "entities, keywords, related entities and keywords, sentiment, importance, and articles.")
  518. async def get_event_summary(event_id: str, include_articles: bool = True):
  519. store = SQLiteClusterStore(DB_PATH)
  520. # Summary cache: reuse if present within TTL.
  521. cluster = store.get_cluster_by_id(event_id)
  522. if not cluster:
  523. return {
  524. "event_id": event_id,
  525. "error": "NOT_FOUND",
  526. }
  527. cached_summary = store.get_cluster_summary(
  528. cluster_id=event_id,
  529. ttl_hours=DEFAULT_LOOKBACK_HOURS,
  530. )
  531. def _enrich(base: dict, src_cluster: dict) -> dict:
  532. base["entities"] = src_cluster.get("entities", [])
  533. base["keywords"] = src_cluster.get("keywords", [])
  534. base["topic"] = src_cluster.get("topic", "other")
  535. base["sentiment"] = src_cluster.get("sentiment", "neutral")
  536. base["sentimentScore"] = src_cluster.get("sentimentScore")
  537. base["importance"] = src_cluster.get("importance", 0.0)
  538. # Related entities: from co-occurrence in this cluster's article set
  539. resolved = src_cluster.get("entityResolutions", []) or []
  540. related_ents = []
  541. seen_ents = {str(e).strip().lower() for e in (src_cluster.get("entities", []) or [])}
  542. for res in resolved:
  543. if isinstance(res, dict):
  544. label = str(res.get("canonical_label") or res.get("normalized") or "").strip()
  545. if label and label.lower() not in seen_ents:
  546. related_ents.append(label)
  547. seen_ents.add(label.lower())
  548. base["related_entities"] = related_ents[:10]
  549. # Related keywords: from the cluster's own keywords (thematic descriptors)
  550. # plus any co-occurring keywords from recent related clusters
  551. base["related_keywords"] = _fetch_related_keywords(store, src_cluster, event_id)
  552. if include_articles:
  553. arts = src_cluster.get("articles", []) or []
  554. base["articles"] = [
  555. {
  556. "title": a.get("title"),
  557. "url": a.get("url"),
  558. "source": a.get("source"),
  559. "timestamp": a.get("timestamp"),
  560. }
  561. for a in arts
  562. if isinstance(a, dict)
  563. ]
  564. return base
  565. if cached_summary:
  566. out = {
  567. "event_id": event_id,
  568. "headline": cached_summary.get("headline"),
  569. "mergedSummary": cached_summary.get("mergedSummary"),
  570. "keyFacts": cached_summary.get("keyFacts", []),
  571. "sources": cached_summary.get("sources", []),
  572. }
  573. out = _enrich(out, cluster)
  574. return out
  575. summary = await summarize_cluster_llm(cluster)
  576. store.upsert_cluster_summary(event_id, summary)
  577. out = {
  578. "event_id": event_id,
  579. "headline": summary.get("headline"),
  580. "mergedSummary": summary.get("mergedSummary"),
  581. "keyFacts": summary.get("keyFacts", []),
  582. "sources": summary.get("sources", []),
  583. }
  584. out = _enrich(out, cluster)
  585. return out
  586. def _fetch_related_keywords(store: SQLiteClusterStore, cluster: dict, event_id: str) -> list[str]:
  587. """Find keywords that co-occur with this cluster's entities in recent clusters.
  588. This gives agents thematic context: what else was being discussed alongside
  589. the entities in this cluster during the same window.
  590. """
  591. entities = cluster.get("entities", []) or []
  592. if not entities:
  593. return []
  594. # Build a set of entity terms to search with
  595. entity_terms = set()
  596. for e in entities:
  597. entity_terms.add(str(e).strip().lower())
  598. for res in (cluster.get("entityResolutions", []) or []):
  599. if isinstance(res, dict):
  600. for key in ("normalized", "canonical_label"):
  601. val = res.get(key)
  602. if val:
  603. entity_terms.add(str(val).strip().lower())
  604. entity_terms.discard("")
  605. if not entity_terms:
  606. return []
  607. # Find recent clusters that share any entity, collect their keywords
  608. # Use payload_ts lookback of 48h for co-occurrence window
  609. from datetime import timedelta
  610. cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
  611. placeholders = ",".join("?" for _ in entity_terms)
  612. try:
  613. rows = store._conn().execute(
  614. f"SELECT DISTINCT c.payload FROM clusters c "
  615. f"JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
  616. f"WHERE c.payload_ts >= ? AND c.cluster_id != ? "
  617. f"AND ce.entity IN ({placeholders}) "
  618. f"ORDER BY c.payload_ts DESC LIMIT 20",
  619. (cutoff, event_id, *entity_terms),
  620. ).fetchall()
  621. except Exception:
  622. return []
  623. kw_counter: dict[str, int] = {}
  624. cluster_kws = {str(k).strip().lower() for k in (cluster.get("keywords", []) or []) if str(k).strip()}
  625. for (payload_text,) in rows:
  626. try:
  627. c = json.loads(payload_text)
  628. except Exception:
  629. continue
  630. for kw in (c.get("keywords", []) or []):
  631. kw_norm = str(kw).strip()
  632. if not kw_norm:
  633. continue
  634. kw_key = kw_norm.lower()
  635. # Skip keywords that already appear in this cluster
  636. if kw_key in cluster_kws:
  637. continue
  638. kw_counter[kw_norm] = kw_counter.get(kw_norm, 0) + 1
  639. # Return top keywords by co-occurrence count
  640. sorted_kws = sorted(kw_counter.items(), key=lambda x: -x[1])
  641. return [kw for kw, _ in sorted_kws[:10]]
  642. @mcp.tool(description="Explore what is starting to matter: surface emerging entities, thematic keywords, and phrases from recent clusters. "
  643. "Use timeframe to control the lookback window, topic to scope to a category, and around to find what's emerging near a specific entity. "
  644. "Results include signal_type (entity / keyword / phrase) for downstream filtering.")
  645. async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic: str | None = None, around: str | None = None):
  646. """Surface entities and phrases that are accelerating in recent clusters.
  647. Args:
  648. limit: max results to return (1-20, default 10).
  649. timeframe: lookback window like "4h", "24h", "3d" (default "24h").
  650. topic: optional coarse topic filter ("crypto", "macro", "regulation", "ai", "other").
  651. around: optional entity — only return entities that co-occur with this entity
  652. in the recent window (e.g. "Bitcoin" to find what's emerging in Bitcoin's neighborhood).
  653. """
  654. limit = max(1, min(int(limit), 20))
  655. hours = _parse_timeframe_to_hours(timeframe)
  656. half_hours = hours / 2.0
  657. store = SQLiteClusterStore(DB_PATH)
  658. # Fetch more clusters than needed so velocity stats are meaningful even for short windows.
  659. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
  660. # --- optional topic filter ---
  661. if topic:
  662. topic_norm = normalize_query(topic).strip().lower()
  663. if topic_norm:
  664. clusters = [c for c in clusters if (c.get("topic") or "other").strip().lower() == topic_norm]
  665. # --- resolve the 'around' entity ---
  666. around_terms: set[str] = set()
  667. if around:
  668. around_norm = normalize_query(around).strip().lower()
  669. if around_norm:
  670. resolved = resolve_entity_via_trends(around_norm)
  671. around_terms = {
  672. around_norm,
  673. str(resolved.get("normalized") or "").strip().lower(),
  674. str(resolved.get("canonical_label") or "").strip().lower(),
  675. }
  676. around_terms.discard("")
  677. # split clusters into first-half vs second-half by timestamp
  678. # clusters are already sorted most-recent-first from the store
  679. now = datetime.now(timezone.utc)
  680. def _cluster_age_hours(c: dict) -> float:
  681. """Return the cluster's age in hours. payload.timestamp is ISO 8601 UTC guaranteed."""
  682. ts = c.get("timestamp") or c.get("last_updated")
  683. if not ts:
  684. return 0.0
  685. try:
  686. dt = datetime.fromisoformat(str(ts).strip())
  687. if dt.tzinfo is None:
  688. dt = dt.replace(tzinfo=timezone.utc)
  689. return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0)
  690. except Exception:
  691. return 0.0
  692. # Generic entity filter
  693. _generic_tokens = {"news", "latest", "breaking", "update", "updates", "report", "reports"}
  694. def _is_generic_entity(ent: str) -> bool:
  695. e = str(ent).strip().lower()
  696. if not e or len(e) < 4:
  697. return True
  698. if e in _generic_tokens:
  699. return True
  700. return False
  701. # --- accumulate signals ---
  702. # recent = second half of timeframe (newer), prior = first half (older)
  703. entity_counts_recent = Counter()
  704. entity_counts_prior = Counter()
  705. entity_importance_recent = Counter()
  706. entity_sources: dict[str, set] = {} # ent -> set of source names
  707. entity_buckets: dict[str, set] = {} # ent -> set of time-bucket indices (for sustained-spike detection)
  708. entity_cooccur: dict[str, Counter] = {}
  709. phrase_counts_recent = Counter()
  710. # Keyword accumulators — same scoring pipeline as entities, but tracking
  711. # LLM-curated thematic descriptors instead of named entities.
  712. kw_counts_recent = Counter()
  713. kw_counts_prior = Counter()
  714. kw_importance_recent = Counter()
  715. kw_sources: dict[str, set] = {}
  716. kw_buckets: dict[str, set] = {}
  717. kw_cooccur: dict[str, Counter] = {}
  718. entity_kw_cooccur: dict[str, Counter] = {} # entity -> Counter of co-occurring keywords
  719. bucket_size_hours = max(1.0, hours / 6.0) # split window into ~6 buckets
  720. for c in clusters:
  721. ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
  722. ents_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
  723. # Keywords: deduplicate per cluster so a cluster with the same keyword
  724. # listed twice doesn't inflate counts.
  725. kws_in_cluster = list(dict.fromkeys(
  726. str(k).strip().lower()
  727. for k in (c.get("keywords", []) or [])
  728. if str(k).strip() and not _is_generic_entity(k)
  729. ))
  730. age_h = _cluster_age_hours(c)
  731. is_recent = age_h <= half_hours
  732. bucket_idx = int(age_h / bucket_size_hours)
  733. # --- around filter: only count clusters that mention the target entity ---
  734. if around_terms:
  735. haystack = set(ents_norm)
  736. for res in c.get("entityResolutions", []) or []:
  737. if isinstance(res, dict):
  738. for key in ("normalized", "canonical_label"):
  739. val = res.get(key)
  740. if val:
  741. haystack.add(str(val).strip().lower())
  742. if not (haystack & around_terms):
  743. continue
  744. counts = entity_counts_recent if is_recent else entity_counts_prior
  745. imp_acc = entity_importance_recent if is_recent else None # only importance from recent window
  746. for ent in ents_norm:
  747. if _is_generic_entity(ent):
  748. continue
  749. counts[ent] += 1
  750. if ent not in entity_sources:
  751. entity_sources[ent] = set()
  752. src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else ""
  753. if src:
  754. entity_sources[ent].add(str(src))
  755. if ent not in entity_buckets:
  756. entity_buckets[ent] = set()
  757. entity_buckets[ent].add(bucket_idx)
  758. if imp_acc is not None:
  759. try:
  760. imp_acc[ent] += float(c.get("importance", 0.0) or 0.0)
  761. except Exception:
  762. pass
  763. # --- keyword counting (same recent/prior split as entities) ---
  764. kw_counts = kw_counts_recent if is_recent else kw_counts_prior
  765. kw_imp_acc = kw_importance_recent if is_recent else None
  766. for kw in kws_in_cluster:
  767. kw_counts[kw] += 1
  768. if kw not in kw_sources:
  769. kw_sources[kw] = set()
  770. src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else ""
  771. if src:
  772. kw_sources[kw].add(str(src))
  773. if kw not in kw_buckets:
  774. kw_buckets[kw] = set()
  775. kw_buckets[kw].add(bucket_idx)
  776. if kw_imp_acc is not None:
  777. try:
  778. kw_imp_acc[kw] += float(c.get("importance", 0.0) or 0.0) # type: ignore[assignment]
  779. except Exception:
  780. pass
  781. # co-occurrence (only for clusters matching the around filter, if any)
  782. for i in range(len(ents_norm)):
  783. a = ents_norm[i]
  784. if _is_generic_entity(a):
  785. continue
  786. if a not in entity_cooccur:
  787. entity_cooccur[a] = Counter()
  788. for j in range(len(ents_norm)):
  789. if i == j:
  790. continue
  791. b = ents_norm[j]
  792. if _is_generic_entity(b):
  793. continue
  794. entity_cooccur[a][b] += 1
  795. # keyword co-occurrence: which keywords appear together in the same clusters
  796. for i in range(len(kws_in_cluster)):
  797. ka = kws_in_cluster[i]
  798. if ka not in kw_cooccur:
  799. kw_cooccur[ka] = Counter()
  800. for j in range(len(kws_in_cluster)):
  801. if i == j:
  802. continue
  803. kb = kws_in_cluster[j]
  804. kw_cooccur[ka][kb] += 1
  805. # also track entity<->keyword co-occurrence (bidirectional)
  806. for ent in ents_norm:
  807. if _is_generic_entity(ent):
  808. continue
  809. kw_cooccur[ka][ent] += 1
  810. # and the reverse: entity -> keyword
  811. if ent not in entity_kw_cooccur:
  812. entity_kw_cooccur[ent] = Counter()
  813. entity_kw_cooccur[ent][ka] += 1
  814. # bigram phrases (recent only)
  815. if is_recent:
  816. text = f"{c.get('headline', '')} {c.get('summary', '')}"
  817. words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
  818. for i in range(len(words) - 1):
  819. phrase = f"{words[i]} {words[i+1]}"
  820. if len(phrase) > 6:
  821. phrase_counts_recent[phrase] += 1
  822. # --- score entities ---
  823. all_entities = set(entity_counts_recent.keys()) | set(entity_counts_prior.keys())
  824. scored = []
  825. for ent in all_entities:
  826. recent_n = entity_counts_recent.get(ent, 0)
  827. prior_n = entity_counts_prior.get(ent, 0)
  828. total_n = recent_n + prior_n
  829. if total_n < 1:
  830. continue
  831. # velocity: ratio of recent vs prior (smoothed to avoid division noise)
  832. # 0 prior → velocity = recent_n (pure emergence)
  833. # equal → velocity = 1.0 (steady)
  834. velocity = (recent_n + 0.5) / (prior_n + 0.5)
  835. # recency weight: what fraction of total hits are in the recent window
  836. recency_ratio = recent_n / total_n
  837. # source diversity: how many distinct outlets
  838. n_sources = len(entity_sources.get(ent, set()))
  839. # sustained: how many distinct time buckets did it appear in (max ~6)
  840. n_buckets = len(entity_buckets.get(ent, set()))
  841. # average importance (recent window only)
  842. avg_imp = (entity_importance_recent.get(ent, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0
  843. composed_score = (
  844. 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) + # velocity (0..1, 4x = max)
  845. 0.25 * recency_ratio + # recency concentration
  846. 0.15 * min(1.0, n_sources / 5.0) + # source diversity
  847. 0.10 * min(1.0, n_buckets / 4.0) + # sustained (>1 bucket)
  848. 0.15 * min(1.0, avg_imp) # importance
  849. )
  850. related = []
  851. if ent in entity_cooccur:
  852. for other, _cnt in entity_cooccur[ent].most_common(10):
  853. if other != ent:
  854. related.append(other)
  855. related_kws = []
  856. if ent in entity_kw_cooccur:
  857. # Build a set of related entity names (lowercased) to deduplicate
  858. # keywords that are already represented in related_entities
  859. related_ent_names = {e.strip().lower() for e in related}
  860. # Also include the entity itself and its common aliases
  861. related_ent_names.add(ent.strip().lower())
  862. for kw, _cnt in entity_kw_cooccur[ent].most_common(10):
  863. kw_lower = kw.strip().lower()
  864. # Skip keywords that are just a related entity name (substring match)
  865. if any(kw_lower in ent_name or ent_name in kw_lower
  866. for ent_name in related_ent_names):
  867. continue
  868. related_kws.append(kw)
  869. if len(related_kws) >= 5:
  870. break
  871. scored.append({
  872. "topic": ent,
  873. "trend_score": min(0.99, round(composed_score, 3)),
  874. "related_entities": related[:5] if related else [ent],
  875. "related_keywords": related_kws[:5],
  876. "velocity": round(velocity, 2),
  877. "recent_count": recent_n,
  878. "prior_count": prior_n,
  879. "source_count": n_sources,
  880. "avg_importance": round(avg_imp, 3),
  881. "signal_type": "entity",
  882. })
  883. # --- score keywords (same velocity/recency/source/sustained/importance formula) ---
  884. all_keywords = set(kw_counts_recent.keys()) | set(kw_counts_prior.keys())
  885. kw_scored = []
  886. for kw in all_keywords:
  887. # Skip keywords that are already scored as entities — entity signal is
  888. # higher quality (proper nouns, resolved identities).
  889. if kw in all_entities:
  890. continue
  891. recent_n = kw_counts_recent.get(kw, 0)
  892. prior_n = kw_counts_prior.get(kw, 0)
  893. total_n = recent_n + prior_n
  894. if total_n < 1:
  895. continue
  896. velocity = (recent_n + 0.5) / (prior_n + 0.5)
  897. recency_ratio = recent_n / total_n
  898. n_sources = len(kw_sources.get(kw, set()))
  899. n_buckets = len(kw_buckets.get(kw, set()))
  900. avg_imp = (kw_importance_recent.get(kw, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0
  901. composed_score = (
  902. 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) +
  903. 0.25 * recency_ratio +
  904. 0.15 * min(1.0, n_sources / 5.0) +
  905. 0.10 * min(1.0, n_buckets / 4.0) +
  906. 0.15 * min(1.0, avg_imp)
  907. )
  908. kw_related_kws = []
  909. kw_related_ents = []
  910. if kw in kw_cooccur:
  911. for other, _cnt in kw_cooccur[kw].most_common(10):
  912. if other == kw:
  913. continue
  914. # If this co-occurring term is a known entity, route to related_entities
  915. if other in all_entities:
  916. kw_related_ents.append(other)
  917. else:
  918. kw_related_kws.append(other)
  919. if len(kw_related_kws) >= 5 and len(kw_related_ents) >= 5:
  920. break
  921. kw_scored.append({
  922. "topic": kw,
  923. "trend_score": min(0.99, round(composed_score, 3)),
  924. "related_entities": kw_related_ents[:5],
  925. "related_keywords": kw_related_kws[:5],
  926. "velocity": round(velocity, 2),
  927. "recent_count": recent_n,
  928. "prior_count": prior_n,
  929. "source_count": n_sources,
  930. "avg_importance": round(avg_imp, 3),
  931. "signal_type": "keyword",
  932. })
  933. # sort keywords by score descending
  934. kw_scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"]))
  935. # sort by composed score descending
  936. scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"]))
  937. # --- merge: entities first, then keywords, then phrases ---
  938. emerging = list(scored) # start with entities
  939. seen_topics = {item["topic"] for item in emerging}
  940. for kw_item in kw_scored:
  941. if kw_item["topic"] not in seen_topics:
  942. emerging.append(kw_item)
  943. seen_topics.add(kw_item["topic"])
  944. # --- add phrase signals (only from recent window) ---
  945. for phrase, count in phrase_counts_recent.most_common(limit * 2):
  946. if phrase in seen_topics:
  947. continue
  948. emerging.append({
  949. "topic": phrase.title(),
  950. "trend_score": min(0.99, round(0.30 + 0.15 * min(count, 5), 2)),
  951. "related_entities": [],
  952. "related_keywords": [],
  953. "velocity": None,
  954. "recent_count": count,
  955. "prior_count": 0,
  956. "source_count": 0,
  957. "avg_importance": 0.0,
  958. "signal_type": "phrase",
  959. })
  960. seen_topics.add(phrase)
  961. if len(emerging) >= limit:
  962. break
  963. return emerging[:limit]
  964. @mcp.tool(description="Investigate whether sentiment around an entity or keyword is positive, negative, or neutral over a chosen lookback window. "
  965. "Matches clusters by both named entities and thematic keywords.")
  966. async def get_news_sentiment(entity: str, timeframe: str = "24h"):
  967. store = SQLiteClusterStore(DB_PATH)
  968. ent = normalize_query(entity).strip().lower()
  969. resolved = resolve_entity_via_trends(ent)
  970. query_terms = {
  971. ent,
  972. str(resolved.get("normalized") or "").strip().lower(),
  973. str(resolved.get("canonical_label") or "").strip().lower(),
  974. str(resolved.get("mid") or "").strip().lower(),
  975. }
  976. query_terms = {q for q in query_terms if q}
  977. if not ent:
  978. return {
  979. "entity": entity,
  980. "sentiment": "neutral",
  981. "score": 0.0,
  982. "cluster_count": 0,
  983. }
  984. # timeframe: accept '24h' or '24'
  985. tf = str(timeframe).strip().lower()
  986. try:
  987. hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
  988. except Exception:
  989. hours = 24
  990. hours = max(1, min(int(hours), 168))
  991. clusters = store.get_clusters_by_entity_or_keyword(query_terms=query_terms, hours=hours, limit=500)
  992. matched = clusters
  993. if not matched:
  994. return {
  995. "entity": entity,
  996. "sentiment": "neutral",
  997. "score": 0.0,
  998. "cluster_count": 0,
  999. }
  1000. scores = []
  1001. for c in matched:
  1002. s = c.get("sentimentScore")
  1003. if s is not None:
  1004. try:
  1005. scores.append(float(s))
  1006. except Exception:
  1007. pass
  1008. avg_score = sum(scores) / len(scores) if scores else 0.0
  1009. # Keep the label aligned with the numeric score.
  1010. # Small magnitudes are treated as neutral to avoid noisy label flips.
  1011. if avg_score >= 0.15:
  1012. sentiment = "positive"
  1013. elif avg_score <= -0.15:
  1014. sentiment = "negative"
  1015. else:
  1016. sentiment = "neutral"
  1017. return {
  1018. "entity": entity,
  1019. "sentiment": sentiment,
  1020. "score": round(avg_score, 3),
  1021. "cluster_count": len(matched),
  1022. }
  1023. @mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.")
  1024. async def get_capabilities():
  1025. return {
  1026. "server": {
  1027. "name": "news-mcp",
  1028. "version": "v0.5.0",
  1029. "purpose": "Deduplicated, enriched news clusters from RSS feeds. Three-layer dedup (feed hash → article URL → content hash). LLM enrichment with entities, keywords, sentiment, summaries. Content-change detection for in-place article updates.",
  1030. "output_conventions": {
  1031. "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.",
  1032. "sources": "Always preserve and display sources when summarizing a cluster or entity result.",
  1033. "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.",
  1034. "clusters": "Each cluster includes entities (named entities with optional MID/canonical_label) and keywords (thematic descriptors). Both are searchable; entities are higher-signal, keywords capture subject-matter themes.",
  1035. },
  1036. },
  1037. "tools": NEWS_TOOL_CARDS,
  1038. "recipes": NEWS_COMPOSITION_RECIPES,
  1039. "example_chains": NEWS_EXAMPLE_CHAINS,
  1040. "agent_tips": NEWS_AGENT_TIPS,
  1041. "guidance": [
  1042. "Use get_latest_events for a tail, get_events_for_entity for entity/keyword deep dives, and get_related_recent_entities for neighborhood expansion.",
  1043. "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.",
  1044. "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.",
  1045. "For emerging topics, use detect_emerging_topics with timeframe and around parameters. Signal types: entity (named entity, highest quality), keyword (thematic descriptor), phrase (headline bigram). High velocity + high source_count = strong signal.",
  1046. "get_events_for_entity and get_news_sentiment match both entities and thematic keywords — use keywords when the subject is a theme rather than a named entity.",
  1047. "The server detects in-place article content updates (e.g. stubs that get fleshed out at the same URL) and automatically re-clusters and re-enriches them.",
  1048. ],
  1049. }
  1050. def _parse_timeframe_to_hours(timeframe: str) -> int:
  1051. tf = str(timeframe).strip().lower()
  1052. try:
  1053. if tf.endswith("d"):
  1054. days = int(tf[:-1])
  1055. return max(1, days * 24)
  1056. if tf.endswith("h"):
  1057. return max(1, int(tf[:-1]))
  1058. return max(1, int(tf))
  1059. except Exception:
  1060. return 24
  1061. from contextlib import asynccontextmanager
  1062. @asynccontextmanager
  1063. async def _lifespan(app: FastAPI):
  1064. asyncio.ensure_future(_background_refresh_loop())
  1065. yield
# ASGI application; _lifespan launches the background startup routine.
app = FastAPI(title="News MCP Server", lifespan=_lifespan)
logger = logging.getLogger("news_mcp.startup")
# Expose the MCP tool surface over SSE under the /mcp prefix.
app.mount("/mcp", mcp.sse_app())
# Shared store — single connection pool
_shared_store = SQLiteClusterStore(DB_PATH)
# Guards the check-and-set of _refresh_started in _background_refresh_loop.
_refresh_lock = asyncio.Lock()
# Flipped to True the first time the startup routine runs; later calls return early.
_refresh_started = False
  1073. async def _background_refresh_loop():
  1074. """Non-blocking background refresher: prune then poll.
  1075. Protected by an async lock so a second event-loop wake-up cannot
  1076. start a parallel ingestion cycle.
  1077. """
  1078. global _refresh_started
  1079. async with _refresh_lock:
  1080. if _refresh_started:
  1081. return
  1082. _refresh_started = True
  1083. logger.info("news-mcp llm config: %s", active_llm_config())
  1084. # Prune off-thread so we do not block the event loop
  1085. prune_result = await asyncio.to_thread(
  1086. _shared_store.prune_if_due,
  1087. NEWS_PRUNING_ENABLED,
  1088. NEWS_RETENTION_DAYS,
  1089. NEWS_PRUNE_INTERVAL_HOURS,
  1090. )
  1091. logger.info("startup prune_result=%s", prune_result)
  1092. # Seed feed_state from .env — insert missing feeds, leave existing alone
  1093. feed_urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
  1094. if not feed_urls and NEWS_FEED_URL:
  1095. feed_urls = [NEWS_FEED_URL]
  1096. with _shared_store._conn() as conn:
  1097. for url in feed_urls:
  1098. conn.execute(
  1099. "INSERT OR IGNORE INTO feed_state(feed_key, last_hash, last_item_count, enabled, updated_at) VALUES(?, '', 0, 1, '')",
  1100. (url,),
  1101. )
  1102. logger.info("startup seeded %d feeds into feed_state", len(feed_urls))
  1103. if not NEWS_BACKGROUND_REFRESH_ENABLED:
  1104. return
  1105. async def _loop():
  1106. if not NEWS_BACKGROUND_REFRESH_ON_START:
  1107. logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS)
  1108. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  1109. while True:
  1110. try:
  1111. logger.info("background refresh tick start")
  1112. await refresh_clusters(topic=None, limit=200)
  1113. logger.info("background refresh tick complete")
  1114. except Exception:
  1115. logger.exception("background refresh tick failed")
  1116. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  1117. asyncio.create_task(_loop())
  1118. @app.get("/")
  1119. def root():
  1120. return {
  1121. "status": "ok",
  1122. "transport": "fastmcp+sse",
  1123. "mount": "/mcp",
  1124. "tools": [
  1125. "get_latest_events",
  1126. "get_events_for_entity",
  1127. "get_event_summary",
  1128. "detect_emerging_topics",
  1129. "get_news_sentiment",
  1130. "get_related_recent_entities",
  1131. "get_capabilities",
  1132. ],
  1133. "refresh": {
  1134. "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
  1135. "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
  1136. },
  1137. "retention": {
  1138. "lookback_hours": DEFAULT_LOOKBACK_HOURS,
  1139. "retention_days": NEWS_RETENTION_DAYS,
  1140. },
  1141. "pruning": {
  1142. "enabled": NEWS_PRUNING_ENABLED,
  1143. "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
  1144. },
  1145. }
# ------------------------------------------------------------------
# Dashboard REST API endpoints
# ------------------------------------------------------------------
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse

# Serve the static dashboard bundle under /dashboard. The relative directory
# name is resolved against the process working directory.
app.mount("/dashboard", StaticFiles(directory="dashboard", html=True), name="dashboard")

import logging as _log

# Dedicated logger for the REST endpoints defined below.
API_LOG = _log.getLogger("news_mcp.api")
  1154. def _api_ok(data: dict) -> dict:
  1155. return data
  1156. def _api_err(exc: Exception, ctx: str) -> JSONResponse:
  1157. API_LOG.exception(f"API error in {ctx}")
  1158. return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx})
  1159. @app.get("/api/v1/health")
  1160. def api_health():
  1161. """Extended health + dashboard stats."""
  1162. try:
  1163. store = _shared_store
  1164. stats = store.get_dashboard_stats()
  1165. stats["version"] = _VERSION_HASH
  1166. return stats
  1167. except Exception as e:
  1168. return _api_err(e, "health")
  1169. @app.get("/api/v1/clusters")
  1170. def api_clusters(
  1171. topic: str | None = None,
  1172. hours: int = 24,
  1173. limit: int = 50,
  1174. offset: int = 0,
  1175. ):
  1176. """Paginated cluster listing."""
  1177. try:
  1178. store = _shared_store
  1179. result = store.get_clusters_page(topic=topic, hours=hours, limit=limit, offset=offset)
  1180. return {"clusters": result["clusters"], "total": result["total"], "topic": topic or "all", "hours": hours}
  1181. except Exception as e:
  1182. return _api_err(e, f"clusters(topic={topic},hours={hours})")
  @app.get("/api/v1/sentiment-series")
  def api_sentiment_series(
      topic: str | None = None,
      hours: int = 24,
      bucket_hours: float = 1.0,
  ):
      """Return the sentiment time series consumed by the dashboard chart.

      ``topic``, ``hours`` and ``bucket_hours`` are forwarded unchanged to the
      store's ``get_sentiment_series``; the response echoes the topic
      (``"all"`` when no topic was given).
      """
      try:
          points = _shared_store.get_sentiment_series(
              topic=topic,
              hours=hours,
              bucket_hours=bucket_hours,
          )
          body = {"series": points, "topic": topic if topic else "all"}
      except Exception as err:
          return _api_err(err, f"sentiment(topic={topic})")
      return body
  @app.get("/api/v1/entities")
  def api_entities(
      hours: int = 24,
      limit: int = 30,
  ):
      """Return the most frequent entities for the requested time window.

      ``hours`` and ``limit`` are forwarded unchanged to the store's
      ``get_entity_frequencies``; ``hours`` is echoed in the response.
      """
      try:
          ranked = _shared_store.get_entity_frequencies(hours=hours, limit=limit)
          body = {"entities": ranked, "hours": hours}
      except Exception as err:
          return _api_err(err, f"entities(hours={hours})")
      return body
  @app.get("/api/v1/keywords")
  def api_keywords(
      hours: int = 24,
      limit: int = 30,
  ):
      """Return the most frequent keywords for the requested time window.

      Keywords are thematic descriptors, excluding terms already counted as
      entities. ``hours`` and ``limit`` are forwarded unchanged to the store's
      ``get_keyword_frequencies``; ``hours`` is echoed in the response.
      """
      try:
          ranked = _shared_store.get_keyword_frequencies(hours=hours, limit=limit)
          body = {"keywords": ranked, "hours": hours}
      except Exception as err:
          return _api_err(err, f"keywords(hours={hours})")
      return body
  @app.get("/api/v1/clusters/by-entity")
  def api_clusters_by_entity(
      entity: str,
      hours: int = 168,
      limit: int = 50,
      offset: int = 0,
  ):
      """Return the clusters that match an entity within the time window.

      The entity is trimmed and lower-cased before the lookup; the store's
      ``get_clusters_by_entity`` result is returned as-is.
      """
      try:
          lookup = {
              "entity": entity.strip().lower(),
              "hours": hours,
              "limit": limit,
              "offset": offset,
          }
          return _shared_store.get_clusters_by_entity(**lookup)
      except Exception as err:
          return _api_err(err, f"by-entity(entity={entity},hours={hours})")
  @app.get("/api/v1/clusters/by-keyword")
  def api_clusters_by_keyword(
      keyword: str,
      hours: int = 168,
      limit: int = 50,
      offset: int = 0,
  ):
      """Return the clusters that match a keyword within the time window.

      The keyword is trimmed and lower-cased before the lookup; the store's
      ``get_clusters_by_keyword`` result is returned as-is.
      """
      try:
          lookup = {
              "keyword": keyword.strip().lower(),
              "hours": hours,
              "limit": limit,
              "offset": offset,
          }
          return _shared_store.get_clusters_by_keyword(**lookup)
      except Exception as err:
          return _api_err(err, f"by-keyword(keyword={keyword},hours={hours})")
  @app.get("/api/v1/cluster/{cluster_id}")
  def api_cluster_detail(cluster_id: str):
      """Return the full detail record for one cluster (drill-down view).

      Responds with HTTP 404 when the store has nothing (a falsy result) for
      ``cluster_id``.
      """
      try:
          found = _shared_store.get_cluster_detail(cluster_id)
          if found:
              return found
          return JSONResponse(
              status_code=404,
              content={"error": "Cluster not found", "id": cluster_id},
          )
      except Exception as err:
          return _api_err(err, f"detail({cluster_id})")
  1267. # ------------------------------------------------------------------
  1268. # Feed management endpoints (toggle on/off from dashboard)
  1269. # ------------------------------------------------------------------
  @app.get("/api/v1/feeds")
  def api_feeds():
      """Return the store's feed-state list alongside the configured feed URLs.

      Returns:
          ``{"feeds": ..., "configured_urls": ...}`` on success, or the
          HTTP 500 JSON error response built by ``_api_err`` on failure.
      """
      try:
          # Use the process-wide store like every other endpoint instead of
          # constructing a new SQLiteClusterStore for each request.
          feed_list = _shared_store.get_feed_state_list()
          configured = _configured_feed_urls()
          return {
              "feeds": feed_list,
              "configured_urls": configured,
          }
      except Exception as e:
          return _api_err(e, "feeds")
  @app.post("/api/v1/feeds/toggle")
  async def api_feed_toggle(feed_url: str = Form(), enabled: bool = Form()):
      """Enable or disable a single feed.

      Args:
          feed_url: Form field with the feed URL; surrounding whitespace is
              stripped before the lookup.
          enabled: Form field with the desired state.

      Returns:
          ``{"ok": True, "feed_url": <stripped url>, "enabled": <enabled>}``
          on success, an HTTP 404 JSON response when the store reports a falsy
          result for the update, or the HTTP 500 response from ``_api_err``
          on failure.
      """
      try:
          # Normalise once and reuse for both the update and the response.
          url = feed_url.strip()
          # Use the process-wide store like the other endpoints instead of
          # constructing a new SQLiteClusterStore for each request.
          # NOTE(review): this is a synchronous store call inside an
          # ``async def`` handler; confirm it is cheap enough not to stall
          # the event loop.
          ok = _shared_store.set_feed_enabled(url, enabled)
          if not ok:
              return JSONResponse(
                  status_code=404,
                  content={"error": f"Feed not found: {feed_url}"},
              )
          return {"ok": True, "feed_url": url, "enabled": enabled}
      except Exception as e:
          return _api_err(e, f"toggle({feed_url})")
  1297. # ------------------------------------------------------------------ #
  1298. # Site config (dashboard-tuneable parameters)
  1299. # ------------------------------------------------------------------ #
  @app.get("/api/v1/config")
  def api_config():
      """Return every site config parameter under the ``"config"`` key."""
      try:
          from news_mcp.site_config import get_site_config

          with _shared_store._conn() as db:
              entries = get_site_config(db)
          body = {"config": entries}
      except Exception as err:
          return _api_err(err, "config")
      return body
  @app.post("/api/v1/config/update")
  async def api_config_update(key: str = Form(), value: str = Form()):
      """Change one site config parameter at runtime.

      Both form fields are stripped of surrounding whitespace before being
      stored. Responds with HTTP 404 when ``set_config_value`` reports a
      falsy result for the key.
      """
      try:
          from news_mcp.site_config import set_config_value

          clean_key = key.strip()
          clean_value = value.strip()
          with _shared_store._conn() as db:
              updated = set_config_value(db, clean_key, clean_value)
              db.commit()
          if updated:
              return {"ok": True, "key": clean_key, "value": clean_value}
          return JSONResponse(
              status_code=404,
              content={"error": f"Config key not found: {key}"},
          )
      except Exception as err:
          return _api_err(err, f"config/update({key})")
  @app.post("/api/v1/config/reset")
  async def api_config_reset():
      """Restore the site config to its seeded values.

      Deletes every row of ``site_config``, re-seeds it through
      ``seed_site_config`` and commits; the seeding result is returned under
      ``"seeded"``.
      """
      try:
          from news_mcp.site_config import seed_site_config

          with _shared_store._conn() as db:
              db.execute("DELETE FROM site_config")
              reseeded = seed_site_config(db)
              db.commit()
          body = {"ok": True, "seeded": reseeded}
      except Exception as err:
          return _api_err(err, "config/reset")
      return body
  @app.get("/health")
  def health():
      """Minimal liveness probe: status, uptime and version hash.

      ``uptime`` is the monotonic-clock time elapsed since
      ``_PROCESS_STARTED_AT``, rounded to three decimals.
      """
      elapsed = time.monotonic() - _PROCESS_STARTED_AT
      report = {
          "status": "ok",
          "uptime": round(elapsed, 3),
          "version": _VERSION_HASH,
      }
      return report