mcp_server_fastmcp.py 41 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092
  1. from __future__ import annotations
  2. import asyncio
  3. import logging
  4. import math
  5. import re
  6. import time
  7. from collections import Counter
  8. from datetime import datetime, timezone
  9. from email.utils import parsedate_to_datetime
  10. from fastapi import FastAPI, Form
  11. from mcp.server.fastmcp import FastMCP
  12. from mcp.server.transport_security import TransportSecuritySettings
  13. from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
  14. from news_mcp.config import (
  15. NEWS_FEED_URL,
  16. NEWS_FEED_URLS,
  17. NEWS_PRUNE_INTERVAL_HOURS,
  18. NEWS_PRUNING_ENABLED,
  19. NEWS_REFRESH_INTERVAL_SECONDS,
  20. NEWS_BACKGROUND_REFRESH_ENABLED,
  21. NEWS_BACKGROUND_REFRESH_ON_START,
  22. NEWS_RETENTION_DAYS,
  23. )
  24. from news_mcp.jobs.poller import refresh_clusters
  25. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  26. from news_mcp.dashboard.dashboard_store import DashboardStore
  27. from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
  28. from news_mcp.trends_resolution import resolve_entity_via_trends
  29. from news_mcp.llm import active_llm_config
  30. from news_mcp.entity_normalize import normalize_query
  31. from news_mcp.related_entities import related_recent_entities
  32. logging.basicConfig(
  33. level=logging.INFO,
  34. format="%(asctime)s %(levelname)s %(name)s: %(message)s",
  35. )
  36. _PROCESS_STARTED_AT = time.monotonic()
  37. mcp = FastMCP(
  38. "news-mcp",
  39. transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
  40. )
  41. def _cluster_entity_haystack(cluster: dict) -> list[str]:
  42. """Collect the normalized entity clues attached to a cluster."""
  43. values: list[str] = []
  44. for ent in cluster.get("entities", []) or []:
  45. values.append(str(ent).strip().lower())
  46. for res in cluster.get("entityResolutions", []) or []:
  47. if not isinstance(res, dict):
  48. continue
  49. for key in ("normalized", "canonical_label", "mid"):
  50. val = res.get(key)
  51. if val:
  52. values.append(str(val).strip().lower())
  53. return [v for v in values if v]
  54. def _parse_cluster_timestamp(value) -> datetime:
  55. if not value:
  56. return datetime.min.replace(tzinfo=timezone.utc)
  57. text = str(value).strip()
  58. if not text:
  59. return datetime.min.replace(tzinfo=timezone.utc)
  60. try:
  61. dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
  62. if dt.tzinfo is None:
  63. dt = dt.replace(tzinfo=timezone.utc)
  64. return dt.astimezone(timezone.utc)
  65. except Exception:
  66. pass
  67. try:
  68. dt = parsedate_to_datetime(text)
  69. if dt.tzinfo is None:
  70. dt = dt.replace(tzinfo=timezone.utc)
  71. return dt.astimezone(timezone.utc)
  72. except Exception:
  73. return datetime.min.replace(tzinfo=timezone.utc)
  74. def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
  75. return sorted(
  76. clusters,
  77. key=lambda c: (
  78. _parse_cluster_timestamp(c.get("timestamp")),
  79. float(c.get("importance", 0.0) or 0.0),
  80. ),
  81. reverse=True,
  82. )
  83. def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict:
  84. return {
  85. "name": name,
  86. "description": description,
  87. "inputs": inputs,
  88. "outputs": outputs,
  89. "notes": notes or [],
  90. }
  91. NEWS_TOOL_CARDS = [
  92. _tool_card(
  93. "get_feeds",
  94. "List all configured RSS feeds with their enabled/disabled status.",
  95. [],
  96. ["feeds[]: {feed_key, enabled, last_hash, last_item_count, updated_at}"],
  97. ["Use this to see which feeds are currently active or disabled."],
  98. ),
  99. _tool_card(
  100. "toggle_feed",
  101. "Enable or disable a specific RSS feed by URL.",
  102. [
  103. {"name": "feed_url", "type": "string", "meaning": "the feed URL to toggle"},
  104. {"name": "enabled", "type": "boolean", "meaning": "true to enable, false to disable"},
  105. ],
  106. ["ok", "feed_key", "enabled"],
  107. ["Changes take effect on the next refresh cycle."],
  108. ),
  109. _tool_card(
  110. "get_latest_events",
  111. "Get the newest deduplicated clusters for a topic or resolved entity-like query.",
  112. [
  113. {"name": "topic", "type": "string", "default": "all topics", "meaning": "coarse category (crypto, macro, regulation, ai, other), entity-like topic, or omit for all topics"},
  114. {"name": "limit", "type": "integer", "default": 5, "range": "1-20"},
  115. {"name": "include_articles", "type": "boolean", "default": False},
  116. ],
  117. ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"],
  118. ["Use when you want the freshest clusters and are willing to let the server decide topic vs entity mode."],
  119. ),
  120. _tool_card(
  121. "get_events_for_entity",
  122. "Search recent clusters for a person, place, company, or theme by entity matching.",
  123. [
  124. {"name": "entity", "type": "string", "meaning": "entity label or phrase"},
  125. {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]},
  126. {"name": "limit", "type": "integer", "default": 10, "range": "1-30"},
  127. {"name": "include_articles", "type": "boolean", "default": False},
  128. ],
  129. ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"],
  130. ["Normalization is automatic; use this for an entity-centered deep dive."],
  131. ),
  132. _tool_card(
  133. "get_event_summary",
  134. "Produce a concise LLM-written explanation for one cluster and key facts.",
  135. [
  136. {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"},
  137. {"name": "include_articles", "type": "boolean", "default": False},
  138. ],
  139. ["headline", "mergedSummary", "keyFacts", "sources", "articles?"],
  140. ["Prefer this after you have already chosen a specific cluster to explain."],
  141. ),
  142. _tool_card(
  143. "detect_emerging_topics",
  144. "Surface entities and phrases starting to matter in the recent window.",
  145. [
  146. {"name": "limit", "type": "integer", "default": 10, "range": "1-20"},
  147. {"name": "timeframe", "type": "string", "default": "24h", "examples": ["4h", "24h", "3d"]},
  148. {"name": "topic", "type": "string", "default": "all topics", "examples": ["crypto", "macro", "regulation", "ai", "other"]},
  149. {"name": "around", "type": "string", "default": "none", "meaning": "entity to scope results to its neighborhood (e.g. \"Bitcoin\")"},
  150. ],
  151. ["topic", "trend_score", "velocity", "recent_count", "prior_count", "source_count", "related_entities", "signal_type"],
  152. ["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity."],
  153. ),
  154. _tool_card(
  155. "get_news_sentiment",
  156. "Estimate sentiment around an entity over a lookback window.",
  157. [
  158. {"name": "entity", "type": "string"},
  159. {"name": "timeframe", "type": "string", "default": "24h"},
  160. ],
  161. ["entity", "sentiment", "score", "cluster_count"],
  162. ["Use after locating a cluster set or entity neighborhood."],
  163. ),
  164. _tool_card(
  165. "get_related_recent_entities",
  166. "Blend local co-occurrence with Google Trends related topics, while preserving mids where available.",
  167. [
  168. {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"},
  169. {"name": "timeframe", "type": "string", "default": "72h"},
  170. {"name": "limit", "type": "integer", "default": 10, "range": "1-25"},
  171. {"name": "include_trends", "type": "boolean", "default": True},
  172. ],
  173. ["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"],
  174. ["Use this to drill from a subject into related entities, then feed those into get_events_for_entity."],
  175. ),
  176. ]
  177. NEWS_COMPOSITION_RECIPES = [
  178. {
  179. "name": "fresh-news-tail",
  180. "steps": [
  181. "get_latest_events(topic=...)",
  182. "optionally get_event_summary(event_id=...) for the strongest cluster",
  183. ],
  184. "notes": ["Best for a quick tail of what is happening now. Omit topic for all topics, or pass crypto/macro/regulation/ai/other to filter."]
  185. },
  186. {
  187. "name": "entity-deep-dive",
  188. "steps": [
  189. "get_events_for_entity(entity=...)",
  190. "get_event_summary(event_id=...)",
  191. "get_news_sentiment(entity=..., timeframe=...)",
  192. ],
  193. "notes": ["Prefer canonical entity labels when you have them; the server normalizes for you."],
  194. },
  195. {
  196. "name": "subject-neighborhood",
  197. "steps": [
  198. "get_related_recent_entities(subject=...)",
  199. "for each strong related entity, call get_events_for_entity(entity=...)",
  200. ],
  201. "notes": ["Use this when you want a graph-like expansion around a subject."]
  202. },
  203. {
  204. "name": "emerging-signal",
  205. "steps": [
  206. "detect_emerging_topics(limit=...)",
  207. "choose a topic/entity",
  208. "get_events_for_entity(entity=...)",
  209. "get_news_sentiment(entity=...)",
  210. ],
  211. "notes": ["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity. Good for trend scouting and risk mapping."],
  212. },
  213. ]
  214. NEWS_AGENT_TIPS = [
  215. "If you need a fast answer, start with get_latest_events, then summarize the strongest cluster with get_event_summary.",
  216. "If a user asks about a person/place/company/theme, use get_events_for_entity before broadening to get_related_recent_entities.",
  217. "Treat cluster_id as an internal cursor, not user-facing output; use it only for follow-up tool calls.",
  218. "When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.",
  219. "Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.",
  220. "For tricky names, rely on the server’s resolver instead of inventing alias rules in the client.",
  221. ]
  222. NEWS_EXAMPLE_CHAINS = [
  223. {
  224. "task": "What is happening now?",
  225. "chain": [
  226. "get_latest_events(topic=...)",
  227. "get_event_summary(event_id=...) if one cluster looks important",
  228. ],
  229. },
  230. {
  231. "task": "Deep dive on an entity",
  232. "chain": [
  233. "get_events_for_entity(entity=..., timeframe=...)",
  234. "get_news_sentiment(entity=..., timeframe=...)",
  235. "get_event_summary(event_id=...) for the strongest cluster",
  236. ],
  237. },
  238. {
  239. "task": "Broaden from a subject",
  240. "chain": [
  241. "get_related_recent_entities(subject=..., include_trends=true)",
  242. "get_events_for_entity(entity=...) for the strongest related entities",
  243. ],
  244. },
  245. {
  246. "task": "Find what is emerging",
  247. "chain": [
  248. "detect_emerging_topics(limit=...)",
  249. "get_events_for_entity(entity=...) on one or two emerging terms",
  250. ],
  251. },
  252. ]
  253. def _configured_feed_urls() -> list[str]:
  254. """Return the configured feed URLs from environment variables."""
  255. urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
  256. if not urls:
  257. urls = [NEWS_FEED_URL]
  258. return urls
  259. @mcp.tool(description="List all configured RSS feeds with their current enabled/disabled status.")
  260. async def get_feeds() -> list[dict]:
  261. """Return each feed URL with its enabled flag, last fetch stats, and timestamps."""
  262. store = SQLiteClusterStore(DB_PATH)
  263. return store.get_feed_state_list()
  264. @mcp.tool(description="Enable or disable a specific RSS feed by URL.")
  265. async def toggle_feed(feed_url: str, enabled: bool) -> dict:
  266. """Toggle a feed's active/inactive state.
  267. Changes take effect on the next background refresh cycle.
  268. Returns the updated feed state.
  269. """
  270. store = SQLiteClusterStore(DB_PATH)
  271. store.set_feed_enabled(feed_url.strip(), enabled)
  272. updated = store.get_feed_state(feed_url.strip())
  273. return {"ok": True, "feed_key": feed_url.strip(), "enabled": enabled, "details": updated}
  274. @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.")
  275. async def get_latest_events(topic: str | None = None, limit: int = 5, include_articles: bool = False):
  276. limit = max(1, min(int(limit), 20))
  277. # When topic is omitted, search across all topics (no topic filter).
  278. # When topic is provided and matches a known topic, filter by that topic.
  279. # Otherwise treat the value as an entity-like query.
  280. topic_norm = normalize_query(topic).lower() if topic else ""
  281. resolved = resolve_entity_via_trends(topic_norm) if topic_norm else {}
  282. allowed = {t.lower() for t in DEFAULT_TOPICS}
  283. is_topic = topic_norm in allowed
  284. is_all_topics = not topic_norm
  285. query_terms = {
  286. topic_norm,
  287. str(resolved.get("normalized") or "").strip().lower(),
  288. str(resolved.get("canonical_label") or "").strip().lower(),
  289. str(resolved.get("mid") or "").strip().lower(),
  290. }
  291. query_terms = {q for q in query_terms if q}
  292. store = SQLiteClusterStore(DB_PATH)
  293. if is_all_topics:
  294. # No topic specified: return freshest clusters across all topics.
  295. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  296. elif is_topic:
  297. # Cache-first: only refresh if we currently have no fresh clusters for this topic.
  298. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  299. if not clusters:
  300. await refresh_clusters(topic=topic_norm, limit=200)
  301. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  302. else:
  303. # Entity-aware mode: search recent clusters across all topics and match by
  304. # raw entity, canonical label, or MID.
  305. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
  306. filtered = []
  307. for c in clusters:
  308. haystack = _cluster_entity_haystack(c)
  309. if any(any(term in item for item in haystack) for term in query_terms):
  310. filtered.append(c)
  311. if len(filtered) >= limit:
  312. break
  313. clusters = filtered
  314. out = []
  315. for c in _sort_clusters_by_recency(clusters):
  316. item = {
  317. "cluster_id": c.get("cluster_id"),
  318. "headline": c.get("headline"),
  319. "summary": c.get("summary"),
  320. "entities": c.get("entities", []),
  321. "sentiment": c.get("sentiment", "neutral"),
  322. "importance": c.get("importance", 0.0),
  323. "sources": c.get("sources", []),
  324. "timestamp": c.get("timestamp"),
  325. }
  326. if include_articles:
  327. # Return minimal article fields to keep responses compact.
  328. arts = c.get("articles", []) or []
  329. item["articles"] = [
  330. {
  331. "title": a.get("title"),
  332. "url": a.get("url"),
  333. "source": a.get("source"),
  334. "timestamp": a.get("timestamp"),
  335. }
  336. for a in arts
  337. if isinstance(a, dict)
  338. ]
  339. out.append(item)
  340. return out
  341. @mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.")
  342. async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
  343. limit = max(1, min(int(limit), 30))
  344. query = normalize_query(entity).strip().lower()
  345. if not query:
  346. return []
  347. resolved = resolve_entity_via_trends(query)
  348. query_terms = {
  349. query,
  350. str(resolved.get("normalized") or "").strip().lower(),
  351. str(resolved.get("canonical_label") or "").strip().lower(),
  352. str(resolved.get("mid") or "").strip().lower(),
  353. }
  354. query_terms = {q for q in query_terms if q}
  355. store = SQLiteClusterStore(DB_PATH)
  356. def _match_clusters(clusters: list[dict]) -> list[dict]:
  357. hits: list[dict] = []
  358. for c in _sort_clusters_by_recency(clusters):
  359. haystack = _cluster_entity_haystack(c)
  360. if any(any(term in item for item in haystack) for term in query_terms):
  361. hits.append(c)
  362. if len(hits) >= limit:
  363. break
  364. return hits
  365. hours = _parse_timeframe_to_hours(timeframe)
  366. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))
  367. hits = _match_clusters(clusters)
  368. out = []
  369. for c in hits:
  370. item = {
  371. "cluster_id": c.get("cluster_id"),
  372. "headline": c.get("headline"),
  373. "summary": c.get("summary"),
  374. "entities": c.get("entities", []),
  375. "sentiment": c.get("sentiment", "neutral"),
  376. "importance": c.get("importance", 0.0),
  377. "sources": c.get("sources", []),
  378. "timestamp": c.get("timestamp"),
  379. }
  380. if include_articles:
  381. arts = c.get("articles", []) or []
  382. item["articles"] = [
  383. {
  384. "title": a.get("title"),
  385. "url": a.get("url"),
  386. "source": a.get("source"),
  387. "timestamp": a.get("timestamp"),
  388. }
  389. for a in arts
  390. if isinstance(a, dict)
  391. ]
  392. out.append(item)
  393. return out
  394. @mcp.tool(description="Return entities most commonly associated with the subject in recent clusters, optionally blended with Google Trends suggestions.")
  395. async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True):
  396. limit = max(1, min(int(limit), 25))
  397. hours = _parse_timeframe_to_hours(timeframe)
  398. include_trends_bool = str(include_trends).strip().lower() not in {"false", "0", "no"}
  399. store = SQLiteClusterStore(DB_PATH)
  400. result = related_recent_entities(
  401. store=store,
  402. subject=subject,
  403. timeframe_hours=hours,
  404. limit=limit,
  405. include_trends=include_trends_bool,
  406. )
  407. return result
  408. @mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
  409. async def get_event_summary(event_id: str, include_articles: bool = False):
  410. store = SQLiteClusterStore(DB_PATH)
  411. # Summary cache: reuse if present within TTL.
  412. cached_summary = store.get_cluster_summary(
  413. cluster_id=event_id,
  414. ttl_hours=DEFAULT_LOOKBACK_HOURS,
  415. )
  416. if cached_summary:
  417. out = {
  418. "event_id": event_id,
  419. "headline": cached_summary.get("headline"),
  420. "mergedSummary": cached_summary.get("mergedSummary"),
  421. "keyFacts": cached_summary.get("keyFacts", []),
  422. "sources": cached_summary.get("sources", []),
  423. }
  424. if include_articles:
  425. cluster = store.get_cluster_by_id(event_id)
  426. arts = (cluster or {}).get("articles", []) or []
  427. out["articles"] = [
  428. {
  429. "title": a.get("title"),
  430. "url": a.get("url"),
  431. "source": a.get("source"),
  432. "timestamp": a.get("timestamp"),
  433. }
  434. for a in arts
  435. if isinstance(a, dict)
  436. ]
  437. return out
  438. cluster = store.get_cluster_by_id(event_id)
  439. if not cluster:
  440. return {
  441. "event_id": event_id,
  442. "error": "NOT_FOUND",
  443. }
  444. articles_out = None
  445. if include_articles:
  446. arts = cluster.get("articles", []) or []
  447. articles_out = [
  448. {
  449. "title": a.get("title"),
  450. "url": a.get("url"),
  451. "source": a.get("source"),
  452. "timestamp": a.get("timestamp"),
  453. }
  454. for a in arts
  455. if isinstance(a, dict)
  456. ]
  457. summary = await summarize_cluster_llm(cluster)
  458. store.upsert_cluster_summary(event_id, summary)
  459. out = {
  460. "event_id": event_id,
  461. "headline": summary.get("headline"),
  462. "mergedSummary": summary.get("mergedSummary"),
  463. "keyFacts": summary.get("keyFacts", []),
  464. "sources": summary.get("sources", []),
  465. }
  466. if include_articles:
  467. out["articles"] = articles_out or []
  468. return out
  469. @mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters. "
  470. "Use timeframe to control the lookback window, topic to scope to a category, and around to find what's emerging near a specific entity.")
  471. async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic: str | None = None, around: str | None = None):
  472. """Surface entities and phrases that are accelerating in recent clusters.
  473. Args:
  474. limit: max results to return (1-20, default 10).
  475. timeframe: lookback window like "4h", "24h", "3d" (default "24h").
  476. topic: optional coarse topic filter ("crypto", "macro", "regulation", "ai", "other").
  477. around: optional entity — only return entities that co-occur with this entity
  478. in the recent window (e.g. "Bitcoin" to find what's emerging in Bitcoin's neighborhood).
  479. """
  480. limit = max(1, min(int(limit), 20))
  481. hours = _parse_timeframe_to_hours(timeframe)
  482. half_hours = hours / 2.0
  483. store = SQLiteClusterStore(DB_PATH)
  484. # Fetch more clusters than needed so velocity stats are meaningful even for short windows.
  485. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
  486. # --- optional topic filter ---
  487. if topic:
  488. topic_norm = normalize_query(topic).strip().lower()
  489. if topic_norm:
  490. clusters = [c for c in clusters if (c.get("topic") or "other").strip().lower() == topic_norm]
  491. # --- resolve the 'around' entity ---
  492. around_terms: set[str] = set()
  493. if around:
  494. around_norm = normalize_query(around).strip().lower()
  495. if around_norm:
  496. resolved = resolve_entity_via_trends(around_norm)
  497. around_terms = {
  498. around_norm,
  499. str(resolved.get("normalized") or "").strip().lower(),
  500. str(resolved.get("canonical_label") or "").strip().lower(),
  501. }
  502. around_terms.discard("")
  503. # split clusters into first-half vs second-half by timestamp
  504. # clusters are already sorted most-recent-first from the store
  505. now = datetime.now(timezone.utc)
  506. def _cluster_age_hours(c: dict) -> float:
  507. """Return the cluster's age in hours (approximate, from now)."""
  508. ts = c.get("timestamp") or c.get("last_updated")
  509. if not ts:
  510. return 0.0 # treat un-dated as fresh
  511. try:
  512. s = str(ts).replace("Z", "+00:00")
  513. dt = datetime.fromisoformat(s)
  514. if dt.tzinfo is None:
  515. dt = dt.replace(tzinfo=timezone.utc)
  516. return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0)
  517. except Exception:
  518. try:
  519. dt = parsedate_to_datetime(str(ts))
  520. if dt.tzinfo is None:
  521. dt = dt.replace(tzinfo=timezone.utc)
  522. return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0)
  523. except Exception:
  524. return 0.0
  525. # Generic entity filter
  526. _generic_tokens = {"news", "latest", "breaking", "update", "updates", "report", "reports"}
  527. def _is_generic_entity(ent: str) -> bool:
  528. e = str(ent).strip().lower()
  529. if not e or len(e) < 4:
  530. return True
  531. if e in _generic_tokens:
  532. return True
  533. return False
  534. # --- accumulate signals ---
  535. # recent = second half of timeframe (newer), prior = first half (older)
  536. entity_counts_recent = Counter()
  537. entity_counts_prior = Counter()
  538. entity_importance_recent = Counter()
  539. entity_sources: dict[str, set] = {} # ent -> set of source names
  540. entity_buckets: dict[str, set] = {} # ent -> set of time-bucket indices (for sustained-spike detection)
  541. entity_cooccur: dict[str, Counter] = {}
  542. phrase_counts_recent = Counter()
  543. bucket_size_hours = max(1.0, hours / 6.0) # split window into ~6 buckets
  544. for c in clusters:
  545. ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
  546. ents_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
  547. age_h = _cluster_age_hours(c)
  548. is_recent = age_h <= half_hours
  549. bucket_idx = int(age_h / bucket_size_hours)
  550. # --- around filter: only count clusters that mention the target entity ---
  551. if around_terms:
  552. haystack = set(ents_norm)
  553. for res in c.get("entityResolutions", []) or []:
  554. if isinstance(res, dict):
  555. for key in ("normalized", "canonical_label"):
  556. val = res.get(key)
  557. if val:
  558. haystack.add(str(val).strip().lower())
  559. if not (haystack & around_terms):
  560. continue
  561. counts = entity_counts_recent if is_recent else entity_counts_prior
  562. imp_acc = entity_importance_recent if is_recent else None # only importance from recent window
  563. for ent in ents_norm:
  564. if _is_generic_entity(ent):
  565. continue
  566. counts[ent] += 1
  567. if ent not in entity_sources:
  568. entity_sources[ent] = set()
  569. src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else ""
  570. if src:
  571. entity_sources[ent].add(str(src))
  572. if ent not in entity_buckets:
  573. entity_buckets[ent] = set()
  574. entity_buckets[ent].add(bucket_idx)
  575. if imp_acc is not None:
  576. try:
  577. imp_acc[ent] += float(c.get("importance", 0.0) or 0.0)
  578. except Exception:
  579. pass
  580. # co-occurrence (only for clusters matching the around filter, if any)
  581. for i in range(len(ents_norm)):
  582. a = ents_norm[i]
  583. if _is_generic_entity(a):
  584. continue
  585. if a not in entity_cooccur:
  586. entity_cooccur[a] = Counter()
  587. for j in range(len(ents_norm)):
  588. if i == j:
  589. continue
  590. b = ents_norm[j]
  591. if _is_generic_entity(b):
  592. continue
  593. entity_cooccur[a][b] += 1
  594. # bigram phrases (recent only)
  595. if is_recent:
  596. text = f"{c.get('headline', '')} {c.get('summary', '')}"
  597. words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
  598. for i in range(len(words) - 1):
  599. phrase = f"{words[i]} {words[i+1]}"
  600. if len(phrase) > 6:
  601. phrase_counts_recent[phrase] += 1
  602. # --- score entities ---
  603. all_entities = set(entity_counts_recent.keys()) | set(entity_counts_prior.keys())
  604. scored = []
  605. for ent in all_entities:
  606. recent_n = entity_counts_recent.get(ent, 0)
  607. prior_n = entity_counts_prior.get(ent, 0)
  608. total_n = recent_n + prior_n
  609. if total_n < 1:
  610. continue
  611. # velocity: ratio of recent vs prior (smoothed to avoid division noise)
  612. # 0 prior → velocity = recent_n (pure emergence)
  613. # equal → velocity = 1.0 (steady)
  614. velocity = (recent_n + 0.5) / (prior_n + 0.5)
  615. # recency weight: what fraction of total hits are in the recent window
  616. recency_ratio = recent_n / total_n
  617. # source diversity: how many distinct outlets
  618. n_sources = len(entity_sources.get(ent, set()))
  619. # sustained: how many distinct time buckets did it appear in (max ~6)
  620. n_buckets = len(entity_buckets.get(ent, set()))
  621. # average importance (recent window only)
  622. avg_imp = (entity_importance_recent.get(ent, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0
  623. composed_score = (
  624. 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) + # velocity (0..1, 4x = max)
  625. 0.25 * recency_ratio + # recency concentration
  626. 0.15 * min(1.0, n_sources / 5.0) + # source diversity
  627. 0.10 * min(1.0, n_buckets / 4.0) + # sustained (>1 bucket)
  628. 0.15 * min(1.0, avg_imp) # importance
  629. )
  630. related = []
  631. if ent in entity_cooccur:
  632. for other, _cnt in entity_cooccur[ent].most_common(5):
  633. if other != ent:
  634. related.append(other)
  635. scored.append({
  636. "topic": ent,
  637. "trend_score": min(0.99, round(composed_score, 3)),
  638. "related_entities": related[:3] if related else [ent],
  639. "velocity": round(velocity, 2),
  640. "recent_count": recent_n,
  641. "prior_count": prior_n,
  642. "source_count": n_sources,
  643. "avg_importance": round(avg_imp, 3),
  644. "signal_type": "entity",
  645. })
  646. # sort by composed score descending
  647. scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"]))
  648. # --- add phrase signals (only from recent window) ---
  649. emerging = list(scored) # start with entities
  650. for phrase, count in phrase_counts_recent.most_common(limit * 2):
  651. if any(item["topic"] == phrase for item in emerging):
  652. continue
  653. emerging.append({
  654. "topic": phrase.title(),
  655. "trend_score": min(0.99, round(0.30 + 0.15 * min(count, 5), 2)),
  656. "related_entities": [],
  657. "velocity": None,
  658. "recent_count": count,
  659. "prior_count": 0,
  660. "source_count": 0,
  661. "avg_importance": 0.0,
  662. "signal_type": "phrase",
  663. })
  664. if len(emerging) >= limit:
  665. break
  666. return emerging[:limit]
  667. @mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.")
  668. async def get_news_sentiment(entity: str, timeframe: str = "24h"):
  669. store = SQLiteClusterStore(DB_PATH)
  670. ent = normalize_query(entity).strip().lower()
  671. resolved = resolve_entity_via_trends(ent)
  672. query_terms = {
  673. ent,
  674. str(resolved.get("normalized") or "").strip().lower(),
  675. str(resolved.get("canonical_label") or "").strip().lower(),
  676. str(resolved.get("mid") or "").strip().lower(),
  677. }
  678. query_terms = {q for q in query_terms if q}
  679. if not ent:
  680. return {
  681. "entity": entity,
  682. "sentiment": "neutral",
  683. "score": 0.0,
  684. "cluster_count": 0,
  685. }
  686. # timeframe: accept '24h' or '24'
  687. tf = str(timeframe).strip().lower()
  688. try:
  689. hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
  690. except Exception:
  691. hours = 24
  692. hours = max(1, min(int(hours), 168))
  693. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
  694. matched = []
  695. for c in clusters:
  696. haystack = _cluster_entity_haystack(c)
  697. if any(any(term in item for item in haystack) for term in query_terms):
  698. matched.append(c)
  699. if not matched:
  700. return {
  701. "entity": entity,
  702. "sentiment": "neutral",
  703. "score": 0.0,
  704. "cluster_count": 0,
  705. }
  706. scores = []
  707. for c in matched:
  708. s = c.get("sentimentScore")
  709. if s is not None:
  710. try:
  711. scores.append(float(s))
  712. except Exception:
  713. pass
  714. avg_score = sum(scores) / len(scores) if scores else 0.0
  715. # Keep the label aligned with the numeric score.
  716. # Small magnitudes are treated as neutral to avoid noisy label flips.
  717. if avg_score >= 0.15:
  718. sentiment = "positive"
  719. elif avg_score <= -0.15:
  720. sentiment = "negative"
  721. else:
  722. sentiment = "neutral"
  723. return {
  724. "entity": entity,
  725. "sentiment": sentiment,
  726. "score": round(avg_score, 3),
  727. "cluster_count": len(matched),
  728. }
  729. @mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.")
  730. async def get_capabilities():
  731. return {
  732. "server": {
  733. "name": "news-mcp",
  734. "purpose": "Recent news clusters, entity drill-down, sentiment, emerging topics, and related-entity expansion.",
  735. "output_conventions": {
  736. "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.",
  737. "sources": "Always preserve and display sources when summarizing a cluster or entity result.",
  738. "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.",
  739. },
  740. },
  741. "tools": NEWS_TOOL_CARDS,
  742. "recipes": NEWS_COMPOSITION_RECIPES,
  743. "example_chains": NEWS_EXAMPLE_CHAINS,
  744. "agent_tips": NEWS_AGENT_TIPS,
  745. "guidance": [
  746. "Use get_latest_events for a tail, get_events_for_entity for entity deep dives, and get_related_recent_entities for neighborhood expansion.",
  747. "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.",
  748. "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.",
  749. ],
  750. }
  751. def _parse_timeframe_to_hours(timeframe: str) -> int:
  752. tf = str(timeframe).strip().lower()
  753. try:
  754. if tf.endswith("d"):
  755. days = int(tf[:-1])
  756. return max(1, days * 24)
  757. if tf.endswith("h"):
  758. return max(1, int(tf[:-1]))
  759. return max(1, int(tf))
  760. except Exception:
  761. return 24
  762. from contextlib import asynccontextmanager
  763. @asynccontextmanager
  764. async def _lifespan(app: FastAPI):
  765. asyncio.ensure_future(_background_refresh_loop())
  766. yield
  767. app = FastAPI(title="News MCP Server", lifespan=_lifespan)
  768. logger = logging.getLogger("news_mcp.startup")
  769. app.mount("/mcp", mcp.sse_app())
  770. # Shared store — single connection pool
  771. _shared_store = SQLiteClusterStore(DB_PATH)
  772. _refresh_lock = asyncio.Lock()
  773. _refresh_started = False
  774. async def _background_refresh_loop():
  775. """Non-blocking background refresher: prune then poll.
  776. Protected by an async lock so a second event-loop wake-up cannot
  777. start a parallel ingestion cycle.
  778. """
  779. global _refresh_started
  780. async with _refresh_lock:
  781. if _refresh_started:
  782. return
  783. _refresh_started = True
  784. logger.info("news-mcp llm config: %s", active_llm_config())
  785. # Prune off-thread so we do not block the event loop
  786. prune_result = await asyncio.to_thread(
  787. _shared_store.prune_if_due,
  788. NEWS_PRUNING_ENABLED,
  789. NEWS_RETENTION_DAYS,
  790. NEWS_PRUNE_INTERVAL_HOURS,
  791. )
  792. logger.info("startup prune_result=%s", prune_result)
  793. if not NEWS_BACKGROUND_REFRESH_ENABLED:
  794. return
  795. async def _loop():
  796. if not NEWS_BACKGROUND_REFRESH_ON_START:
  797. logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS)
  798. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  799. while True:
  800. try:
  801. logger.info("background refresh tick start")
  802. await refresh_clusters(topic=None, limit=200)
  803. logger.info("background refresh tick complete")
  804. except Exception:
  805. logger.exception("background refresh tick failed")
  806. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  807. asyncio.create_task(_loop())
  808. @app.get("/")
  809. def root():
  810. return {
  811. "status": "ok",
  812. "transport": "fastmcp+sse",
  813. "mount": "/mcp",
  814. "tools": [
  815. "get_latest_events",
  816. "get_events_for_entity",
  817. "get_event_summary",
  818. "detect_emerging_topics",
  819. "get_news_sentiment",
  820. "get_related_recent_entities",
  821. "get_capabilities",
  822. ],
  823. "refresh": {
  824. "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
  825. "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
  826. },
  827. "retention": {
  828. "lookback_hours": DEFAULT_LOOKBACK_HOURS,
  829. "retention_days": NEWS_RETENTION_DAYS,
  830. },
  831. "pruning": {
  832. "enabled": NEWS_PRUNING_ENABLED,
  833. "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
  834. },
  835. }
  836. # ------------------------------------------------------------------
  837. # Dashboard REST API endpoints
  838. # ------------------------------------------------------------------
  839. from fastapi.staticfiles import StaticFiles
  840. from fastapi.responses import JSONResponse
  841. app.mount("/dashboard", StaticFiles(directory="dashboard", html=True), name="dashboard")
  842. import logging as _log
  843. API_LOG = _log.getLogger("news_mcp.api")
  844. def _api_ok(data: dict) -> dict:
  845. return data
  846. def _api_err(exc: Exception, ctx: str) -> JSONResponse:
  847. API_LOG.exception(f"API error in {ctx}")
  848. return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx})
  849. @app.get("/api/v1/health")
  850. def api_health():
  851. """Extended health + dashboard stats."""
  852. try:
  853. store = DashboardStore(_shared_store)
  854. return store.get_dashboard_stats()
  855. except Exception as e:
  856. return _api_err(e, "health")
  857. @app.get("/api/v1/clusters")
  858. def api_clusters(
  859. topic: str | None = None,
  860. hours: int = 24,
  861. limit: int = 50,
  862. offset: int = 0,
  863. ):
  864. """Paginated cluster listing."""
  865. try:
  866. store = DashboardStore(_shared_store)
  867. clusters = store.get_clusters_page(topic=topic, hours=hours, limit=limit, offset=offset)
  868. with store._store._conn() as conn:
  869. if topic and topic != "all":
  870. count_row = conn.execute(
  871. "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours') AND topic = ?",
  872. (-hours, topic),
  873. ).fetchone()
  874. else:
  875. count_row = conn.execute(
  876. "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours')",
  877. (-hours,),
  878. ).fetchone()
  879. total = count_row[0] if count_row else 0
  880. return {"clusters": clusters, "total": total, "topic": topic or "all", "hours": hours}
  881. except Exception as e:
  882. return _api_err(e, f"clusters(topic={topic},hours={hours})")
  883. @app.get("/api/v1/sentiment-series")
  884. def api_sentiment_series(
  885. topic: str | None = None,
  886. hours: int = 24,
  887. bucket_hours: float = 1.0,
  888. ):
  889. """Sentiment time-series for Chart.js."""
  890. try:
  891. store = DashboardStore(_shared_store)
  892. series = store.get_sentiment_series(topic=topic, hours=hours, bucket_hours=bucket_hours)
  893. return {"series": series, "topic": topic or "all"}
  894. except Exception as e:
  895. return _api_err(e, f"sentiment(topic={topic})")
  896. @app.get("/api/v1/entities")
  897. def api_entities(
  898. hours: int = 24,
  899. limit: int = 30,
  900. ):
  901. """Top entity frequencies."""
  902. try:
  903. store = DashboardStore(_shared_store)
  904. entities = store.get_entity_frequencies(hours=hours, limit=limit)
  905. return {"entities": entities, "hours": hours}
  906. except Exception as e:
  907. return _api_err(e, f"entities(hours={hours})")
  908. @app.get("/api/v1/cluster/{cluster_id}")
  909. def api_cluster_detail(cluster_id: str):
  910. """Full cluster detail for drill-down."""
  911. try:
  912. store = DashboardStore(_shared_store)
  913. detail = store.get_cluster_detail(cluster_id)
  914. if not detail:
  915. return JSONResponse(status_code=404, content={"error": "Cluster not found", "id": cluster_id})
  916. return detail
  917. except Exception as e:
  918. return _api_err(e, f"detail({cluster_id})")
  919. # ------------------------------------------------------------------
  920. # Feed management endpoints (toggle on/off from dashboard)
  921. # ------------------------------------------------------------------
  922. @app.get("/api/v1/feeds")
  923. def api_feeds():
  924. """List all configured feeds with enabled/disabled status."""
  925. try:
  926. store = SQLiteClusterStore(DB_PATH)
  927. feed_list = store.get_feed_state_list()
  928. configured = _configured_feed_urls()
  929. return {
  930. "feeds": feed_list,
  931. "configured_urls": configured,
  932. }
  933. except Exception as e:
  934. return _api_err(e, "feeds")
  935. @app.post("/api/v1/feeds/toggle")
  936. async def api_feed_toggle(feed_url: str = Form(), enabled: bool = Form()):
  937. """Toggle a feed's enabled state."""
  938. try:
  939. store = SQLiteClusterStore(DB_PATH)
  940. ok = store.set_feed_enabled(feed_url.strip(), enabled)
  941. if not ok:
  942. return JSONResponse(
  943. status_code=404,
  944. content={"error": f"Feed not found: {feed_url}"},
  945. )
  946. return {"ok": True, "feed_url": feed_url.strip(), "enabled": enabled}
  947. except Exception as e:
  948. return _api_err(e, f"toggle({feed_url})")
  949. @app.get("/health")
  950. def health():
  951. return {
  952. "status": "ok",
  953. "uptime": round(time.monotonic() - _PROCESS_STARTED_AT, 3),
  954. }