mcp_server_fastmcp.py 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871
  1. from __future__ import annotations
  2. import asyncio
  3. import logging
  4. import time
  5. from collections import Counter
  6. from datetime import datetime, timezone
  7. from email.utils import parsedate_to_datetime
  8. from fastapi import FastAPI
  9. from mcp.server.fastmcp import FastMCP
  10. from mcp.server.transport_security import TransportSecuritySettings
  11. from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
  12. from news_mcp.config import (
  13. NEWS_PRUNE_INTERVAL_HOURS,
  14. NEWS_PRUNING_ENABLED,
  15. NEWS_REFRESH_INTERVAL_SECONDS,
  16. NEWS_BACKGROUND_REFRESH_ENABLED,
  17. NEWS_BACKGROUND_REFRESH_ON_START,
  18. NEWS_RETENTION_DAYS,
  19. )
  20. from news_mcp.jobs.poller import refresh_clusters
  21. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  22. from news_mcp.dashboard.dashboard_store import DashboardStore
  23. from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
  24. from news_mcp.trends_resolution import resolve_entity_via_trends
  25. from news_mcp.llm import active_llm_config
  26. from news_mcp.entity_normalize import normalize_query
  27. from news_mcp.related_entities import related_recent_entities
# Configure root logging once at import time; the named loggers used further
# down (news_mcp.startup, news_mcp.api) inherit this format via the root.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
# Monotonic reference point; /health reports uptime relative to it.
_PROCESS_STARTED_AT = time.monotonic()
# The FastMCP server; every @mcp.tool below registers on this instance.
# DNS-rebinding protection is explicitly disabled here.
# NOTE(review): confirm the server is never reachable from untrusted browsers.
mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)
  37. def _cluster_entity_haystack(cluster: dict) -> list[str]:
  38. """Collect the normalized entity clues attached to a cluster."""
  39. values: list[str] = []
  40. for ent in cluster.get("entities", []) or []:
  41. values.append(str(ent).strip().lower())
  42. for res in cluster.get("entityResolutions", []) or []:
  43. if not isinstance(res, dict):
  44. continue
  45. for key in ("normalized", "canonical_label", "mid"):
  46. val = res.get(key)
  47. if val:
  48. values.append(str(val).strip().lower())
  49. return [v for v in values if v]
  50. def _parse_cluster_timestamp(value) -> datetime:
  51. if not value:
  52. return datetime.min.replace(tzinfo=timezone.utc)
  53. text = str(value).strip()
  54. if not text:
  55. return datetime.min.replace(tzinfo=timezone.utc)
  56. try:
  57. dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
  58. if dt.tzinfo is None:
  59. dt = dt.replace(tzinfo=timezone.utc)
  60. return dt.astimezone(timezone.utc)
  61. except Exception:
  62. pass
  63. try:
  64. dt = parsedate_to_datetime(text)
  65. if dt.tzinfo is None:
  66. dt = dt.replace(tzinfo=timezone.utc)
  67. return dt.astimezone(timezone.utc)
  68. except Exception:
  69. return datetime.min.replace(tzinfo=timezone.utc)
  70. def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
  71. return sorted(
  72. clusters,
  73. key=lambda c: (
  74. _parse_cluster_timestamp(c.get("timestamp")),
  75. float(c.get("importance", 0.0) or 0.0),
  76. ),
  77. reverse=True,
  78. )
  79. def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict:
  80. return {
  81. "name": name,
  82. "description": description,
  83. "inputs": inputs,
  84. "outputs": outputs,
  85. "notes": notes or [],
  86. }
