mcp_server_fastmcp.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953
  1. from __future__ import annotations
  2. import asyncio
  3. import logging
  4. import time
  5. from collections import Counter
  6. from datetime import datetime, timezone
  7. from email.utils import parsedate_to_datetime
  8. from fastapi import FastAPI, Form
  9. from mcp.server.fastmcp import FastMCP
  10. from mcp.server.transport_security import TransportSecuritySettings
  11. from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
  12. from news_mcp.config import (
  13. NEWS_FEED_URL,
  14. NEWS_FEED_URLS,
  15. NEWS_PRUNE_INTERVAL_HOURS,
  16. NEWS_PRUNING_ENABLED,
  17. NEWS_REFRESH_INTERVAL_SECONDS,
  18. NEWS_BACKGROUND_REFRESH_ENABLED,
  19. NEWS_BACKGROUND_REFRESH_ON_START,
  20. NEWS_RETENTION_DAYS,
  21. )
  22. from news_mcp.jobs.poller import refresh_clusters
  23. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  24. from news_mcp.dashboard.dashboard_store import DashboardStore
  25. from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
  26. from news_mcp.trends_resolution import resolve_entity_via_trends
  27. from news_mcp.llm import active_llm_config
  28. from news_mcp.entity_normalize import normalize_query
  29. from news_mcp.related_entities import related_recent_entities
# Configure root logging once at import time so all module loggers share one format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

# Monotonic clock reading captured when the module is imported.
_PROCESS_STARTED_AT = time.monotonic()

# FastMCP server instance; the @mcp.tool functions in this module register on it.
# NOTE(review): DNS-rebinding protection is explicitly disabled here — confirm the
# server is only reachable from trusted clients or sits behind a validating proxy.
mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)
  39. def _cluster_entity_haystack(cluster: dict) -> list[str]:
  40. """Collect the normalized entity clues attached to a cluster."""
  41. values: list[str] = []
  42. for ent in cluster.get("entities", []) or []:
  43. values.append(str(ent).strip().lower())
  44. for res in cluster.get("entityResolutions", []) or []:
  45. if not isinstance(res, dict):
  46. continue
  47. for key in ("normalized", "canonical_label", "mid"):
  48. val = res.get(key)
  49. if val:
  50. values.append(str(val).strip().lower())
  51. return [v for v in values if v]
  52. def _parse_cluster_timestamp(value) -> datetime:
  53. if not value:
  54. return datetime.min.replace(tzinfo=timezone.utc)
  55. text = str(value).strip()
  56. if not text:
  57. return datetime.min.replace(tzinfo=timezone.utc)
  58. try:
  59. dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
  60. if dt.tzinfo is None:
  61. dt = dt.replace(tzinfo=timezone.utc)
  62. return dt.astimezone(timezone.utc)
  63. except Exception:
  64. pass
  65. try:
  66. dt = parsedate_to_datetime(text)
  67. if dt.tzinfo is None:
  68. dt = dt.replace(tzinfo=timezone.utc)
  69. return dt.astimezone(timezone.utc)
  70. except Exception:
  71. return datetime.min.replace(tzinfo=timezone.utc)
  72. def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
  73. return sorted(
  74. clusters,
  75. key=lambda c: (
  76. _parse_cluster_timestamp(c.get("timestamp")),
  77. float(c.get("importance", 0.0) or 0.0),
  78. ),
  79. reverse=True,
  80. )
  81. def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict:
  82. return {
  83. "name": name,
  84. "description": description,
  85. "inputs": inputs,
  86. "outputs": outputs,
  87. "notes": notes or [],
  88. }
