mcp_server_fastmcp.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. from __future__ import annotations
  2. import asyncio
  3. import logging
  4. from collections import Counter
  5. from datetime import datetime, timezone
  6. from email.utils import parsedate_to_datetime
  7. from fastapi import FastAPI
  8. from mcp.server.fastmcp import FastMCP
  9. from mcp.server.transport_security import TransportSecuritySettings
  10. from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
  11. from news_mcp.config import (
  12. NEWS_PRUNE_INTERVAL_HOURS,
  13. NEWS_PRUNING_ENABLED,
  14. NEWS_REFRESH_INTERVAL_SECONDS,
  15. NEWS_BACKGROUND_REFRESH_ENABLED,
  16. NEWS_BACKGROUND_REFRESH_ON_START,
  17. NEWS_RETENTION_DAYS,
  18. )
  19. from news_mcp.jobs.poller import refresh_clusters
  20. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  21. from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
  22. from news_mcp.trends_resolution import resolve_entity_via_trends
  23. from news_mcp.llm import active_llm_config
  24. from news_mcp.entity_normalize import normalize_query
  25. from news_mcp.related_entities import related_recent_entities
# Single FastMCP server instance that hosts all the news tools below.
# NOTE(review): DNS-rebinding protection is explicitly disabled here —
# confirm the SSE endpoint is only reachable from trusted clients/networks.
mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)
  30. def _cluster_entity_haystack(cluster: dict) -> list[str]:
  31. """Collect the normalized entity clues attached to a cluster."""
  32. values: list[str] = []
  33. for ent in cluster.get("entities", []) or []:
  34. values.append(str(ent).strip().lower())
  35. for res in cluster.get("entityResolutions", []) or []:
  36. if not isinstance(res, dict):
  37. continue
  38. for key in ("normalized", "canonical_label", "mid"):
  39. val = res.get(key)
  40. if val:
  41. values.append(str(val).strip().lower())
  42. return [v for v in values if v]
  43. def _parse_cluster_timestamp(value) -> datetime:
  44. if not value:
  45. return datetime.min.replace(tzinfo=timezone.utc)
  46. text = str(value).strip()
  47. if not text:
  48. return datetime.min.replace(tzinfo=timezone.utc)
  49. try:
  50. dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
  51. if dt.tzinfo is None:
  52. dt = dt.replace(tzinfo=timezone.utc)
  53. return dt.astimezone(timezone.utc)
  54. except Exception:
  55. pass
  56. try:
  57. dt = parsedate_to_datetime(text)
  58. if dt.tzinfo is None:
  59. dt = dt.replace(tzinfo=timezone.utc)
  60. return dt.astimezone(timezone.utc)
  61. except Exception:
  62. return datetime.min.replace(tzinfo=timezone.utc)
  63. def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
  64. return sorted(
  65. clusters,
  66. key=lambda c: (
  67. _parse_cluster_timestamp(c.get("timestamp")),
  68. float(c.get("importance", 0.0) or 0.0),
  69. ),
  70. reverse=True,
  71. )
  72. @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.")
  73. async def get_latest_events(topic: str = "crypto", limit: int = 5, include_articles: bool = False):
  74. limit = max(1, min(int(limit), 20))
  75. # If the caller passes an entity-like value, resolve it and use the canonical
  76. # entity as the query lens. Otherwise keep the original topic path.
  77. topic_norm = normalize_query(topic).lower()
  78. resolved = resolve_entity_via_trends(topic_norm)
  79. allowed = {t.lower() for t in DEFAULT_TOPICS}
  80. is_topic = topic_norm in allowed
  81. query_terms = {
  82. topic_norm,
  83. str(resolved.get("normalized") or "").strip().lower(),
  84. str(resolved.get("canonical_label") or "").strip().lower(),
  85. str(resolved.get("mid") or "").strip().lower(),
  86. }
  87. query_terms = {q for q in query_terms if q}
  88. store = SQLiteClusterStore(DB_PATH)
  89. if is_topic:
  90. # Cache-first: only refresh if we currently have no fresh clusters for this topic.
  91. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  92. if not clusters:
  93. await refresh_clusters(topic=topic_norm, limit=200)
  94. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  95. else:
  96. # Entity-aware mode: search recent clusters across all topics and match by
  97. # raw entity, canonical label, or MID.
  98. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
  99. filtered = []
  100. for c in clusters:
  101. haystack = _cluster_entity_haystack(c)
  102. if any(any(term in item for item in haystack) for term in query_terms):
  103. filtered.append(c)
  104. if len(filtered) >= limit:
  105. break
  106. clusters = filtered
  107. out = []
  108. for c in _sort_clusters_by_recency(clusters):
  109. item = {
  110. "cluster_id": c.get("cluster_id"),
  111. "headline": c.get("headline"),
  112. "summary": c.get("summary"),
  113. "entities": c.get("entities", []),
  114. "sentiment": c.get("sentiment", "neutral"),
  115. "importance": c.get("importance", 0.0),
  116. "sources": c.get("sources", []),
  117. "timestamp": c.get("timestamp"),
  118. }
  119. if include_articles:
  120. # Return minimal article fields to keep responses compact.
  121. arts = c.get("articles", []) or []
  122. item["articles"] = [
  123. {
  124. "title": a.get("title"),
  125. "url": a.get("url"),
  126. "source": a.get("source"),
  127. "timestamp": a.get("timestamp"),
  128. }
  129. for a in arts
  130. if isinstance(a, dict)
  131. ]
  132. out.append(item)
  133. return out
  134. @mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.")
  135. async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
  136. limit = max(1, min(int(limit), 30))
  137. query = normalize_query(entity).strip().lower()
  138. if not query:
  139. return []
  140. resolved = resolve_entity_via_trends(query)
  141. query_terms = {
  142. query,
  143. str(resolved.get("normalized") or "").strip().lower(),
  144. str(resolved.get("canonical_label") or "").strip().lower(),
  145. str(resolved.get("mid") or "").strip().lower(),
  146. }
  147. query_terms = {q for q in query_terms if q}
  148. store = SQLiteClusterStore(DB_PATH)
  149. def _match_clusters(clusters: list[dict]) -> list[dict]:
  150. hits: list[dict] = []
  151. for c in _sort_clusters_by_recency(clusters):
  152. haystack = _cluster_entity_haystack(c)
  153. if any(any(term in item for item in haystack) for term in query_terms):
  154. hits.append(c)
  155. if len(hits) >= limit:
  156. break
  157. return hits
  158. hours = _parse_timeframe_to_hours(timeframe)
  159. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))
  160. hits = _match_clusters(clusters)
  161. out = []
  162. for c in hits:
  163. item = {
  164. "cluster_id": c.get("cluster_id"),
  165. "headline": c.get("headline"),
  166. "summary": c.get("summary"),
  167. "entities": c.get("entities", []),
  168. "sentiment": c.get("sentiment", "neutral"),
  169. "importance": c.get("importance", 0.0),
  170. "sources": c.get("sources", []),
  171. "timestamp": c.get("timestamp"),
  172. }
  173. if include_articles:
  174. arts = c.get("articles", []) or []
  175. item["articles"] = [
  176. {
  177. "title": a.get("title"),
  178. "url": a.get("url"),
  179. "source": a.get("source"),
  180. "timestamp": a.get("timestamp"),
  181. }
  182. for a in arts
  183. if isinstance(a, dict)
  184. ]
  185. out.append(item)
  186. return out
  187. @mcp.tool(description="Return entities most commonly associated with the subject in recent clusters, optionally blended with Google Trends suggestions.")
  188. async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True):
  189. limit = max(1, min(int(limit), 25))
  190. hours = _parse_timeframe_to_hours(timeframe)
  191. include_trends_bool = str(include_trends).strip().lower() not in {"false", "0", "no"}
  192. store = SQLiteClusterStore(DB_PATH)
  193. result = related_recent_entities(
  194. store=store,
  195. subject=subject,
  196. timeframe_hours=hours,
  197. limit=limit,
  198. include_trends=include_trends_bool,
  199. )
  200. return result
  201. @mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
  202. async def get_event_summary(event_id: str, include_articles: bool = False):
  203. store = SQLiteClusterStore(DB_PATH)
  204. # Summary cache: reuse if present within TTL.
  205. cached_summary = store.get_cluster_summary(
  206. cluster_id=event_id,
  207. ttl_hours=DEFAULT_LOOKBACK_HOURS,
  208. )
  209. if cached_summary:
  210. out = {
  211. "event_id": event_id,
  212. "headline": cached_summary.get("headline"),
  213. "mergedSummary": cached_summary.get("mergedSummary"),
  214. "keyFacts": cached_summary.get("keyFacts", []),
  215. "sources": cached_summary.get("sources", []),
  216. }
  217. if include_articles:
  218. cluster = store.get_cluster_by_id(event_id)
  219. arts = (cluster or {}).get("articles", []) or []
  220. out["articles"] = [
  221. {
  222. "title": a.get("title"),
  223. "url": a.get("url"),
  224. "source": a.get("source"),
  225. "timestamp": a.get("timestamp"),
  226. }
  227. for a in arts
  228. if isinstance(a, dict)
  229. ]
  230. return out
  231. cluster = store.get_cluster_by_id(event_id)
  232. if not cluster:
  233. return {
  234. "event_id": event_id,
  235. "error": "NOT_FOUND",
  236. }
  237. articles_out = None
  238. if include_articles:
  239. arts = cluster.get("articles", []) or []
  240. articles_out = [
  241. {
  242. "title": a.get("title"),
  243. "url": a.get("url"),
  244. "source": a.get("source"),
  245. "timestamp": a.get("timestamp"),
  246. }
  247. for a in arts
  248. if isinstance(a, dict)
  249. ]
  250. summary = await summarize_cluster_llm(cluster)
  251. store.upsert_cluster_summary(event_id, summary)
  252. out = {
  253. "event_id": event_id,
  254. "headline": summary.get("headline"),
  255. "mergedSummary": summary.get("mergedSummary"),
  256. "keyFacts": summary.get("keyFacts", []),
  257. "sources": summary.get("sources", []),
  258. }
  259. if include_articles:
  260. out["articles"] = articles_out or []
  261. return out
  262. @mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters.")
  263. async def detect_emerging_topics(limit: int = 10):
  264. limit = max(1, min(int(limit), 20))
  265. store = SQLiteClusterStore(DB_PATH)
  266. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=200)
  267. import re
  268. entity_counts = Counter()
  269. entity_importance_sum = Counter()
  270. # co-occurrence: ent -> other_ent -> count
  271. entity_cooccur = {}
  272. phrase_counts = Counter()
  273. topic_counts = Counter()
  274. # Very light heuristics to reduce “meta entities” dominating emerging topics.
  275. # Keep it conservative: only skip obvious boilerplate.
  276. def _is_generic_entity(ent: str) -> bool:
  277. e = str(ent).strip().lower()
  278. if not e:
  279. return True
  280. if len(e) < 4:
  281. return True
  282. # common outlet-ish / meta-ish tokens
  283. if e in {"news", "latest", "breaking"}:
  284. return True
  285. return False
  286. for c in clusters:
  287. topic_counts[c.get("topic", "other")] += 1
  288. ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
  289. ents_in_cluster_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
  290. for ent in ents_in_cluster_norm:
  291. if _is_generic_entity(ent):
  292. continue
  293. entity_counts[ent] += 1
  294. try:
  295. entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
  296. except Exception:
  297. pass
  298. # update co-occurrence counts
  299. for i in range(len(ents_in_cluster_norm)):
  300. a = ents_in_cluster_norm[i]
  301. if not a:
  302. continue
  303. entity_cooccur.setdefault(a, Counter())
  304. for j in range(len(ents_in_cluster_norm)):
  305. if i == j:
  306. continue
  307. b = ents_in_cluster_norm[j]
  308. if not b:
  309. continue
  310. entity_cooccur[a][b] += 1
  311. text = f"{c.get('headline','')} {c.get('summary','')}"
  312. words = [w for w in re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())]
  313. for i in range(len(words) - 1):
  314. phrase = f"{words[i]} {words[i+1]}"
  315. if len(phrase) > 6:
  316. phrase_counts[phrase] += 1
  317. emerging = []
  318. # Combine frequency with average importance so “big signal” rises over pure repetition.
  319. for ent, count in entity_counts.most_common(limit):
  320. avg_imp = entity_importance_sum[ent] / max(1, count)
  321. # avg_imp is typically 0..~1; keep score bounded.
  322. trend_score = 0.25 + 0.40 * min(1.0, avg_imp) + 0.08 * min(6.0, float(count))
  323. related = []
  324. for other, _cnt in (entity_cooccur.get(ent) or Counter()).most_common(3):
  325. # avoid returning the entity itself (shouldn't happen, but be safe)
  326. if other != ent:
  327. related.append(other)
  328. emerging.append({
  329. "topic": ent,
  330. "trend_score": min(0.99, round(trend_score, 2)),
  331. "related_entities": related if related else [ent],
  332. "signal_type": "entity",
  333. "count": count,
  334. "avg_importance": round(avg_imp, 3),
  335. })
  336. for phrase, count in phrase_counts.most_common(limit * 2):
  337. if any(item["topic"] == phrase for item in emerging):
  338. continue
  339. emerging.append({
  340. "topic": phrase.title(),
  341. "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
  342. "related_entities": [],
  343. "signal_type": "phrase",
  344. "count": count,
  345. })
  346. if len(emerging) >= limit:
  347. break
  348. return emerging[:limit]
  349. @mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.")
  350. async def get_news_sentiment(entity: str, timeframe: str = "24h"):
  351. store = SQLiteClusterStore(DB_PATH)
  352. ent = normalize_query(entity).strip().lower()
  353. resolved = resolve_entity_via_trends(ent)
  354. query_terms = {
  355. ent,
  356. str(resolved.get("normalized") or "").strip().lower(),
  357. str(resolved.get("canonical_label") or "").strip().lower(),
  358. str(resolved.get("mid") or "").strip().lower(),
  359. }
  360. query_terms = {q for q in query_terms if q}
  361. if not ent:
  362. return {
  363. "entity": entity,
  364. "sentiment": "neutral",
  365. "score": 0.0,
  366. "cluster_count": 0,
  367. }
  368. # timeframe: accept '24h' or '24'
  369. tf = str(timeframe).strip().lower()
  370. try:
  371. hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
  372. except Exception:
  373. hours = 24
  374. hours = max(1, min(int(hours), 168))
  375. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
  376. matched = []
  377. for c in clusters:
  378. haystack = _cluster_entity_haystack(c)
  379. if any(any(term in item for item in haystack) for term in query_terms):
  380. matched.append(c)
  381. if not matched:
  382. return {
  383. "entity": entity,
  384. "sentiment": "neutral",
  385. "score": 0.0,
  386. "cluster_count": 0,
  387. }
  388. scores = []
  389. for c in matched:
  390. s = c.get("sentimentScore")
  391. if s is not None:
  392. try:
  393. scores.append(float(s))
  394. except Exception:
  395. pass
  396. avg_score = sum(scores) / len(scores) if scores else 0.0
  397. # Keep the label aligned with the numeric score.
  398. # Small magnitudes are treated as neutral to avoid noisy label flips.
  399. if avg_score >= 0.15:
  400. sentiment = "positive"
  401. elif avg_score <= -0.15:
  402. sentiment = "negative"
  403. else:
  404. sentiment = "neutral"
  405. return {
  406. "entity": entity,
  407. "sentiment": sentiment,
  408. "score": round(avg_score, 3),
  409. "cluster_count": len(matched),
  410. }
  411. def _parse_timeframe_to_hours(timeframe: str) -> int:
  412. tf = str(timeframe).strip().lower()
  413. try:
  414. if tf.endswith("d"):
  415. days = int(tf[:-1])
  416. return max(1, days * 24)
  417. if tf.endswith("h"):
  418. return max(1, int(tf[:-1]))
  419. return max(1, int(tf))
  420. except Exception:
  421. return 24