# Machine-readable tool descriptions, returned under "tools" by
# get_capabilities. The defaults/ranges listed in each card's inputs
# should be kept in sync with the clamping done inside the matching tool.
NEWS_TOOL_CARDS = [
    _tool_card(
        "get_latest_events",
        "Get the newest deduplicated clusters for a topic or resolved entity-like query.",
        [
            {"name": "topic", "type": "string", "default": "all topics", "meaning": "coarse category (crypto, macro, regulation, ai, other), entity-like topic, or omit for all topics"},
            {"name": "limit", "type": "integer", "default": 5, "range": "1-20"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"],
        ["Use when you want the freshest clusters and are willing to let the server decide topic vs entity mode."],
    ),
    _tool_card(
        "get_events_for_entity",
        "Search recent clusters for a person, place, company, or theme by entity matching.",
        [
            {"name": "entity", "type": "string", "meaning": "entity label or phrase"},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-30"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"],
        ["Normalization is automatic; use this for an entity-centered deep dive."],
    ),
    _tool_card(
        "get_event_summary",
        "Produce a concise LLM-written explanation for one cluster and key facts.",
        [
            {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["headline", "mergedSummary", "keyFacts", "sources", "articles?"],
        ["Prefer this after you have already chosen a specific cluster to explain."],
    ),
    _tool_card(
        "detect_emerging_topics",
        "Surface entities and phrases starting to matter in the recent window.",
        [{"name": "limit", "type": "integer", "default": 10, "range": "1-20"}],
        ["topic", "trend_score", "related_entities", "signal_type", "count", "avg_importance"],
        ["Good for 'what is heating up?' style questions."],
    ),
    _tool_card(
        "get_news_sentiment",
        "Estimate sentiment around an entity over a lookback window.",
        [
            {"name": "entity", "type": "string"},
            {"name": "timeframe", "type": "string", "default": "24h"},
        ],
        ["entity", "sentiment", "score", "cluster_count"],
        ["Use after locating a cluster set or entity neighborhood."],
    ),
    _tool_card(
        "get_related_recent_entities",
        "Blend local co-occurrence with Google Trends related topics, while preserving mids where available.",
        [
            {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"},
            {"name": "timeframe", "type": "string", "default": "72h"},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-25"},
            {"name": "include_trends", "type": "boolean", "default": True},
        ],
        ["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"],
        ["Use this to drill from a subject into related entities, then feed those into get_events_for_entity."],
    ),
]
# Named multi-step tool chains, returned under "recipes" by get_capabilities.
# Each recipe lists the ordered tool calls ("steps") plus usage notes.
NEWS_COMPOSITION_RECIPES = [
    {
        "name": "fresh-news-tail",
        "steps": [
            "get_latest_events(topic=...)",
            "optionally get_event_summary(event_id=...) for the strongest cluster",
        ],
        "notes": ["Best for a quick tail of what is happening now. Omit topic for all topics, or pass crypto/macro/regulation/ai/other to filter."]
    },
    {
        "name": "entity-deep-dive",
        "steps": [
            "get_events_for_entity(entity=...)",
            "get_event_summary(event_id=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
        ],
        "notes": ["Prefer canonical entity labels when you have them; the server normalizes for you."],
    },
    {
        "name": "subject-neighborhood",
        "steps": [
            "get_related_recent_entities(subject=...)",
            "for each strong related entity, call get_events_for_entity(entity=...)",
        ],
        "notes": ["Use this when you want a graph-like expansion around a subject."]
    },
    {
        "name": "emerging-signal",
        "steps": [
            "detect_emerging_topics(limit=...)",
            "choose a topic/entity",
            "get_events_for_entity(entity=...)",
            "get_news_sentiment(entity=...)",
        ],
        "notes": ["Good for trend scouting and risk mapping."],
    },
]
# Free-form guidance for calling agents, returned under "agent_tips" by
# get_capabilities.
NEWS_AGENT_TIPS = [
    "If you need a fast answer, start with get_latest_events, then summarize the strongest cluster with get_event_summary.",
    "If a user asks about a person/place/company/theme, use get_events_for_entity before broadening to get_related_recent_entities.",
    "Treat cluster_id as an internal cursor, not user-facing output; use it only for follow-up tool calls.",
    "When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.",
    "Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.",
    "For tricky names, rely on the server’s resolver instead of inventing alias rules in the client.",
]
# Task-oriented example call chains, returned under "example_chains" by
# get_capabilities. Each entry pairs a user task with a suggested sequence.
NEWS_EXAMPLE_CHAINS = [
    {
        "task": "What is happening now?",
        "chain": [
            "get_latest_events(topic=...)",
            "get_event_summary(event_id=...) if one cluster looks important",
        ],
    },
    {
        "task": "Deep dive on an entity",
        "chain": [
            "get_events_for_entity(entity=..., timeframe=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
            "get_event_summary(event_id=...) for the strongest cluster",
        ],
    },
    {
        "task": "Broaden from a subject",
        "chain": [
            "get_related_recent_entities(subject=..., include_trends=true)",
            "get_events_for_entity(entity=...) for the strongest related entities",
        ],
    },
    {
        "task": "Find what is emerging",
        "chain": [
            "detect_emerging_topics(limit=...)",
            "get_events_for_entity(entity=...) on one or two emerging terms",
        ],
    },
]
  227. @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.")
  228. async def get_latest_events(topic: str | None = None, limit: int = 5, include_articles: bool = False):
  229. limit = max(1, min(int(limit), 20))
  230. # When topic is omitted, search across all topics (no topic filter).
  231. # When topic is provided and matches a known topic, filter by that topic.
  232. # Otherwise treat the value as an entity-like query.
  233. topic_norm = normalize_query(topic).lower() if topic else ""
  234. resolved = resolve_entity_via_trends(topic_norm) if topic_norm else {}
  235. allowed = {t.lower() for t in DEFAULT_TOPICS}
  236. is_topic = topic_norm in allowed
  237. is_all_topics = not topic_norm
  238. query_terms = {
  239. topic_norm,
  240. str(resolved.get("normalized") or "").strip().lower(),
  241. str(resolved.get("canonical_label") or "").strip().lower(),
  242. str(resolved.get("mid") or "").strip().lower(),
  243. }
  244. query_terms = {q for q in query_terms if q}
  245. store = SQLiteClusterStore(DB_PATH)
  246. if is_all_topics:
  247. # No topic specified: return freshest clusters across all topics.
  248. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  249. elif is_topic:
  250. # Cache-first: only refresh if we currently have no fresh clusters for this topic.
  251. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  252. if not clusters:
  253. await refresh_clusters(topic=topic_norm, limit=200)
  254. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  255. else:
  256. # Entity-aware mode: search recent clusters across all topics and match by
  257. # raw entity, canonical label, or MID.
  258. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
  259. filtered = []
  260. for c in clusters:
  261. haystack = _cluster_entity_haystack(c)
  262. if any(any(term in item for item in haystack) for term in query_terms):
  263. filtered.append(c)
  264. if len(filtered) >= limit:
  265. break
  266. clusters = filtered
  267. out = []
  268. for c in _sort_clusters_by_recency(clusters):
  269. item = {
  270. "cluster_id": c.get("cluster_id"),
  271. "headline": c.get("headline"),
  272. "summary": c.get("summary"),
  273. "entities": c.get("entities", []),
  274. "sentiment": c.get("sentiment", "neutral"),
  275. "importance": c.get("importance", 0.0),
  276. "sources": c.get("sources", []),
  277. "timestamp": c.get("timestamp"),
  278. }
  279. if include_articles:
  280. # Return minimal article fields to keep responses compact.
  281. arts = c.get("articles", []) or []
  282. item["articles"] = [
  283. {
  284. "title": a.get("title"),
  285. "url": a.get("url"),
  286. "source": a.get("source"),
  287. "timestamp": a.get("timestamp"),
  288. }
  289. for a in arts
  290. if isinstance(a, dict)
  291. ]
  292. out.append(item)
  293. return out
  294. @mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.")
  295. async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
  296. limit = max(1, min(int(limit), 30))
  297. query = normalize_query(entity).strip().lower()
  298. if not query:
  299. return []
  300. resolved = resolve_entity_via_trends(query)
  301. query_terms = {
  302. query,
  303. str(resolved.get("normalized") or "").strip().lower(),
  304. str(resolved.get("canonical_label") or "").strip().lower(),
  305. str(resolved.get("mid") or "").strip().lower(),
  306. }
  307. query_terms = {q for q in query_terms if q}
  308. store = SQLiteClusterStore(DB_PATH)
  309. def _match_clusters(clusters: list[dict]) -> list[dict]:
  310. hits: list[dict] = []
  311. for c in _sort_clusters_by_recency(clusters):
  312. haystack = _cluster_entity_haystack(c)
  313. if any(any(term in item for item in haystack) for term in query_terms):
  314. hits.append(c)
  315. if len(hits) >= limit:
  316. break
  317. return hits
  318. hours = _parse_timeframe_to_hours(timeframe)
  319. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))
  320. hits = _match_clusters(clusters)
  321. out = []
  322. for c in hits:
  323. item = {
  324. "cluster_id": c.get("cluster_id"),
  325. "headline": c.get("headline"),
  326. "summary": c.get("summary"),
  327. "entities": c.get("entities", []),
  328. "sentiment": c.get("sentiment", "neutral"),
  329. "importance": c.get("importance", 0.0),
  330. "sources": c.get("sources", []),
  331. "timestamp": c.get("timestamp"),
  332. }
  333. if include_articles:
  334. arts = c.get("articles", []) or []
  335. item["articles"] = [
  336. {
  337. "title": a.get("title"),
  338. "url": a.get("url"),
  339. "source": a.get("source"),
  340. "timestamp": a.get("timestamp"),
  341. }
  342. for a in arts
  343. if isinstance(a, dict)
  344. ]
  345. out.append(item)
  346. return out
  347. @mcp.tool(description="Return entities most commonly associated with the subject in recent clusters, optionally blended with Google Trends suggestions.")
  348. async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True):
  349. limit = max(1, min(int(limit), 25))
  350. hours = _parse_timeframe_to_hours(timeframe)
  351. include_trends_bool = str(include_trends).strip().lower() not in {"false", "0", "no"}
  352. store = SQLiteClusterStore(DB_PATH)
  353. result = related_recent_entities(
  354. store=store,
  355. subject=subject,
  356. timeframe_hours=hours,
  357. limit=limit,
  358. include_trends=include_trends_bool,
  359. )
  360. return result
  361. @mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
  362. async def get_event_summary(event_id: str, include_articles: bool = False):
  363. store = SQLiteClusterStore(DB_PATH)
  364. # Summary cache: reuse if present within TTL.
  365. cached_summary = store.get_cluster_summary(
  366. cluster_id=event_id,
  367. ttl_hours=DEFAULT_LOOKBACK_HOURS,
  368. )
  369. if cached_summary:
  370. out = {
  371. "event_id": event_id,
  372. "headline": cached_summary.get("headline"),
  373. "mergedSummary": cached_summary.get("mergedSummary"),
  374. "keyFacts": cached_summary.get("keyFacts", []),
  375. "sources": cached_summary.get("sources", []),
  376. }
  377. if include_articles:
  378. cluster = store.get_cluster_by_id(event_id)
  379. arts = (cluster or {}).get("articles", []) or []
  380. out["articles"] = [
  381. {
  382. "title": a.get("title"),
  383. "url": a.get("url"),
  384. "source": a.get("source"),
  385. "timestamp": a.get("timestamp"),
  386. }
  387. for a in arts
  388. if isinstance(a, dict)
  389. ]
  390. return out
  391. cluster = store.get_cluster_by_id(event_id)
  392. if not cluster:
  393. return {
  394. "event_id": event_id,
  395. "error": "NOT_FOUND",
  396. }
  397. articles_out = None
  398. if include_articles:
  399. arts = cluster.get("articles", []) or []
  400. articles_out = [
  401. {
  402. "title": a.get("title"),
  403. "url": a.get("url"),
  404. "source": a.get("source"),
  405. "timestamp": a.get("timestamp"),
  406. }
  407. for a in arts
  408. if isinstance(a, dict)
  409. ]
  410. summary = await summarize_cluster_llm(cluster)
  411. store.upsert_cluster_summary(event_id, summary)
  412. out = {
  413. "event_id": event_id,
  414. "headline": summary.get("headline"),
  415. "mergedSummary": summary.get("mergedSummary"),
  416. "keyFacts": summary.get("keyFacts", []),
  417. "sources": summary.get("sources", []),
  418. }
  419. if include_articles:
  420. out["articles"] = articles_out or []
  421. return out
  422. @mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters.")
  423. async def detect_emerging_topics(limit: int = 10):
  424. limit = max(1, min(int(limit), 20))
  425. store = SQLiteClusterStore(DB_PATH)
  426. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=200)
  427. import re
  428. entity_counts = Counter()
  429. entity_importance_sum = Counter()
  430. # co-occurrence: ent -> other_ent -> count
  431. entity_cooccur = {}
  432. phrase_counts = Counter()
  433. topic_counts = Counter()
  434. # Very light heuristics to reduce “meta entities” dominating emerging topics.
  435. # Keep it conservative: only skip obvious boilerplate.
  436. def _is_generic_entity(ent: str) -> bool:
  437. e = str(ent).strip().lower()
  438. if not e:
  439. return True
  440. if len(e) < 4:
  441. return True
  442. # common outlet-ish / meta-ish tokens
  443. if e in {"news", "latest", "breaking"}:
  444. return True
  445. return False
  446. for c in clusters:
  447. topic_counts[c.get("topic", "other")] += 1
  448. ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
  449. ents_in_cluster_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
  450. for ent in ents_in_cluster_norm:
  451. if _is_generic_entity(ent):
  452. continue
  453. entity_counts[ent] += 1
  454. try:
  455. entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
  456. except Exception:
  457. pass
  458. # update co-occurrence counts
  459. for i in range(len(ents_in_cluster_norm)):
  460. a = ents_in_cluster_norm[i]
  461. if not a:
  462. continue
  463. entity_cooccur.setdefault(a, Counter())
  464. for j in range(len(ents_in_cluster_norm)):
  465. if i == j:
  466. continue
  467. b = ents_in_cluster_norm[j]
  468. if not b:
  469. continue
  470. entity_cooccur[a][b] += 1
  471. text = f"{c.get('headline','')} {c.get('summary','')}"
  472. words = [w for w in re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())]
  473. for i in range(len(words) - 1):
  474. phrase = f"{words[i]} {words[i+1]}"
  475. if len(phrase) > 6:
  476. phrase_counts[phrase] += 1
  477. emerging = []
  478. # Combine frequency with average importance so “big signal” rises over pure repetition.
  479. for ent, count in entity_counts.most_common(limit):
  480. avg_imp = entity_importance_sum[ent] / max(1, count)
  481. # avg_imp is typically 0..~1; keep score bounded.
  482. trend_score = 0.25 + 0.40 * min(1.0, avg_imp) + 0.08 * min(6.0, float(count))
  483. related = []
  484. for other, _cnt in (entity_cooccur.get(ent) or Counter()).most_common(3):
  485. # avoid returning the entity itself (shouldn't happen, but be safe)
  486. if other != ent:
  487. related.append(other)
  488. emerging.append({
  489. "topic": ent,
  490. "trend_score": min(0.99, round(trend_score, 2)),
  491. "related_entities": related if related else [ent],
  492. "signal_type": "entity",
  493. "count": count,
  494. "avg_importance": round(avg_imp, 3),
  495. })
  496. for phrase, count in phrase_counts.most_common(limit * 2):
  497. if any(item["topic"] == phrase for item in emerging):
  498. continue
  499. emerging.append({
  500. "topic": phrase.title(),
  501. "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
  502. "related_entities": [],
  503. "signal_type": "phrase",
  504. "count": count,
  505. })
  506. if len(emerging) >= limit:
  507. break
  508. return emerging[:limit]
  509. @mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.")
  510. async def get_news_sentiment(entity: str, timeframe: str = "24h"):
  511. store = SQLiteClusterStore(DB_PATH)
  512. ent = normalize_query(entity).strip().lower()
  513. resolved = resolve_entity_via_trends(ent)
  514. query_terms = {
  515. ent,
  516. str(resolved.get("normalized") or "").strip().lower(),
  517. str(resolved.get("canonical_label") or "").strip().lower(),
  518. str(resolved.get("mid") or "").strip().lower(),
  519. }
  520. query_terms = {q for q in query_terms if q}
  521. if not ent:
  522. return {
  523. "entity": entity,
  524. "sentiment": "neutral",
  525. "score": 0.0,
  526. "cluster_count": 0,
  527. }
  528. # timeframe: accept '24h' or '24'
  529. tf = str(timeframe).strip().lower()
  530. try:
  531. hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
  532. except Exception:
  533. hours = 24
  534. hours = max(1, min(int(hours), 168))
  535. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
  536. matched = []
  537. for c in clusters:
  538. haystack = _cluster_entity_haystack(c)
  539. if any(any(term in item for item in haystack) for term in query_terms):
  540. matched.append(c)
  541. if not matched:
  542. return {
  543. "entity": entity,
  544. "sentiment": "neutral",
  545. "score": 0.0,
  546. "cluster_count": 0,
  547. }
  548. scores = []
  549. for c in matched:
  550. s = c.get("sentimentScore")
  551. if s is not None:
  552. try:
  553. scores.append(float(s))
  554. except Exception:
  555. pass
  556. avg_score = sum(scores) / len(scores) if scores else 0.0
  557. # Keep the label aligned with the numeric score.
  558. # Small magnitudes are treated as neutral to avoid noisy label flips.
  559. if avg_score >= 0.15:
  560. sentiment = "positive"
  561. elif avg_score <= -0.15:
  562. sentiment = "negative"
  563. else:
  564. sentiment = "neutral"
  565. return {
  566. "entity": entity,
  567. "sentiment": sentiment,
  568. "score": round(avg_score, 3),
  569. "cluster_count": len(matched),
  570. }
  571. @mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.")
  572. async def get_capabilities():
  573. return {
  574. "server": {
  575. "name": "news-mcp",
  576. "purpose": "Recent news clusters, entity drill-down, sentiment, emerging topics, and related-entity expansion.",
  577. "output_conventions": {
  578. "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.",
  579. "sources": "Always preserve and display sources when summarizing a cluster or entity result.",
  580. "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.",
  581. },
  582. },
  583. "tools": NEWS_TOOL_CARDS,
  584. "recipes": NEWS_COMPOSITION_RECIPES,
  585. "example_chains": NEWS_EXAMPLE_CHAINS,
  586. "agent_tips": NEWS_AGENT_TIPS,
  587. "guidance": [
  588. "Use get_latest_events for a tail, get_events_for_entity for entity deep dives, and get_related_recent_entities for neighborhood expansion.",
  589. "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.",
  590. "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.",
  591. ],
  592. }
  593. def _parse_timeframe_to_hours(timeframe: str) -> int:
  594. tf = str(timeframe).strip().lower()
  595. try:
  596. if tf.endswith("d"):
  597. days = int(tf[:-1])
  598. return max(1, days * 24)
  599. if tf.endswith("h"):
  600. return max(1, int(tf[:-1]))
  601. return max(1, int(tf))
  602. except Exception:
  603. return 24
  604. from contextlib import asynccontextmanager
  605. @asynccontextmanager
  606. async def _lifespan(app: FastAPI):
  607. asyncio.ensure_future(_background_refresh_loop())
  608. yield