# Tool descriptor cards, one per tool, built with _tool_card(name, description,
# inputs, outputs, notes).  get_capabilities() returns this list under "tools".
NEWS_TOOL_CARDS = [
    _tool_card(
        "get_feeds",
        "List all configured RSS feeds with their enabled/disabled status.",
        [],
        ["feeds[]: {feed_key, enabled, last_hash, last_item_count, updated_at}"],
        ["Use this to see which feeds are currently active or disabled."],
    ),
    _tool_card(
        "toggle_feed",
        "Enable or disable a specific RSS feed by URL.",
        [
            {"name": "feed_url", "type": "string", "meaning": "the feed URL to toggle"},
            {"name": "enabled", "type": "boolean", "meaning": "true to enable, false to disable"},
        ],
        ["ok", "feed_key", "enabled"],
        ["Changes take effect on the next refresh cycle."],
    ),
    _tool_card(
        "get_latest_events",
        "Get the newest deduplicated clusters for a topic or resolved entity-like query.",
        [
            {"name": "topic", "type": "string", "default": "all topics", "meaning": "coarse category (crypto, macro, regulation, ai, other), entity-like topic, or omit for all topics"},
            {"name": "limit", "type": "integer", "default": 5, "range": "1-20"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"],
        ["Use when you want the freshest clusters and are willing to let the server decide topic vs entity mode."],
    ),
    _tool_card(
        "get_events_for_entity",
        "Search recent clusters for a person, place, company, or theme by entity matching.",
        [
            {"name": "entity", "type": "string", "meaning": "entity label or phrase"},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-30"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"],
        ["Normalization is automatic; use this for an entity-centered deep dive."],
    ),
    _tool_card(
        "get_event_summary",
        "Produce a concise LLM-written explanation for one cluster and key facts.",
        [
            {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["headline", "mergedSummary", "keyFacts", "sources", "articles?"],
        ["Prefer this after you have already chosen a specific cluster to explain."],
    ),
    _tool_card(
        "detect_emerging_topics",
        "Surface entities and phrases starting to matter in the recent window.",
        [{"name": "limit", "type": "integer", "default": 10, "range": "1-20"}],
        ["topic", "trend_score", "related_entities", "signal_type", "count", "avg_importance"],
        ["Good for 'what is heating up?' style questions."],
    ),
    _tool_card(
        "get_news_sentiment",
        "Estimate sentiment around an entity over a lookback window.",
        [
            {"name": "entity", "type": "string"},
            {"name": "timeframe", "type": "string", "default": "24h"},
        ],
        ["entity", "sentiment", "score", "cluster_count"],
        ["Use after locating a cluster set or entity neighborhood."],
    ),
    _tool_card(
        "get_related_recent_entities",
        "Blend local co-occurrence with Google Trends related topics, while preserving mids where available.",
        [
            {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"},
            {"name": "timeframe", "type": "string", "default": "72h"},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-25"},
            {"name": "include_trends", "type": "boolean", "default": True},
        ],
        ["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"],
        ["Use this to drill from a subject into related entities, then feed those into get_events_for_entity."],
    ),
]
# Named multi-step tool recipes; each entry has a "name", ordered "steps" written
# as tool-call sketches, and free-text "notes".  get_capabilities() returns this
# list under "recipes".
NEWS_COMPOSITION_RECIPES = [
    {
        "name": "fresh-news-tail",
        "steps": [
            "get_latest_events(topic=...)",
            "optionally get_event_summary(event_id=...) for the strongest cluster",
        ],
        "notes": ["Best for a quick tail of what is happening now. Omit topic for all topics, or pass crypto/macro/regulation/ai/other to filter."]
    },
    {
        "name": "entity-deep-dive",
        "steps": [
            "get_events_for_entity(entity=...)",
            "get_event_summary(event_id=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
        ],
        "notes": ["Prefer canonical entity labels when you have them; the server normalizes for you."],
    },
    {
        "name": "subject-neighborhood",
        "steps": [
            "get_related_recent_entities(subject=...)",
            "for each strong related entity, call get_events_for_entity(entity=...)",
        ],
        "notes": ["Use this when you want a graph-like expansion around a subject."]
    },
    {
        "name": "emerging-signal",
        "steps": [
            "detect_emerging_topics(limit=...)",
            "choose a topic/entity",
            "get_events_for_entity(entity=...)",
            "get_news_sentiment(entity=...)",
        ],
        "notes": ["Good for trend scouting and risk mapping."],
    },
]
# Free-text usage hints for downstream agents; get_capabilities() returns this
# list under "agent_tips".
NEWS_AGENT_TIPS = [
    "If you need a fast answer, start with get_latest_events, then summarize the strongest cluster with get_event_summary.",
    "If a user asks about a person/place/company/theme, use get_events_for_entity before broadening to get_related_recent_entities.",
    "Treat cluster_id as an internal cursor, not user-facing output; use it only for follow-up tool calls.",
    "When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.",
    "Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.",
    "For tricky names, rely on the server’s resolver instead of inventing alias rules in the client.",
]
# Example task -> tool-call chains; each entry pairs a "task" description with an
# ordered "chain" of call sketches.  get_capabilities() returns this list under
# "example_chains".
NEWS_EXAMPLE_CHAINS = [
    {
        "task": "What is happening now?",
        "chain": [
            "get_latest_events(topic=...)",
            "get_event_summary(event_id=...) if one cluster looks important",
        ],
    },
    {
        "task": "Deep dive on an entity",
        "chain": [
            "get_events_for_entity(entity=..., timeframe=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
            "get_event_summary(event_id=...) for the strongest cluster",
        ],
    },
    {
        "task": "Broaden from a subject",
        "chain": [
            "get_related_recent_entities(subject=..., include_trends=true)",
            "get_events_for_entity(entity=...) for the strongest related entities",
        ],
    },
    {
        "task": "Find what is emerging",
        "chain": [
            "detect_emerging_topics(limit=...)",
            "get_events_for_entity(entity=...) on one or two emerging terms",
        ],
    },
]
  246. def _configured_feed_urls() -> list[str]:
  247. """Return the configured feed URLs from environment variables."""
  248. urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
  249. if not urls:
  250. urls = [NEWS_FEED_URL]
  251. return urls
  252. @mcp.tool(description="List all configured RSS feeds with their current enabled/disabled status.")
  253. async def get_feeds() -> list[dict]:
  254. """Return each feed URL with its enabled flag, last fetch stats, and timestamps."""
  255. store = SQLiteClusterStore(DB_PATH)
  256. return store.get_feed_state_list()
  257. @mcp.tool(description="Enable or disable a specific RSS feed by URL.")
  258. async def toggle_feed(feed_url: str, enabled: bool) -> dict:
  259. """Toggle a feed's active/inactive state.
  260. Changes take effect on the next background refresh cycle.
  261. Returns the updated feed state.
  262. """
  263. store = SQLiteClusterStore(DB_PATH)
  264. store.set_feed_enabled(feed_url.strip(), enabled)
  265. updated = store.get_feed_state(feed_url.strip())
  266. return {"ok": True, "feed_key": feed_url.strip(), "enabled": enabled, "details": updated}
  267. @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.")
  268. async def get_latest_events(topic: str | None = None, limit: int = 5, include_articles: bool = False):
  269. limit = max(1, min(int(limit), 20))
  270. # When topic is omitted, search across all topics (no topic filter).
  271. # When topic is provided and matches a known topic, filter by that topic.
  272. # Otherwise treat the value as an entity-like query.
  273. topic_norm = normalize_query(topic).lower() if topic else ""
  274. resolved = resolve_entity_via_trends(topic_norm) if topic_norm else {}
  275. allowed = {t.lower() for t in DEFAULT_TOPICS}
  276. is_topic = topic_norm in allowed
  277. is_all_topics = not topic_norm
  278. query_terms = {
  279. topic_norm,
  280. str(resolved.get("normalized") or "").strip().lower(),
  281. str(resolved.get("canonical_label") or "").strip().lower(),
  282. str(resolved.get("mid") or "").strip().lower(),
  283. }
  284. query_terms = {q for q in query_terms if q}
  285. store = SQLiteClusterStore(DB_PATH)
  286. if is_all_topics:
  287. # No topic specified: return freshest clusters across all topics.
  288. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  289. elif is_topic:
  290. # Cache-first: only refresh if we currently have no fresh clusters for this topic.
  291. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  292. if not clusters:
  293. await refresh_clusters(topic=topic_norm, limit=200)
  294. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  295. else:
  296. # Entity-aware mode: search recent clusters across all topics and match by
  297. # raw entity, canonical label, or MID.
  298. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
  299. filtered = []
  300. for c in clusters:
  301. haystack = _cluster_entity_haystack(c)
  302. if any(any(term in item for item in haystack) for term in query_terms):
  303. filtered.append(c)
  304. if len(filtered) >= limit:
  305. break
  306. clusters = filtered
  307. out = []
  308. for c in _sort_clusters_by_recency(clusters):
  309. item = {
  310. "cluster_id": c.get("cluster_id"),
  311. "headline": c.get("headline"),
  312. "summary": c.get("summary"),
  313. "entities": c.get("entities", []),
  314. "sentiment": c.get("sentiment", "neutral"),
  315. "importance": c.get("importance", 0.0),
  316. "sources": c.get("sources", []),
  317. "timestamp": c.get("timestamp"),
  318. }
  319. if include_articles:
  320. # Return minimal article fields to keep responses compact.
  321. arts = c.get("articles", []) or []
  322. item["articles"] = [
  323. {
  324. "title": a.get("title"),
  325. "url": a.get("url"),
  326. "source": a.get("source"),
  327. "timestamp": a.get("timestamp"),
  328. }
  329. for a in arts
  330. if isinstance(a, dict)
  331. ]
  332. out.append(item)
  333. return out
  334. @mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.")
  335. async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
  336. limit = max(1, min(int(limit), 30))
  337. query = normalize_query(entity).strip().lower()
  338. if not query:
  339. return []
  340. resolved = resolve_entity_via_trends(query)
  341. query_terms = {
  342. query,
  343. str(resolved.get("normalized") or "").strip().lower(),
  344. str(resolved.get("canonical_label") or "").strip().lower(),
  345. str(resolved.get("mid") or "").strip().lower(),
  346. }
  347. query_terms = {q for q in query_terms if q}
  348. store = SQLiteClusterStore(DB_PATH)
  349. def _match_clusters(clusters: list[dict]) -> list[dict]:
  350. hits: list[dict] = []
  351. for c in _sort_clusters_by_recency(clusters):
  352. haystack = _cluster_entity_haystack(c)
  353. if any(any(term in item for item in haystack) for term in query_terms):
  354. hits.append(c)
  355. if len(hits) >= limit:
  356. break
  357. return hits
  358. hours = _parse_timeframe_to_hours(timeframe)
  359. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))
  360. hits = _match_clusters(clusters)
  361. out = []
  362. for c in hits:
  363. item = {
  364. "cluster_id": c.get("cluster_id"),
  365. "headline": c.get("headline"),
  366. "summary": c.get("summary"),
  367. "entities": c.get("entities", []),
  368. "sentiment": c.get("sentiment", "neutral"),
  369. "importance": c.get("importance", 0.0),
  370. "sources": c.get("sources", []),
  371. "timestamp": c.get("timestamp"),
  372. }
  373. if include_articles:
  374. arts = c.get("articles", []) or []
  375. item["articles"] = [
  376. {
  377. "title": a.get("title"),
  378. "url": a.get("url"),
  379. "source": a.get("source"),
  380. "timestamp": a.get("timestamp"),
  381. }
  382. for a in arts
  383. if isinstance(a, dict)
  384. ]
  385. out.append(item)
  386. return out
  387. @mcp.tool(description="Return entities most commonly associated with the subject in recent clusters, optionally blended with Google Trends suggestions.")
  388. async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True):
  389. limit = max(1, min(int(limit), 25))
  390. hours = _parse_timeframe_to_hours(timeframe)
  391. include_trends_bool = str(include_trends).strip().lower() not in {"false", "0", "no"}
  392. store = SQLiteClusterStore(DB_PATH)
  393. result = related_recent_entities(
  394. store=store,
  395. subject=subject,
  396. timeframe_hours=hours,
  397. limit=limit,
  398. include_trends=include_trends_bool,
  399. )
  400. return result
  401. @mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
  402. async def get_event_summary(event_id: str, include_articles: bool = False):
  403. store = SQLiteClusterStore(DB_PATH)
  404. # Summary cache: reuse if present within TTL.
  405. cached_summary = store.get_cluster_summary(
  406. cluster_id=event_id,
  407. ttl_hours=DEFAULT_LOOKBACK_HOURS,
  408. )
  409. if cached_summary:
  410. out = {
  411. "event_id": event_id,
  412. "headline": cached_summary.get("headline"),
  413. "mergedSummary": cached_summary.get("mergedSummary"),
  414. "keyFacts": cached_summary.get("keyFacts", []),
  415. "sources": cached_summary.get("sources", []),
  416. }
  417. if include_articles:
  418. cluster = store.get_cluster_by_id(event_id)
  419. arts = (cluster or {}).get("articles", []) or []
  420. out["articles"] = [
  421. {
  422. "title": a.get("title"),
  423. "url": a.get("url"),
  424. "source": a.get("source"),
  425. "timestamp": a.get("timestamp"),
  426. }
  427. for a in arts
  428. if isinstance(a, dict)
  429. ]
  430. return out
  431. cluster = store.get_cluster_by_id(event_id)
  432. if not cluster:
  433. return {
  434. "event_id": event_id,
  435. "error": "NOT_FOUND",
  436. }
  437. articles_out = None
  438. if include_articles:
  439. arts = cluster.get("articles", []) or []
  440. articles_out = [
  441. {
  442. "title": a.get("title"),
  443. "url": a.get("url"),
  444. "source": a.get("source"),
  445. "timestamp": a.get("timestamp"),
  446. }
  447. for a in arts
  448. if isinstance(a, dict)
  449. ]
  450. summary = await summarize_cluster_llm(cluster)
  451. store.upsert_cluster_summary(event_id, summary)
  452. out = {
  453. "event_id": event_id,
  454. "headline": summary.get("headline"),
  455. "mergedSummary": summary.get("mergedSummary"),
  456. "keyFacts": summary.get("keyFacts", []),
  457. "sources": summary.get("sources", []),
  458. }
  459. if include_articles:
  460. out["articles"] = articles_out or []
  461. return out
  462. @mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters.")
  463. async def detect_emerging_topics(limit: int = 10):
  464. limit = max(1, min(int(limit), 20))
  465. store = SQLiteClusterStore(DB_PATH)
  466. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=200)
  467. import re
  468. entity_counts = Counter()
  469. entity_importance_sum = Counter()
  470. # co-occurrence: ent -> other_ent -> count
  471. entity_cooccur = {}
  472. phrase_counts = Counter()
  473. topic_counts = Counter()
  474. # Very light heuristics to reduce “meta entities” dominating emerging topics.
  475. # Keep it conservative: only skip obvious boilerplate.
  476. def _is_generic_entity(ent: str) -> bool:
  477. e = str(ent).strip().lower()
  478. if not e:
  479. return True
  480. if len(e) < 4:
  481. return True
  482. # common outlet-ish / meta-ish tokens
  483. if e in {"news", "latest", "breaking"}:
  484. return True
  485. return False
  486. for c in clusters:
  487. topic_counts[c.get("topic", "other")] += 1
  488. ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
  489. ents_in_cluster_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
  490. for ent in ents_in_cluster_norm:
  491. if _is_generic_entity(ent):
  492. continue
  493. entity_counts[ent] += 1
  494. try:
  495. entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
  496. except Exception:
  497. pass
  498. # update co-occurrence counts
  499. for i in range(len(ents_in_cluster_norm)):
  500. a = ents_in_cluster_norm[i]
  501. if not a:
  502. continue
  503. entity_cooccur.setdefault(a, Counter())
  504. for j in range(len(ents_in_cluster_norm)):
  505. if i == j:
  506. continue
  507. b = ents_in_cluster_norm[j]
  508. if not b:
  509. continue
  510. entity_cooccur[a][b] += 1
  511. text = f"{c.get('headline','')} {c.get('summary','')}"
  512. words = [w for w in re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())]
  513. for i in range(len(words) - 1):
  514. phrase = f"{words[i]} {words[i+1]}"
  515. if len(phrase) > 6:
  516. phrase_counts[phrase] += 1
  517. emerging = []
  518. # Combine frequency with average importance so “big signal” rises over pure repetition.
  519. for ent, count in entity_counts.most_common(limit):
  520. avg_imp = entity_importance_sum[ent] / max(1, count)
  521. # avg_imp is typically 0..~1; keep score bounded.
  522. trend_score = 0.25 + 0.40 * min(1.0, avg_imp) + 0.08 * min(6.0, float(count))
  523. related = []
  524. for other, _cnt in (entity_cooccur.get(ent) or Counter()).most_common(3):
  525. # avoid returning the entity itself (shouldn't happen, but be safe)
  526. if other != ent:
  527. related.append(other)
  528. emerging.append({
  529. "topic": ent,
  530. "trend_score": min(0.99, round(trend_score, 2)),
  531. "related_entities": related if related else [ent],
  532. "signal_type": "entity",
  533. "count": count,
  534. "avg_importance": round(avg_imp, 3),
  535. })
  536. for phrase, count in phrase_counts.most_common(limit * 2):
  537. if any(item["topic"] == phrase for item in emerging):
  538. continue
  539. emerging.append({
  540. "topic": phrase.title(),
  541. "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
  542. "related_entities": [],
  543. "signal_type": "phrase",
  544. "count": count,
  545. })
  546. if len(emerging) >= limit:
  547. break
  548. return emerging[:limit]
  549. @mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.")
  550. async def get_news_sentiment(entity: str, timeframe: str = "24h"):
  551. store = SQLiteClusterStore(DB_PATH)
  552. ent = normalize_query(entity).strip().lower()
  553. resolved = resolve_entity_via_trends(ent)
  554. query_terms = {
  555. ent,
  556. str(resolved.get("normalized") or "").strip().lower(),
  557. str(resolved.get("canonical_label") or "").strip().lower(),
  558. str(resolved.get("mid") or "").strip().lower(),
  559. }
  560. query_terms = {q for q in query_terms if q}
  561. if not ent:
  562. return {
  563. "entity": entity,
  564. "sentiment": "neutral",
  565. "score": 0.0,
  566. "cluster_count": 0,
  567. }
  568. # timeframe: accept '24h' or '24'
  569. tf = str(timeframe).strip().lower()
  570. try:
  571. hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
  572. except Exception:
  573. hours = 24
  574. hours = max(1, min(int(hours), 168))
  575. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
  576. matched = []
  577. for c in clusters:
  578. haystack = _cluster_entity_haystack(c)
  579. if any(any(term in item for item in haystack) for term in query_terms):
  580. matched.append(c)
  581. if not matched:
  582. return {
  583. "entity": entity,
  584. "sentiment": "neutral",
  585. "score": 0.0,
  586. "cluster_count": 0,
  587. }
  588. scores = []
  589. for c in matched:
  590. s = c.get("sentimentScore")
  591. if s is not None:
  592. try:
  593. scores.append(float(s))
  594. except Exception:
  595. pass
  596. avg_score = sum(scores) / len(scores) if scores else 0.0
  597. # Keep the label aligned with the numeric score.
  598. # Small magnitudes are treated as neutral to avoid noisy label flips.
  599. if avg_score >= 0.15:
  600. sentiment = "positive"
  601. elif avg_score <= -0.15:
  602. sentiment = "negative"
  603. else:
  604. sentiment = "neutral"
  605. return {
  606. "entity": entity,
  607. "sentiment": sentiment,
  608. "score": round(avg_score, 3),
  609. "cluster_count": len(matched),
  610. }
  611. @mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.")
  612. async def get_capabilities():
  613. return {
  614. "server": {
  615. "name": "news-mcp",
  616. "purpose": "Recent news clusters, entity drill-down, sentiment, emerging topics, and related-entity expansion.",
  617. "output_conventions": {
  618. "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.",
  619. "sources": "Always preserve and display sources when summarizing a cluster or entity result.",
  620. "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.",
  621. },
  622. },
  623. "tools": NEWS_TOOL_CARDS,
  624. "recipes": NEWS_COMPOSITION_RECIPES,
  625. "example_chains": NEWS_EXAMPLE_CHAINS,
  626. "agent_tips": NEWS_AGENT_TIPS,
  627. "guidance": [
  628. "Use get_latest_events for a tail, get_events_for_entity for entity deep dives, and get_related_recent_entities for neighborhood expansion.",
  629. "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.",
  630. "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.",
  631. ],
  632. }
  633. def _parse_timeframe_to_hours(timeframe: str) -> int:
  634. tf = str(timeframe).strip().lower()
  635. try:
  636. if tf.endswith("d"):
  637. days = int(tf[:-1])
  638. return max(1, days * 24)
  639. if tf.endswith("h"):
  640. return max(1, int(tf[:-1]))
  641. return max(1, int(tf))
  642. except Exception:
  643. return 24
  644. from contextlib import asynccontextmanager
  645. @asynccontextmanager
  646. async def _lifespan(app: FastAPI):
  647. asyncio.ensure_future(_background_refresh_loop())
  648. yield
