# mcp_server_fastmcp.py

from __future__ import annotations

import asyncio
import logging
import re
from collections import Counter
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings

from news_mcp.config import (
    DB_PATH,
    DEFAULT_LOOKBACK_HOURS,
    DEFAULT_TOPICS,
    NEWS_BACKGROUND_REFRESH_ENABLED,
    NEWS_BACKGROUND_REFRESH_ON_START,
    NEWS_PRUNE_INTERVAL_HOURS,
    NEWS_PRUNING_ENABLED,
    NEWS_REFRESH_INTERVAL_SECONDS,
    NEWS_RETENTION_DAYS,
)
from news_mcp.entity_normalize import normalize_query
from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
from news_mcp.jobs.poller import refresh_clusters
from news_mcp.llm import active_llm_config
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.trends_resolution import resolve_entity_via_trends

mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)
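# The MCP SSE endpoint is mounted on the FastAPI app defined at the bottom of
# this module, so the whole service can be run with any ASGI server, e.g.
# (assuming uvicorn is installed; host/port are illustrative):
#
#   uvicorn mcp_server_fastmcp:app --host 0.0.0.0 --port 8000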
def _cluster_entity_haystack(cluster: dict) -> list[str]:
    """Collect the normalized entity clues attached to a cluster."""
    values: list[str] = []
    for ent in cluster.get("entities", []) or []:
        values.append(str(ent).strip().lower())
    for res in cluster.get("entityResolutions", []) or []:
        if not isinstance(res, dict):
            continue
        for key in ("normalized", "canonical_label", "mid"):
            val = res.get(key)
            if val:
                values.append(str(val).strip().lower())
    return [v for v in values if v]
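# Illustration of the haystack shape (cluster values are hypothetical; only
# the keys read above are assumed):
#
#   _cluster_entity_haystack({
#       "entities": ["OpenAI", " Sam Altman "],
#       "entityResolutions": [{"normalized": "openai", "mid": "/m/012345"}],
#   })
#   # -> ["openai", "sam altman", "openai", "/m/012345"]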
def _parse_cluster_timestamp(value) -> datetime:
    if not value:
        return datetime.min.replace(tzinfo=timezone.utc)
    text = str(value).strip()
    if not text:
        return datetime.min.replace(tzinfo=timezone.utc)
    try:
        dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except Exception:
        pass
    try:
        dt = parsedate_to_datetime(text)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except Exception:
        return datetime.min.replace(tzinfo=timezone.utc)
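# Both timestamp formats commonly seen in feeds parse to the same aware UTC value:
#
#   _parse_cluster_timestamp("2024-05-01T12:00:00Z")            # ISO 8601
#   _parse_cluster_timestamp("Wed, 01 May 2024 12:00:00 GMT")   # RFC 2822
#
# Empty or unparseable values fall back to datetime.min (UTC), so they sort
# last once _sort_clusters_by_recency reverses the order.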
def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
    return sorted(
        clusters,
        key=lambda c: (
            _parse_cluster_timestamp(c.get("timestamp")),
            float(c.get("importance", 0.0) or 0.0),
        ),
        reverse=True,
    )
@mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.")
async def get_latest_events(topic: str = "crypto", limit: int = 5, include_articles: bool = False):
    limit = max(1, min(int(limit), 20))
    # If the caller passes an entity-like value, resolve it and use the canonical
    # entity as the query lens. Otherwise keep the original topic path.
    topic_norm = normalize_query(topic).lower()
    resolved = resolve_entity_via_trends(topic_norm)
    allowed = {t.lower() for t in DEFAULT_TOPICS}
    is_topic = topic_norm in allowed
    query_terms = {
        topic_norm,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    store = SQLiteClusterStore(DB_PATH)
    if is_topic:
        # Cache-first: only refresh if we currently have no fresh clusters for this topic.
        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
        if not clusters:
            await refresh_clusters(topic=topic_norm, limit=200)
            clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
    else:
        # Entity-aware mode: search recent clusters across all topics and match by
        # raw entity, canonical label, or MID.
        clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
        filtered = []
        for c in clusters:
            haystack = _cluster_entity_haystack(c)
            if any(any(term in item for item in haystack) for term in query_terms):
                filtered.append(c)
            if len(filtered) >= limit:
                break
        clusters = filtered
    out = []
    for c in _sort_clusters_by_recency(clusters):
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            # Return minimal article fields to keep responses compact.
            arts = c.get("articles", []) or []
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        out.append(item)
    return out
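# Shape of each returned item (field values here are purely illustrative):
#
#   {
#       "cluster_id": "abc123", "headline": "...", "summary": "...",
#       "entities": ["bitcoin", "sec"], "sentiment": "neutral",
#       "importance": 0.42, "sources": ["example.com"],
#       "timestamp": "2024-05-01T12:00:00Z",
#   }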
@mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.")
async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
    limit = max(1, min(int(limit), 30))
    query = normalize_query(entity).strip().lower()
    if not query:
        return []
    resolved = resolve_entity_via_trends(query)
    query_terms = {
        query,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    store = SQLiteClusterStore(DB_PATH)

    def _match_clusters(clusters: list[dict]) -> list[dict]:
        hits: list[dict] = []
        for c in _sort_clusters_by_recency(clusters):
            haystack = _cluster_entity_haystack(c)
            if any(any(term in item for item in haystack) for term in query_terms):
                hits.append(c)
            if len(hits) >= limit:
                break
        return hits

    hours = _parse_timeframe_to_hours(timeframe)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))
    hits = _match_clusters(clusters)
    out = []
    for c in hits:
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            arts = c.get("articles", []) or []
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        out.append(item)
    return out
@mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
async def get_event_summary(event_id: str, include_articles: bool = False):
    store = SQLiteClusterStore(DB_PATH)
    # Summary cache: reuse if present within TTL.
    cached_summary = store.get_cluster_summary(
        cluster_id=event_id,
        ttl_hours=DEFAULT_LOOKBACK_HOURS,
    )
    if cached_summary:
        out = {
            "event_id": event_id,
            "headline": cached_summary.get("headline"),
            "mergedSummary": cached_summary.get("mergedSummary"),
            "keyFacts": cached_summary.get("keyFacts", []),
            "sources": cached_summary.get("sources", []),
        }
        if include_articles:
            cluster = store.get_cluster_by_id(event_id)
            arts = (cluster or {}).get("articles", []) or []
            out["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        return out
    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        return {
            "event_id": event_id,
            "error": "NOT_FOUND",
        }
    articles_out = None
    if include_articles:
        arts = cluster.get("articles", []) or []
        articles_out = [
            {
                "title": a.get("title"),
                "url": a.get("url"),
                "source": a.get("source"),
                "timestamp": a.get("timestamp"),
            }
            for a in arts
            if isinstance(a, dict)
        ]
    summary = await summarize_cluster_llm(cluster)
    store.upsert_cluster_summary(event_id, summary)
    out = {
        "event_id": event_id,
        "headline": summary.get("headline"),
        "mergedSummary": summary.get("mergedSummary"),
        "keyFacts": summary.get("keyFacts", []),
        "sources": summary.get("sources", []),
    }
    if include_articles:
        out["articles"] = articles_out or []
    return out
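# Cache behavior: the first call for an event_id pays the LLM cost via
# summarize_cluster_llm and persists the result with upsert_cluster_summary;
# repeat calls within DEFAULT_LOOKBACK_HOURS reuse the cached summary and
# skip the LLM entirely.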
@mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters.")
async def detect_emerging_topics(limit: int = 10):
    limit = max(1, min(int(limit), 20))
    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=200)
    entity_counts = Counter()
    entity_importance_sum = Counter()
    # co-occurrence: ent -> other_ent -> count
    entity_cooccur: dict[str, Counter] = {}
    phrase_counts = Counter()
    topic_counts = Counter()

    # Very light heuristics to reduce "meta entities" dominating emerging topics.
    # Keep it conservative: only skip obvious boilerplate.
    def _is_generic_entity(ent: str) -> bool:
        e = str(ent).strip().lower()
        if not e:
            return True
        if len(e) < 4:
            return True
        # common outlet-ish / meta-ish tokens
        if e in {"news", "latest", "breaking"}:
            return True
        return False

    for c in clusters:
        topic_counts[c.get("topic", "other")] += 1
        ents_in_cluster_norm = [
            str(e).strip().lower()
            for e in (c.get("entities", []) or [])
            if str(e).strip() and not _is_generic_entity(e)
        ]
        for ent in ents_in_cluster_norm:
            entity_counts[ent] += 1
            try:
                entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
            except Exception:
                pass
        # update co-occurrence counts
        for i, a in enumerate(ents_in_cluster_norm):
            if not a:
                continue
            entity_cooccur.setdefault(a, Counter())
            for j, b in enumerate(ents_in_cluster_norm):
                if i == j or not b:
                    continue
                entity_cooccur[a][b] += 1
        text = f"{c.get('headline', '')} {c.get('summary', '')}"
        words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
        for i in range(len(words) - 1):
            phrase = f"{words[i]} {words[i + 1]}"
            if len(phrase) > 6:
                phrase_counts[phrase] += 1

    emerging = []
    # Combine frequency with average importance so "big signal" rises over pure repetition.
    for ent, count in entity_counts.most_common(limit):
        avg_imp = entity_importance_sum[ent] / max(1, count)
        # avg_imp is typically 0..~1; keep score bounded.
        trend_score = 0.25 + 0.40 * min(1.0, avg_imp) + 0.08 * min(6.0, float(count))
        related = []
        for other, _cnt in (entity_cooccur.get(ent) or Counter()).most_common(3):
            # avoid returning the entity itself (shouldn't happen, but be safe)
            if other != ent:
                related.append(other)
        emerging.append({
            "topic": ent,
            "trend_score": min(0.99, round(trend_score, 2)),
            "related_entities": related if related else [ent],
            "signal_type": "entity",
            "count": count,
            "avg_importance": round(avg_imp, 3),
        })
    for phrase, count in phrase_counts.most_common(limit * 2):
        if any(item["topic"] == phrase for item in emerging):
            continue
        emerging.append({
            "topic": phrase.title(),
            "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
            "related_entities": [],
            "signal_type": "phrase",
            "count": count,
        })
        if len(emerging) >= limit:
            break
    return emerging[:limit]
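# Worked example of the scoring above: an entity seen in 3 clusters with an
# average importance of 0.5 scores 0.25 + 0.40 * 0.5 + 0.08 * 3 = 0.69, while
# a phrase seen 4 times scores 0.20 + 0.10 * 4 = 0.60; both are capped at 0.99.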
@mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.")
async def get_news_sentiment(entity: str, timeframe: str = "24h"):
    store = SQLiteClusterStore(DB_PATH)
    ent = normalize_query(entity).strip().lower()
    resolved = resolve_entity_via_trends(ent)
    query_terms = {
        ent,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    if not ent:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }
    # timeframe: accept '24h' or '24'; clamp to one week.
    tf = str(timeframe).strip().lower()
    try:
        hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
    except Exception:
        hours = 24
    hours = max(1, min(int(hours), 168))
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
    matched = []
    for c in clusters:
        haystack = _cluster_entity_haystack(c)
        if any(any(term in item for item in haystack) for term in query_terms):
            matched.append(c)
    if not matched:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }
    scores = []
    for c in matched:
        s = c.get("sentimentScore")
        if s is not None:
            try:
                scores.append(float(s))
            except Exception:
                pass
    avg_score = sum(scores) / len(scores) if scores else 0.0
    # Keep the label aligned with the numeric score.
    # Small magnitudes are treated as neutral to avoid noisy label flips.
    if avg_score >= 0.15:
        sentiment = "positive"
    elif avg_score <= -0.15:
        sentiment = "negative"
    else:
        sentiment = "neutral"
    return {
        "entity": entity,
        "sentiment": sentiment,
        "score": round(avg_score, 3),
        "cluster_count": len(matched),
    }
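# Example: matched clusters with sentimentScore values [0.4, 0.1, -0.05]
# average to exactly 0.15, which labels as "positive"; any average strictly
# between -0.15 and 0.15 stays "neutral".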
def _parse_timeframe_to_hours(timeframe: str) -> int:
    tf = str(timeframe).strip().lower()
    try:
        if tf.endswith("d"):
            days = int(tf[:-1])
            return max(1, days * 24)
        if tf.endswith("h"):
            return max(1, int(tf[:-1]))
        return max(1, int(tf))
    except Exception:
        return 24
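# Accepted forms: "36h" -> 36, "3d" -> 72, "12" -> 12; anything unparseable
# falls back to 24. Unlike the inline parser in get_news_sentiment, this
# helper applies no 168-hour cap.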
@mcp.tool(
    description="Investigate which entities tend to appear alongside a subject entity in recent clusters, based on co-occurrence."
)
async def get_related_entities(subject: str, timeframe: str = "24h", limit: int = 10):
    store = SQLiteClusterStore(DB_PATH)
    limit = max(1, min(int(limit), 30))
    subj = normalize_query(subject).strip().lower()
    if not subj:
        return []
    resolved = resolve_entity_via_trends(subj)
    query_terms = {
        subj,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    hours = _parse_timeframe_to_hours(timeframe)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
    # Aggregate related metrics per entity.
    rel_count = Counter()
    rel_imp_sum = Counter()
    rel_sent_sum = Counter()
    rel_sent_n = Counter()
    for c in clusters:
        haystack = _cluster_entity_haystack(c)
        if not any(any(term in item for item in haystack) for term in query_terms):
            continue
        ents = [str(e).strip().lower() for e in (c.get("entities", []) or []) if str(e).strip()]
        # remove generic/meta-ish short tokens conservatively
        ents = [e for e in ents if len(e) >= 4]
        for e in ents:
            if e in query_terms:
                continue
            rel_count[e] += 1
            try:
                rel_imp_sum[e] += float(c.get("importance", 0.0) or 0.0)
            except Exception:
                pass
            # sentiment aggregation based on sentimentScore if available.
            s = c.get("sentimentScore")
            if s is not None:
                try:
                    rel_sent_sum[e] += float(s)
                    rel_sent_n[e] += 1
                except Exception:
                    pass
    # Sort by count, then avg importance.
    ranked = sorted(
        rel_count.items(),
        key=lambda kv: (kv[1], rel_imp_sum[kv[0]] / max(1, kv[1])),
        reverse=True,
    )
    items = []
    for ent, cnt in ranked:
        avg_imp = rel_imp_sum[ent] / max(1, cnt)
        avg_score = rel_sent_sum[ent] / max(1, rel_sent_n[ent]) if rel_sent_n[ent] else 0.0
        if avg_score >= 0.15:
            sentiment = "positive"
        elif avg_score <= -0.15:
            sentiment = "negative"
        else:
            sentiment = "neutral"
        items.append(
            {
                "entity": ent,
                "count": cnt,
                "avg_importance": round(avg_imp, 3),
                "sentiment": sentiment,
                "score": round(avg_score, 3),
            }
        )
        if len(items) >= limit:
            break
    return items
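# Illustrative output item (values hypothetical):
#
#   {"entity": "blackrock", "count": 5, "avg_importance": 0.31,
#    "sentiment": "positive", "score": 0.22}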
app = FastAPI(title="News MCP Server")
logger = logging.getLogger("news_mcp.startup")
app.mount("/mcp", mcp.sse_app())

_background_task_started = False
_background_task: asyncio.Task | None = None


@app.on_event("startup")
async def _start_background_refresh():
    global _background_task_started, _background_task
    if _background_task_started:
        return
    if not NEWS_BACKGROUND_REFRESH_ENABLED:
        return
    _background_task_started = True
    logger.info("news-mcp llm config: %s", active_llm_config())
    store = SQLiteClusterStore(DB_PATH)
    prune_result = store.prune_if_due(
        pruning_enabled=NEWS_PRUNING_ENABLED,
        retention_days=NEWS_RETENTION_DAYS,
        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
    )
    logger.info("startup prune_result=%s", prune_result)

    async def _loop():
        if not NEWS_BACKGROUND_REFRESH_ON_START:
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
        while True:
            try:
                # Refresh all topics by passing topic=None.
                await refresh_clusters(topic=None, limit=200)
            except Exception:
                # Avoid crashing the server on network errors.
                pass
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))

    # Hold a module-level reference so the task is not garbage-collected mid-run.
    _background_task = asyncio.create_task(_loop())
@app.get("/")
def root():
    return {
        "status": "ok",
        "transport": "fastmcp+sse",
        "mount": "/mcp",
        "tools": [
            "get_latest_events",
            "get_events_for_entity",
            "get_event_summary",
            "detect_emerging_topics",
            "get_news_sentiment",
            "get_related_entities",
        ],
        "refresh": {
            "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
            "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
        },
        "retention": {
            "lookback_hours": DEFAULT_LOOKBACK_HOURS,
            "retention_days": NEWS_RETENTION_DAYS,
        },
        "pruning": {
            "enabled": NEWS_PRUNING_ENABLED,
            "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
        },
    }
@app.get("/health")
def health():
    store = SQLiteClusterStore(DB_PATH)
    return {
        "status": "ok",
        "lookback_hours": DEFAULT_LOOKBACK_HOURS,
        "db": str(DB_PATH),
        "refresh": store.get_feed_state("breakingthenews"),
        "pruning": store.get_prune_state(
            pruning_enabled=NEWS_PRUNING_ENABLED,
            retention_days=NEWS_RETENTION_DAYS,
            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
        ),
    }
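# Quick smoke test, as a sketch (assumes the server is running locally on
# port 8000 and the `requests` package is available):
#
#   import requests
#   print(requests.get("http://127.0.0.1:8000/health").json()["status"])  # "ok"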