# ASGI application: the MCP SSE app is mounted at /mcp and the REST/dashboard
# routes are registered on it below. _lifespan starts background work.
app = FastAPI(title="News MCP Server", lifespan=_lifespan)
logger = logging.getLogger("news_mcp.startup")
app.mount("/mcp", mcp.sse_app())
# Shared store — single connection pool
# Module-wide store used by the background refresher and the dashboard API.
# NOTE(review): whether SQLiteClusterStore actually pools connections is not
# visible here — confirm in news_mcp.storage.sqlite_store.
_shared_store = SQLiteClusterStore(DB_PATH)
# _refresh_lock guards the _refresh_started flag so the startup
# prune/refresh routine only runs once per process.
_refresh_lock = asyncio.Lock()
_refresh_started = False
  616. async def _background_refresh_loop():
  617. """Non-blocking background refresher: prune then poll.
  618. Protected by an async lock so a second event-loop wake-up cannot
  619. start a parallel ingestion cycle.
  620. """
  621. global _refresh_started
  622. async with _refresh_lock:
  623. if _refresh_started:
  624. return
  625. _refresh_started = True
  626. logger.info("news-mcp llm config: %s", active_llm_config())
  627. # Prune off-thread so we do not block the event loop
  628. prune_result = await asyncio.to_thread(
  629. _shared_store.prune_if_due,
  630. NEWS_PRUNING_ENABLED,
  631. NEWS_RETENTION_DAYS,
  632. NEWS_PRUNE_INTERVAL_HOURS,
  633. )
  634. logger.info("startup prune_result=%s", prune_result)
  635. if not NEWS_BACKGROUND_REFRESH_ENABLED:
  636. return
  637. async def _loop():
  638. if not NEWS_BACKGROUND_REFRESH_ON_START:
  639. logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS)
  640. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  641. while True:
  642. try:
  643. logger.info("background refresh tick start")
  644. await refresh_clusters(topic=None, limit=200)
  645. logger.info("background refresh tick complete")
  646. except Exception:
  647. logger.exception("background refresh tick failed")
  648. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  649. asyncio.create_task(_loop())
  650. @app.get("/")
  651. def root():
  652. return {
  653. "status": "ok",
  654. "transport": "fastmcp+sse",
  655. "mount": "/mcp",
  656. "tools": [
  657. "get_latest_events",
  658. "get_events_for_entity",
  659. "get_event_summary",
  660. "detect_emerging_topics",
  661. "get_news_sentiment",
  662. "get_related_recent_entities",
  663. "get_capabilities",
  664. ],
  665. "refresh": {
  666. "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
  667. "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
  668. },
  669. "retention": {
  670. "lookback_hours": DEFAULT_LOOKBACK_HOURS,
  671. "retention_days": NEWS_RETENTION_DAYS,
  672. },
  673. "pruning": {
  674. "enabled": NEWS_PRUNING_ENABLED,
  675. "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
  676. },
  677. }
  678. # ------------------------------------------------------------------
  679. # Dashboard REST API endpoints
  680. # ------------------------------------------------------------------
  681. from fastapi.staticfiles import StaticFiles
  682. from fastapi.responses import JSONResponse