# FastAPI application; its lifespan hook (_lifespan) schedules the background
# refresher when the app starts.
app = FastAPI(title="News MCP Server", lifespan=_lifespan)
# Logger used by the startup / background-refresh code in this module.
logger = logging.getLogger("news_mcp.startup")
# Expose the FastMCP server's SSE application under the /mcp path.
app.mount("/mcp", mcp.sse_app())
  652. # Shared store — single connection pool
  653. _shared_store = SQLiteClusterStore(DB_PATH)
  654. _refresh_lock = asyncio.Lock()
  655. _refresh_started = False
  656. async def _background_refresh_loop():
  657. """Non-blocking background refresher: prune then poll.
  658. Protected by an async lock so a second event-loop wake-up cannot
  659. start a parallel ingestion cycle.
  660. """
  661. global _refresh_started
  662. async with _refresh_lock:
  663. if _refresh_started:
  664. return
  665. _refresh_started = True
  666. logger.info("news-mcp llm config: %s", active_llm_config())
  667. # Prune off-thread so we do not block the event loop
  668. prune_result = await asyncio.to_thread(
  669. _shared_store.prune_if_due,
  670. NEWS_PRUNING_ENABLED,
  671. NEWS_RETENTION_DAYS,
  672. NEWS_PRUNE_INTERVAL_HOURS,
  673. )
  674. logger.info("startup prune_result=%s", prune_result)
  675. if not NEWS_BACKGROUND_REFRESH_ENABLED:
  676. return
  677. async def _loop():
  678. if not NEWS_BACKGROUND_REFRESH_ON_START:
  679. logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS)
  680. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  681. while True:
  682. try:
  683. logger.info("background refresh tick start")
  684. await refresh_clusters(topic=None, limit=200)
  685. logger.info("background refresh tick complete")
  686. except Exception:
  687. logger.exception("background refresh tick failed")
  688. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  689. asyncio.create_task(_loop())
  @app.get("/")
  def root():
      """Return a status document listing tools and the active settings."""
      tool_names = [
          "get_latest_events",
          "get_events_for_entity",
          "get_event_summary",
          "detect_emerging_topics",
          "get_news_sentiment",
          "get_related_recent_entities",
          "get_capabilities",
      ]
      refresh_settings = {
          "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
          "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
      }
      retention_settings = {
          "lookback_hours": DEFAULT_LOOKBACK_HOURS,
          "retention_days": NEWS_RETENTION_DAYS,
      }
      pruning_settings = {
          "enabled": NEWS_PRUNING_ENABLED,
          "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
      }
      return {
          "status": "ok",
          "transport": "fastmcp+sse",
          "mount": "/mcp",
          "tools": tool_names,
          "refresh": refresh_settings,
          "retention": retention_settings,
          "pruning": pruning_settings,
      }
  718. # ------------------------------------------------------------------
  719. # Dashboard REST API endpoints
  720. # ------------------------------------------------------------------
  from fastapi.responses import JSONResponse
  from fastapi.staticfiles import StaticFiles
  import logging as _log

  # Serve the contents of the "dashboard" directory under /dashboard.
  app.mount("/dashboard", StaticFiles(directory="dashboard", html=True), name="dashboard")

  # Logger used by the _api_err helper for the REST endpoints.
  API_LOG = _log.getLogger("news_mcp.api")
  726. def _api_ok(data: dict) -> dict:
  727. return data
  def _api_err(exc: Exception, ctx: str) -> JSONResponse:
      """Log ``exc`` with its traceback and build a JSON 500 response.

      Args:
          exc: The exception that caused the request to fail.
          ctx: Short label for the failing endpoint/arguments; it is included
              in both the log message and the response body.

      Returns:
          A ``JSONResponse`` with status 500 and the body
          ``{"error": str(exc), "ctx": ctx}``.
      """
      # Pass the exception explicitly so the traceback is logged even when this
      # helper is called outside an active ``except`` block, and let logging
      # interpolate ``ctx`` lazily instead of pre-formatting an f-string.
      API_LOG.exception("API error in %s", ctx, exc_info=exc)
      return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx})
  @app.get("/api/v1/health")
  def api_health():
      """Extended health + dashboard stats.

      Returns whatever ``DashboardStore.get_dashboard_stats`` produces, or the
      ``_api_err`` 500 response if anything raises.
      """
      try:
          return DashboardStore(_shared_store).get_dashboard_stats()
      except Exception as err:
          return _api_err(err, "health")
  @app.get("/api/v1/clusters")
  def api_clusters(
      topic: str | None = None,
      hours: int = 24,
      limit: int = 50,
      offset: int = 0,
  ):
      """Paginated cluster listing.

      Returns one page of clusters plus the total number of clusters updated
      within the last ``hours`` hours, restricted to ``topic`` unless it is
      empty or ``"all"``. Any exception is turned into the ``_api_err``
      500 response.
      """
      try:
          dashboard = DashboardStore(_shared_store)
          page = dashboard.get_clusters_page(topic=topic, hours=hours, limit=limit, offset=offset)

          # Build the COUNT query once; the topic clause is only appended when
          # a concrete topic was requested.
          count_sql = "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours')"
          count_args = (-hours,)
          if topic and topic != "all":
              count_sql += " AND topic = ?"
              count_args = (-hours, topic)

          with dashboard._store._conn() as conn:
              row = conn.execute(count_sql, count_args).fetchone()

          return {
              "clusters": page,
              "total": row[0] if row else 0,
              "topic": topic or "all",
              "hours": hours,
          }
      except Exception as err:
          return _api_err(err, f"clusters(topic={topic},hours={hours})")
  @app.get("/api/v1/sentiment-series")
  def api_sentiment_series(
      topic: str | None = None,
      hours: int = 24,
      bucket_hours: float = 1.0,
  ):
      """Sentiment time-series for Chart.js.

      Wraps ``DashboardStore.get_sentiment_series`` and echoes the requested
      topic (``"all"`` when none was given). Any exception is turned into the
      ``_api_err`` 500 response.
      """
      try:
          points = DashboardStore(_shared_store).get_sentiment_series(
              topic=topic,
              hours=hours,
              bucket_hours=bucket_hours,
          )
          return {"series": points, "topic": topic or "all"}
      except Exception as err:
          return _api_err(err, f"sentiment(topic={topic})")
  @app.get("/api/v1/entities")
  def api_entities(
      hours: int = 24,
      limit: int = 30,
  ):
      """Top entity frequencies.

      Wraps ``DashboardStore.get_entity_frequencies`` and echoes ``hours``.
      Any exception is turned into the ``_api_err`` 500 response.
      """
      try:
          frequencies = DashboardStore(_shared_store).get_entity_frequencies(hours=hours, limit=limit)
          return {"entities": frequencies, "hours": hours}
      except Exception as err:
          return _api_err(err, f"entities(hours={hours})")
  @app.get("/api/v1/cluster/{cluster_id}")
  def api_cluster_detail(cluster_id: str):
      """Full cluster detail for drill-down.

      Returns the value of ``DashboardStore.get_cluster_detail``; a falsy
      result yields a 404 JSON response, and any exception yields the
      ``_api_err`` 500 response.
      """
      try:
          found = DashboardStore(_shared_store).get_cluster_detail(cluster_id)
          if found:
              return found
          return JSONResponse(status_code=404, content={"error": "Cluster not found", "id": cluster_id})
      except Exception as err:
          return _api_err(err, f"detail({cluster_id})")
  801. # ------------------------------------------------------------------
  802. # Feed management endpoints (toggle on/off from dashboard)
  803. # ------------------------------------------------------------------
  @app.get("/api/v1/feeds")
  def api_feeds():
      """List all configured feeds with enabled/disabled status.

      Returns:
          ``{"feeds": ..., "configured_urls": ...}`` built from the store's
          ``get_feed_state_list()`` and ``_configured_feed_urls()``, or the
          ``_api_err`` 500 response if either call raises.
      """
      try:
          # Reuse the module-level store (created from the same DB_PATH) like
          # the other endpoints, instead of constructing a new
          # SQLiteClusterStore on every request.
          return {
              "feeds": _shared_store.get_feed_state_list(),
              "configured_urls": _configured_feed_urls(),
          }
      except Exception as e:
          return _api_err(e, "feeds")
  @app.post("/api/v1/feeds/toggle")
  async def api_feed_toggle(feed_url: str = Form(), enabled: bool = Form()):
      """Toggle a feed's enabled state.

      Args:
          feed_url: Feed URL from the form body; surrounding whitespace is
              stripped before it is passed to the store.
          enabled: Desired enabled state.

      Returns:
          ``{"ok": True, "feed_url": <stripped url>, "enabled": enabled}`` on
          success, a 404 JSON response when ``set_feed_enabled`` returns a
          falsy value, or the ``_api_err`` 500 response on any exception.
      """
      try:
          url = feed_url.strip()
          # This handler is a coroutine, so a direct store call would run on
          # the event loop thread and stall other requests. Run it in a worker
          # thread (as the startup prune does) on the shared store instead of
          # constructing a new SQLiteClusterStore per request.
          ok = await asyncio.to_thread(_shared_store.set_feed_enabled, url, enabled)
          if not ok:
              return JSONResponse(
                  status_code=404,
                  content={"error": f"Feed not found: {feed_url}"},
              )
          return {"ok": True, "feed_url": url, "enabled": enabled}
      except Exception as e:
          return _api_err(e, f"toggle({feed_url})")
  @app.get("/health")
  def health():
      """Return a minimal liveness payload with the process uptime.

      ``uptime`` is the monotonic-clock time elapsed since
      ``_PROCESS_STARTED_AT``, rounded to three decimal places.
      """
      elapsed = time.monotonic() - _PROCESS_STARTED_AT
      return {"status": "ok", "uptime": round(elapsed, 3)}