# mcp_server_fastmcp.py

from __future__ import annotations

import asyncio
import logging
import re
from collections import Counter

from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings

from news_mcp.config import (
    CLUSTERS_TTL_HOURS,
    DB_PATH,
    DEFAULT_TOPICS,
    NEWS_BACKGROUND_REFRESH_ENABLED,
    NEWS_BACKGROUND_REFRESH_ON_START,
    NEWS_REFRESH_INTERVAL_SECONDS,
)
from news_mcp.enrichment.llm_enrich import summarize_cluster_groq
from news_mcp.entity_normalize import normalize_query
from news_mcp.jobs.poller import refresh_clusters
from news_mcp.llm import active_llm_config
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.trends_resolution import resolve_entity_via_trends

mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)


def _cluster_entity_haystack(cluster: dict) -> list[str]:
    """Collect the normalized entity clues attached to a cluster."""
    values: list[str] = []
    for ent in cluster.get("entities", []) or []:
        values.append(str(ent).strip().lower())
    for res in cluster.get("entityResolutions", []) or []:
        if not isinstance(res, dict):
            continue
        for key in ("normalized", "canonical_label", "mid"):
            val = res.get(key)
            if val:
                values.append(str(val).strip().lower())
    return [v for v in values if v]
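
# Illustrative haystack example (made-up values, not real store contents):
# a cluster shaped like
#   {"entities": ["Bitcoin"],
#    "entityResolutions": [{"normalized": "bitcoin", "mid": "/m/05p0rrx"}]}
# yields ["bitcoin", "bitcoin", "/m/05p0rrx"]. The tools below match query
# terms against these strings with substring checks.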


@mcp.tool(description="What is happening right now? Return the latest deduplicated news clusters for a topic.")
async def get_latest_events(topic: str = "crypto", limit: int = 5, include_articles: bool = False):
    limit = max(1, min(int(limit), 20))
    # If the caller passes an entity-like value, resolve it and use the canonical
    # entity as the query lens. Otherwise keep the original topic path.
    topic_norm = normalize_query(topic).lower()
    resolved = resolve_entity_via_trends(topic_norm)
    allowed = {t.lower() for t in DEFAULT_TOPICS}
    is_topic = topic_norm in allowed
    query_terms = {
        topic_norm,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    store = SQLiteClusterStore(DB_PATH)
    if is_topic:
        # Cache-first: only refresh if we currently have no fresh clusters for this topic.
        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
        if not clusters:
            await refresh_clusters(topic=topic_norm, limit=200)
            clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
    else:
        # Entity-aware mode: search recent clusters across all topics and match by
        # raw entity, canonical label, or MID. Over-fetch (limit * 8) because the
        # entity filter discards most rows.
        clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 8)
        filtered = []
        for c in clusters:
            haystack = _cluster_entity_haystack(c)
            if any(any(term in item for item in haystack) for term in query_terms):
                filtered.append(c)
            if len(filtered) >= limit:
                break
        clusters = filtered
    # Ensure the response is compact and agent-friendly.
    clusters_sorted = sorted(clusters, key=lambda x: float(x.get("importance", 0.0)), reverse=True)
    out = []
    for c in clusters_sorted:
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            # Return minimal article fields to keep responses compact.
            arts = c.get("articles", []) or []
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        out.append(item)
    return out
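
# Matching note: `term in item` above is a substring test, so a short query
# term like "eth" also matches "ethereum" in the haystack; exact-token matching
# would require `term == item` instead.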


@mcp.tool(description="What's happening with X? Filter latest clusters by extracted entity substring (case-insensitive).")
async def get_events_for_entity(entity: str, limit: int = 10, include_articles: bool = False):
    limit = max(1, min(int(limit), 30))
    query = normalize_query(entity).strip().lower()
    if not query:
        return []
    resolved = resolve_entity_via_trends(query)
    query_terms = {
        query,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    # Cache-first: search recent clusters across all topics.
    store = SQLiteClusterStore(DB_PATH)

    def _match_clusters(clusters: list[dict]) -> list[dict]:
        hits: list[dict] = []
        for c in clusters:
            haystack = _cluster_entity_haystack(c)
            if any(any(term in item for item in haystack) for term in query_terms):
                hits.append(c)
            if len(hits) >= limit:
                break
        return hits

    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 5)
    hits = _match_clusters(clusters)
    # If the recent slice misses, broaden the search window before giving up.
    if not hits:
        clusters = store.get_latest_clusters_all_topics(ttl_hours=24 * 7, limit=500)
        hits = _match_clusters(clusters)
    # Compress to the tool response shape.
    out = []
    for c in hits:
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            arts = c.get("articles", []) or []
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        out.append(item)
    return out
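
# The fallback pass widens the window to a full week (24 * 7 hours) and a
# larger row cap, so a cold cache or a quiet entity still gets a chance to
# match before the tool returns an empty list.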


@mcp.tool(description="Explain an event clearly by cluster_id (Groq summary).")
async def get_event_summary(event_id: str, include_articles: bool = False):
    store = SQLiteClusterStore(DB_PATH)
    # Summary cache: reuse if present within TTL.
    cached_summary = store.get_cluster_summary(
        cluster_id=event_id,
        ttl_hours=CLUSTERS_TTL_HOURS,
    )
    if cached_summary:
        out = {
            "event_id": event_id,
            "headline": cached_summary.get("headline"),
            "mergedSummary": cached_summary.get("mergedSummary"),
            "keyFacts": cached_summary.get("keyFacts", []),
            "sources": cached_summary.get("sources", []),
        }
        if include_articles:
            cluster = store.get_cluster_by_id(event_id)
            arts = (cluster or {}).get("articles", []) or []
            out["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        return out
    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        return {
            "event_id": event_id,
            "error": "NOT_FOUND",
        }
    articles_out = None
    if include_articles:
        arts = cluster.get("articles", []) or []
        articles_out = [
            {
                "title": a.get("title"),
                "url": a.get("url"),
                "source": a.get("source"),
                "timestamp": a.get("timestamp"),
            }
            for a in arts
            if isinstance(a, dict)
        ]
    summary = await summarize_cluster_groq(cluster)
    store.upsert_cluster_summary(event_id, summary)
    out = {
        "event_id": event_id,
        "headline": summary.get("headline"),
        "mergedSummary": summary.get("mergedSummary"),
        "keyFacts": summary.get("keyFacts", []),
        "sources": summary.get("sources", []),
    }
    if include_articles:
        out["articles"] = articles_out or []
    return out
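
# Cost note: only the first call per cluster within CLUSTERS_TTL_HOURS pays a
# Groq request; upsert_cluster_summary persists the result, so repeat calls are
# served from SQLite.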


@mcp.tool(description="Detect emerging topics/entities from recent cached news clusters.")
async def detect_emerging_topics(limit: int = 10):
    limit = max(1, min(int(limit), 20))
    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=200)
    entity_counts = Counter()
    entity_importance_sum = Counter()
    # Co-occurrence map: entity -> Counter of other entities in the same cluster.
    entity_cooccur: dict[str, Counter] = {}
    phrase_counts = Counter()
    # topic_counts is collected for visibility; it is not used in scoring yet.
    topic_counts = Counter()

    # Very light heuristics to stop "meta entities" from dominating emerging topics.
    # Keep it conservative: only skip obvious boilerplate.
    def _is_generic_entity(ent: str) -> bool:
        e = str(ent).strip().lower()
        if not e:
            return True
        if len(e) < 4:
            return True
        # Common outlet-ish / meta-ish tokens.
        if e in {"news", "latest", "breaking"}:
            return True
        return False

    for c in clusters:
        topic_counts[c.get("topic", "other")] += 1
        ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
        ents_in_cluster_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
        for ent in ents_in_cluster_norm:
            entity_counts[ent] += 1
            try:
                entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
            except Exception:
                pass
        # Update co-occurrence counts.
        for i in range(len(ents_in_cluster_norm)):
            a = ents_in_cluster_norm[i]
            if not a:
                continue
            entity_cooccur.setdefault(a, Counter())
            for j in range(len(ents_in_cluster_norm)):
                if i == j:
                    continue
                b = ents_in_cluster_norm[j]
                if not b:
                    continue
                entity_cooccur[a][b] += 1
        text = f"{c.get('headline', '')} {c.get('summary', '')}"
        words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
        for i in range(len(words) - 1):
            phrase = f"{words[i]} {words[i + 1]}"
            if len(phrase) > 6:
                phrase_counts[phrase] += 1

    emerging = []
    # Combine frequency with average importance so "big signal" rises over pure repetition.
    for ent, count in entity_counts.most_common(limit):
        avg_imp = entity_importance_sum[ent] / max(1, count)
        # avg_imp is typically 0..~1; keep the score bounded.
        trend_score = 0.25 + 0.40 * min(1.0, avg_imp) + 0.08 * min(6.0, float(count))
        related = []
        for other, _cnt in (entity_cooccur.get(ent) or Counter()).most_common(3):
            # Avoid returning the entity itself (shouldn't happen, but be safe).
            if other != ent:
                related.append(other)
        emerging.append({
            "topic": ent,
            "trend_score": min(0.99, round(trend_score, 2)),
            "related_entities": related if related else [ent],
            "signal_type": "entity",
            "count": count,
            "avg_importance": round(avg_imp, 3),
        })
    for phrase, count in phrase_counts.most_common(limit * 2):
        if any(item["topic"] == phrase for item in emerging):
            continue
        emerging.append({
            "topic": phrase.title(),
            "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
            "related_entities": [],
            "signal_type": "phrase",
            "count": count,
        })
        if len(emerging) >= limit:
            break
    return emerging[:limit]
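
# Worked example of the entity trend score (illustrative numbers): with
# avg_imp = 0.5 and count = 4, trend_score = 0.25 + 0.40 * 0.5 + 0.08 * 4
# = 0.25 + 0.20 + 0.32 = 0.77. The min(6.0, count) cap stops frequency from
# helping past six clusters, and min(0.99, ...) keeps scores below 1.0.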


@mcp.tool(description="What's the overall sentiment around an entity within a timeframe?")
async def get_news_sentiment(entity: str, timeframe: str = "24h"):
    store = SQLiteClusterStore(DB_PATH)
    ent = normalize_query(entity).strip().lower()
    # Bail out before the (potentially remote) trends resolution on an empty query.
    if not ent:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }
    resolved = resolve_entity_via_trends(ent)
    query_terms = {
        ent,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    # timeframe: accept "24h" or "24"; clamp to one week.
    tf = str(timeframe).strip().lower()
    try:
        hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
    except Exception:
        hours = 24
    hours = max(1, min(int(hours), 168))
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
    matched = []
    for c in clusters:
        haystack = _cluster_entity_haystack(c)
        if any(any(term in item for item in haystack) for term in query_terms):
            matched.append(c)
    if not matched:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }
    scores = []
    for c in matched:
        s = c.get("sentimentScore")
        if s is not None:
            try:
                scores.append(float(s))
            except Exception:
                pass
    avg_score = sum(scores) / len(scores) if scores else 0.0
    # Keep the label aligned with the numeric score.
    # Small magnitudes are treated as neutral to avoid noisy label flips.
    if avg_score >= 0.15:
        sentiment = "positive"
    elif avg_score <= -0.15:
        sentiment = "negative"
    else:
        sentiment = "neutral"
    return {
        "entity": entity,
        "sentiment": sentiment,
        "score": round(avg_score, 3),
        "cluster_count": len(matched),
    }
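
# Worked labeling example (illustrative): cluster scores [0.3, -0.1, 0.4]
# average to 0.2, which clears the +0.15 threshold, so the label is
# "positive"; an average of 0.1 would stay "neutral".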


def _parse_timeframe_to_hours(timeframe: str) -> int:
    tf = str(timeframe).strip().lower()
    try:
        if tf.endswith("d"):
            days = int(tf[:-1])
            return max(1, days * 24)
        if tf.endswith("h"):
            return max(1, int(tf[:-1]))
        return max(1, int(tf))
    except Exception:
        return 24
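
# Examples: "24h" -> 24, "7d" -> 168, "36" -> 36; anything unparseable
# (e.g. "soon") falls back to 24 hours.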


@mcp.tool(
    description="Given a subject entity, find related entities via co-occurrence inside recent clusters (entity-only, no topic fallback)."
)
async def get_related_entities(subject: str, timeframe: str = "24h", limit: int = 10):
    store = SQLiteClusterStore(DB_PATH)
    limit = max(1, min(int(limit), 30))
    subj = normalize_query(subject).strip().lower()
    if not subj:
        return []
    resolved = resolve_entity_via_trends(subj)
    query_terms = {
        subj,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    hours = _parse_timeframe_to_hours(timeframe)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
    # Aggregate related metrics per entity.
    rel_count = Counter()
    rel_imp_sum = Counter()
    rel_sent_sum = Counter()
    rel_sent_n = Counter()
    for c in clusters:
        haystack = _cluster_entity_haystack(c)
        if not any(any(term in item for item in haystack) for term in query_terms):
            continue
        ents = [str(e).strip().lower() for e in (c.get("entities", []) or []) if str(e).strip()]
        # Drop generic/meta-ish short tokens conservatively.
        ents = [e for e in ents if len(e) >= 4]
        for e in ents:
            if e in query_terms:
                continue
            rel_count[e] += 1
            try:
                rel_imp_sum[e] += float(c.get("importance", 0.0) or 0.0)
            except Exception:
                pass
            # Sentiment aggregation based on sentimentScore, if available.
            s = c.get("sentimentScore")
            if s is not None:
                try:
                    rel_sent_sum[e] += float(s)
                    rel_sent_n[e] += 1
                except Exception:
                    pass
    # most_common() orders by raw count; per-entity averages are computed alongside.
    items = []
    for ent, cnt in rel_count.most_common():
        avg_imp = rel_imp_sum[ent] / max(1, cnt)
        avg_score = rel_sent_sum[ent] / max(1, rel_sent_n[ent]) if rel_sent_n[ent] else 0.0
        if avg_score >= 0.15:
            sentiment = "positive"
        elif avg_score <= -0.15:
            sentiment = "negative"
        else:
            sentiment = "neutral"
        items.append(
            {
                "entity": ent,
                "count": cnt,
                "avg_importance": round(avg_imp, 3),
                "sentiment": sentiment,
                "score": round(avg_score, 3),
            }
        )
        if len(items) >= limit:
            break
    return items
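
# Co-occurrence semantics: an entity counts as "related" simply because it
# appears in the entity list of a cluster that matched the subject; no
# embedding or graph lookup is involved.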


app = FastAPI(title="News MCP Server")
logger = logging.getLogger("news_mcp.startup")
app.mount("/mcp", mcp.sse_app())
_background_task_started = False


@app.on_event("startup")
async def _start_background_refresh():
    global _background_task_started
    if _background_task_started:
        return
    if not NEWS_BACKGROUND_REFRESH_ENABLED:
        return
    _background_task_started = True
    logger.info("news-mcp llm config: %s", active_llm_config())

    async def _loop():
        if not NEWS_BACKGROUND_REFRESH_ON_START:
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
        while True:
            try:
                # Refresh all topics by passing topic=None.
                await refresh_clusters(topic=None, limit=200)
            except Exception:
                # Log and keep looping; a transient network error must not crash the server.
                logger.exception("background refresh_clusters failed")
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))

    asyncio.create_task(_loop())
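
# Timing note: with NEWS_BACKGROUND_REFRESH_ON_START disabled, the first
# refresh runs one full NEWS_REFRESH_INTERVAL_SECONDS after startup; otherwise
# the loop refreshes immediately and then sleeps between iterations.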


@app.get("/")
def root():
    return {
        "status": "ok",
        "transport": "fastmcp+sse",
        "mount": "/mcp",
        "tools": [
            "get_latest_events",
            "get_events_for_entity",
            "get_event_summary",
            "detect_emerging_topics",
            "get_news_sentiment",
            "get_related_entities",
        ],
        "refresh": {
            "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
            "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
        },
    }


@app.get("/health")
def health():
    store = SQLiteClusterStore(DB_PATH)
    return {
        "status": "ok",
        "ttl_hours": CLUSTERS_TTL_HOURS,
        "db": str(DB_PATH),
        "refresh": store.get_feed_state("breakingthenews"),
    }
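
# Local run sketch (assumption: this module is importable as
# news_mcp.mcp_server_fastmcp; adjust the module path to your layout):
#   uvicorn news_mcp.mcp_server_fastmcp:app --host 0.0.0.0 --port 8000
# FastMCP's sse_app() typically serves the SSE event stream at /sse beneath
# the mount, i.e. http://localhost:8000/mcp/sse here.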