# Serve the static dashboard UI under /dashboard.
# NOTE(review): "dashboard" is a relative path, so it presumably resolves
# against the process working directory — confirm the server is always
# launched from the directory that contains it.
app.mount("/dashboard", StaticFiles(directory="dashboard", html=True), name="dashboard")
import logging as _log
# Logger for the dashboard REST endpoints; _api_err logs through it.
API_LOG = _log.getLogger("news_mcp.api")
def _api_ok(data: dict) -> dict:
    """Identity pass-through for successful API payloads.

    NOTE(review): no endpoint in this file calls it; each endpoint returns
    its dict directly. Candidate for removal.
    """
    return data
  688. def _api_err(exc: Exception, ctx: str) -> JSONResponse:
  689. API_LOG.exception(f"API error in {ctx}")
  690. return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx})
  691. @app.get("/api/v1/health")
  692. def api_health():
  693. """Extended health + dashboard stats."""
  694. try:
  695. store = DashboardStore(_shared_store)
  696. return store.get_dashboard_stats()
  697. except Exception as e:
  698. return _api_err(e, "health")
  699. @app.get("/api/v1/clusters")
  700. def api_clusters(
  701. topic: str | None = None,
  702. hours: int = 24,
  703. limit: int = 50,
  704. offset: int = 0,
  705. ):
  706. """Paginated cluster listing."""
  707. try:
  708. store = DashboardStore(_shared_store)
  709. clusters = store.get_clusters_page(topic=topic, hours=hours, limit=limit, offset=offset)
  710. with store._store._conn() as conn:
  711. if topic and topic != "all":
  712. count_row = conn.execute(
  713. "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours') AND topic = ?",
  714. (-hours, topic),
  715. ).fetchone()
  716. else:
  717. count_row = conn.execute(
  718. "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours')",
  719. (-hours,),
  720. ).fetchone()
  721. total = count_row[0] if count_row else 0
  722. return {"clusters": clusters, "total": total, "topic": topic or "all", "hours": hours}
  723. except Exception as e:
  724. return _api_err(e, f"clusters(topic={topic},hours={hours})")
  725. @app.get("/api/v1/sentiment-series")
  726. def api_sentiment_series(
  727. topic: str | None = None,
  728. hours: int = 24,
  729. bucket_hours: float = 1.0,
  730. ):
  731. """Sentiment time-series for Chart.js."""
  732. try:
  733. store = DashboardStore(_shared_store)
  734. series = store.get_sentiment_series(topic=topic, hours=hours, bucket_hours=bucket_hours)
  735. return {"series": series, "topic": topic or "all"}
  736. except Exception as e:
  737. return _api_err(e, f"sentiment(topic={topic})")
  738. @app.get("/api/v1/entities")
  739. def api_entities(
  740. hours: int = 24,
  741. limit: int = 30,
  742. ):
  743. """Top entity frequencies."""
  744. try:
  745. store = DashboardStore(_shared_store)
  746. entities = store.get_entity_frequencies(hours=hours, limit=limit)
  747. return {"entities": entities, "hours": hours}
  748. except Exception as e:
  749. return _api_err(e, f"entities(hours={hours})")
  750. @app.get("/api/v1/cluster/{cluster_id}")
  751. def api_cluster_detail(cluster_id: str):
  752. """Full cluster detail for drill-down."""
  753. try:
  754. store = DashboardStore(_shared_store)
  755. detail = store.get_cluster_detail(cluster_id)
  756. if not detail:
  757. return JSONResponse(status_code=404, content={"error": "Cluster not found", "id": cluster_id})
  758. return detail
  759. except Exception as e:
  760. return _api_err(e, f"detail({cluster_id})")
  761. @app.get("/health")
  762. def health():
  763. return {
  764. "status": "ok",
  765. "uptime": round(time.monotonic() - _PROCESS_STARTED_AT, 3),
  766. }