# FastAPI application that hosts the MCP server over SSE.
app = FastAPI(title="News MCP Server")
logger = logging.getLogger("news_mcp.startup")
# Mount the FastMCP SSE transport under /mcp.
app.mount("/mcp", mcp.sse_app())
# Idempotency guard: repeated startup events must not spawn duplicate refresh loops.
_background_task_started = False
  426. @app.on_event("startup")
  427. async def _start_background_refresh():
  428. global _background_task_started
  429. if _background_task_started:
  430. return
  431. if not NEWS_BACKGROUND_REFRESH_ENABLED:
  432. return
  433. _background_task_started = True
  434. logger.info("news-mcp llm config: %s", active_llm_config())
  435. store = SQLiteClusterStore(DB_PATH)
  436. prune_result = store.prune_if_due(
  437. pruning_enabled=NEWS_PRUNING_ENABLED,
  438. retention_days=NEWS_RETENTION_DAYS,
  439. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  440. )
  441. logger.info("startup prune_result=%s", prune_result)
  442. async def _loop():
  443. if not NEWS_BACKGROUND_REFRESH_ON_START:
  444. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  445. while True:
  446. try:
  447. # Refresh all topics by passing topic=None
  448. await refresh_clusters(topic=None, limit=200)
  449. except Exception:
  450. # Avoid crashing the server on network errors.
  451. pass
  452. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  453. asyncio.create_task(_loop())
  454. @app.get("/")
  455. def root():
  456. return {
  457. "status": "ok",
  458. "transport": "fastmcp+sse",
  459. "mount": "/mcp",
  460. "tools": [
  461. "get_latest_events",
  462. "get_events_for_entity",
  463. "get_event_summary",
  464. "detect_emerging_topics",
  465. "get_news_sentiment",
  466. "get_related_recent_entities",
  467. ],
  468. "refresh": {
  469. "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
  470. "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
  471. },
  472. "retention": {
  473. "lookback_hours": DEFAULT_LOOKBACK_HOURS,
  474. "retention_days": NEWS_RETENTION_DAYS,
  475. },
  476. "pruning": {
  477. "enabled": NEWS_PRUNING_ENABLED,
  478. "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
  479. },
  480. }
  481. @app.get("/health")
  482. def health():
  483. store = SQLiteClusterStore(DB_PATH)
  484. return {
  485. "status": "ok",
  486. "lookback_hours": DEFAULT_LOOKBACK_HOURS,
  487. "db": str(DB_PATH),
  488. "refresh": store.get_feed_state("breakingthenews"),
  489. "pruning": store.get_prune_state(
  490. pruning_enabled=NEWS_PRUNING_ENABLED,
  491. retention_days=NEWS_RETENTION_DAYS,
  492. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  493. ),
  494. }