# mcp_server_fastmcp.py
  1. from __future__ import annotations
  2. import asyncio
  3. import logging
  4. from collections import Counter
  5. from datetime import datetime, timezone
  6. from email.utils import parsedate_to_datetime
  7. from fastapi import FastAPI
  8. from mcp.server.fastmcp import FastMCP
  9. from mcp.server.transport_security import TransportSecuritySettings
  10. from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
  11. from news_mcp.config import (
  12. NEWS_PRUNE_INTERVAL_HOURS,
  13. NEWS_PRUNING_ENABLED,
  14. NEWS_REFRESH_INTERVAL_SECONDS,
  15. NEWS_BACKGROUND_REFRESH_ENABLED,
  16. NEWS_BACKGROUND_REFRESH_ON_START,
  17. NEWS_RETENTION_DAYS,
  18. )
  19. from news_mcp.jobs.poller import refresh_clusters
  20. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  21. from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
  22. from news_mcp.trends_resolution import resolve_entity_via_trends
  23. from news_mcp.llm import active_llm_config
  24. from news_mcp.entity_normalize import normalize_query
  25. from news_mcp.related_entities import related_recent_entities
# Process-wide logger configuration: timestamped, level-tagged records so the
# background refresh ticks and prune results below are traceable in logs.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
# FastMCP server instance. DNS-rebinding protection is disabled here;
# NOTE(review): assumes the server sits behind the FastAPI /mcp mount and is
# not directly internet-exposed — confirm the deployment topology.
mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)
  34. def _cluster_entity_haystack(cluster: dict) -> list[str]:
  35. """Collect the normalized entity clues attached to a cluster."""
  36. values: list[str] = []
  37. for ent in cluster.get("entities", []) or []:
  38. values.append(str(ent).strip().lower())
  39. for res in cluster.get("entityResolutions", []) or []:
  40. if not isinstance(res, dict):
  41. continue
  42. for key in ("normalized", "canonical_label", "mid"):
  43. val = res.get(key)
  44. if val:
  45. values.append(str(val).strip().lower())
  46. return [v for v in values if v]
  47. def _parse_cluster_timestamp(value) -> datetime:
  48. if not value:
  49. return datetime.min.replace(tzinfo=timezone.utc)
  50. text = str(value).strip()
  51. if not text:
  52. return datetime.min.replace(tzinfo=timezone.utc)
  53. try:
  54. dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
  55. if dt.tzinfo is None:
  56. dt = dt.replace(tzinfo=timezone.utc)
  57. return dt.astimezone(timezone.utc)
  58. except Exception:
  59. pass
  60. try:
  61. dt = parsedate_to_datetime(text)
  62. if dt.tzinfo is None:
  63. dt = dt.replace(tzinfo=timezone.utc)
  64. return dt.astimezone(timezone.utc)
  65. except Exception:
  66. return datetime.min.replace(tzinfo=timezone.utc)
  67. def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
  68. return sorted(
  69. clusters,
  70. key=lambda c: (
  71. _parse_cluster_timestamp(c.get("timestamp")),
  72. float(c.get("importance", 0.0) or 0.0),
  73. ),
  74. reverse=True,
  75. )
  76. def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict:
  77. return {
  78. "name": name,
  79. "description": description,
  80. "inputs": inputs,
  81. "outputs": outputs,
  82. "notes": notes or [],
  83. }
# Machine-readable catalog of the MCP tools exposed by this server; served
# verbatim from get_capabilities() so downstream agents can plan tool chains
# without trial-and-error calls.
NEWS_TOOL_CARDS = [
    _tool_card(
        "get_latest_events",
        "Get the newest deduplicated clusters for a topic or resolved entity-like query.",
        [
            {"name": "topic", "type": "string", "default": "crypto", "meaning": "coarse category or entity-like topic"},
            {"name": "limit", "type": "integer", "default": 5, "range": "1-20"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"],
        ["Use when you want the freshest clusters and are willing to let the server decide topic vs entity mode."],
    ),
    _tool_card(
        "get_events_for_entity",
        "Search recent clusters for a person, place, company, or theme by entity matching.",
        [
            {"name": "entity", "type": "string", "meaning": "entity label or phrase"},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-30"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"],
        ["Normalization is automatic; use this for an entity-centered deep dive."],
    ),
    _tool_card(
        "get_event_summary",
        "Produce a concise LLM-written explanation for one cluster and key facts.",
        [
            {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["headline", "mergedSummary", "keyFacts", "sources", "articles?"],
        ["Prefer this after you have already chosen a specific cluster to explain."],
    ),
    _tool_card(
        "detect_emerging_topics",
        "Surface entities and phrases starting to matter in the recent window.",
        [{"name": "limit", "type": "integer", "default": 10, "range": "1-20"}],
        ["topic", "trend_score", "related_entities", "signal_type", "count", "avg_importance"],
        ["Good for 'what is heating up?' style questions."],
    ),
    _tool_card(
        "get_news_sentiment",
        "Estimate sentiment around an entity over a lookback window.",
        [
            {"name": "entity", "type": "string"},
            {"name": "timeframe", "type": "string", "default": "24h"},
        ],
        ["entity", "sentiment", "score", "cluster_count"],
        ["Use after locating a cluster set or entity neighborhood."],
    ),
    _tool_card(
        "get_related_recent_entities",
        "Blend local co-occurrence with Google Trends related topics, while preserving mids where available.",
        [
            {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"},
            {"name": "timeframe", "type": "string", "default": "72h"},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-25"},
            {"name": "include_trends", "type": "boolean", "default": True},
        ],
        ["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"],
        ["Use this to drill from a subject into related entities, then feed those into get_events_for_entity."],
    ),
]
# Named multi-step tool recipes. Purely advisory strings (not executed here);
# returned by get_capabilities() for agents composing tool chains.
NEWS_COMPOSITION_RECIPES = [
    {
        "name": "fresh-news-tail",
        "steps": [
            "get_latest_events(topic=...)",
            "optionally get_event_summary(event_id=...) for the strongest cluster",
        ],
        "notes": ["Best for a quick tail of what is happening now."]
    },
    {
        "name": "entity-deep-dive",
        "steps": [
            "get_events_for_entity(entity=...)",
            "get_event_summary(event_id=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
        ],
        "notes": ["Prefer canonical entity labels when you have them; the server normalizes for you."],
    },
    {
        "name": "subject-neighborhood",
        "steps": [
            "get_related_recent_entities(subject=...)",
            "for each strong related entity, call get_events_for_entity(entity=...)",
        ],
        "notes": ["Use this when you want a graph-like expansion around a subject."]
    },
    {
        "name": "emerging-signal",
        "steps": [
            "detect_emerging_topics(limit=...)",
            "choose a topic/entity",
            "get_events_for_entity(entity=...)",
            "get_news_sentiment(entity=...)",
        ],
        "notes": ["Good for trend scouting and risk mapping."],
    },
]
# Free-form usage guidance surfaced to agents via get_capabilities().
NEWS_AGENT_TIPS = [
    "If you need a fast answer, start with get_latest_events, then summarize the strongest cluster with get_event_summary.",
    "If a user asks about a person/place/company/theme, use get_events_for_entity before broadening to get_related_recent_entities.",
    "Treat cluster_id as an internal cursor, not user-facing output; use it only for follow-up tool calls.",
    "When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.",
    "Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.",
    "For tricky names, rely on the server’s resolver instead of inventing alias rules in the client.",
]
# Worked task -> tool-chain examples; advisory content for get_capabilities().
NEWS_EXAMPLE_CHAINS = [
    {
        "task": "What is happening now?",
        "chain": [
            "get_latest_events(topic=...)",
            "get_event_summary(event_id=...) if one cluster looks important",
        ],
    },
    {
        "task": "Deep dive on an entity",
        "chain": [
            "get_events_for_entity(entity=..., timeframe=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
            "get_event_summary(event_id=...) for the strongest cluster",
        ],
    },
    {
        "task": "Broaden from a subject",
        "chain": [
            "get_related_recent_entities(subject=..., include_trends=true)",
            "get_events_for_entity(entity=...) for the strongest related entities",
        ],
    },
    {
        "task": "Find what is emerging",
        "chain": [
            "detect_emerging_topics(limit=...)",
            "get_events_for_entity(entity=...) on one or two emerging terms",
        ],
    },
]
  224. @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.")
  225. async def get_latest_events(topic: str = "crypto", limit: int = 5, include_articles: bool = False):
  226. limit = max(1, min(int(limit), 20))
  227. # If the caller passes an entity-like value, resolve it and use the canonical
  228. # entity as the query lens. Otherwise keep the original topic path.
  229. topic_norm = normalize_query(topic).lower()
  230. resolved = resolve_entity_via_trends(topic_norm)
  231. allowed = {t.lower() for t in DEFAULT_TOPICS}
  232. is_topic = topic_norm in allowed
  233. query_terms = {
  234. topic_norm,
  235. str(resolved.get("normalized") or "").strip().lower(),
  236. str(resolved.get("canonical_label") or "").strip().lower(),
  237. str(resolved.get("mid") or "").strip().lower(),
  238. }
  239. query_terms = {q for q in query_terms if q}
  240. store = SQLiteClusterStore(DB_PATH)
  241. if is_topic:
  242. # Cache-first: only refresh if we currently have no fresh clusters for this topic.
  243. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  244. if not clusters:
  245. await refresh_clusters(topic=topic_norm, limit=200)
  246. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
  247. else:
  248. # Entity-aware mode: search recent clusters across all topics and match by
  249. # raw entity, canonical label, or MID.
  250. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
  251. filtered = []
  252. for c in clusters:
  253. haystack = _cluster_entity_haystack(c)
  254. if any(any(term in item for item in haystack) for term in query_terms):
  255. filtered.append(c)
  256. if len(filtered) >= limit:
  257. break
  258. clusters = filtered
  259. out = []
  260. for c in _sort_clusters_by_recency(clusters):
  261. item = {
  262. "cluster_id": c.get("cluster_id"),
  263. "headline": c.get("headline"),
  264. "summary": c.get("summary"),
  265. "entities": c.get("entities", []),
  266. "sentiment": c.get("sentiment", "neutral"),
  267. "importance": c.get("importance", 0.0),
  268. "sources": c.get("sources", []),
  269. "timestamp": c.get("timestamp"),
  270. }
  271. if include_articles:
  272. # Return minimal article fields to keep responses compact.
  273. arts = c.get("articles", []) or []
  274. item["articles"] = [
  275. {
  276. "title": a.get("title"),
  277. "url": a.get("url"),
  278. "source": a.get("source"),
  279. "timestamp": a.get("timestamp"),
  280. }
  281. for a in arts
  282. if isinstance(a, dict)
  283. ]
  284. out.append(item)
  285. return out
  286. @mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.")
  287. async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
  288. limit = max(1, min(int(limit), 30))
  289. query = normalize_query(entity).strip().lower()
  290. if not query:
  291. return []
  292. resolved = resolve_entity_via_trends(query)
  293. query_terms = {
  294. query,
  295. str(resolved.get("normalized") or "").strip().lower(),
  296. str(resolved.get("canonical_label") or "").strip().lower(),
  297. str(resolved.get("mid") or "").strip().lower(),
  298. }
  299. query_terms = {q for q in query_terms if q}
  300. store = SQLiteClusterStore(DB_PATH)
  301. def _match_clusters(clusters: list[dict]) -> list[dict]:
  302. hits: list[dict] = []
  303. for c in _sort_clusters_by_recency(clusters):
  304. haystack = _cluster_entity_haystack(c)
  305. if any(any(term in item for item in haystack) for term in query_terms):
  306. hits.append(c)
  307. if len(hits) >= limit:
  308. break
  309. return hits
  310. hours = _parse_timeframe_to_hours(timeframe)
  311. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))
  312. hits = _match_clusters(clusters)
  313. out = []
  314. for c in hits:
  315. item = {
  316. "cluster_id": c.get("cluster_id"),
  317. "headline": c.get("headline"),
  318. "summary": c.get("summary"),
  319. "entities": c.get("entities", []),
  320. "sentiment": c.get("sentiment", "neutral"),
  321. "importance": c.get("importance", 0.0),
  322. "sources": c.get("sources", []),
  323. "timestamp": c.get("timestamp"),
  324. }
  325. if include_articles:
  326. arts = c.get("articles", []) or []
  327. item["articles"] = [
  328. {
  329. "title": a.get("title"),
  330. "url": a.get("url"),
  331. "source": a.get("source"),
  332. "timestamp": a.get("timestamp"),
  333. }
  334. for a in arts
  335. if isinstance(a, dict)
  336. ]
  337. out.append(item)
  338. return out
  339. @mcp.tool(description="Return entities most commonly associated with the subject in recent clusters, optionally blended with Google Trends suggestions.")
  340. async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True):
  341. limit = max(1, min(int(limit), 25))
  342. hours = _parse_timeframe_to_hours(timeframe)
  343. include_trends_bool = str(include_trends).strip().lower() not in {"false", "0", "no"}
  344. store = SQLiteClusterStore(DB_PATH)
  345. result = related_recent_entities(
  346. store=store,
  347. subject=subject,
  348. timeframe_hours=hours,
  349. limit=limit,
  350. include_trends=include_trends_bool,
  351. )
  352. return result
@mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
async def get_event_summary(event_id: str, include_articles: bool = False):
    """Return a cached or freshly generated LLM summary for one cluster.

    `event_id` is the internal cluster_id. On a cache miss the summary is
    produced via summarize_cluster_llm and written back to the store; unknown
    ids yield a structured {"error": "NOT_FOUND"} payload instead of raising.
    """
    store = SQLiteClusterStore(DB_PATH)
    # Summary cache: reuse if present within TTL.
    cached_summary = store.get_cluster_summary(
        cluster_id=event_id,
        ttl_hours=DEFAULT_LOOKBACK_HOURS,
    )
    if cached_summary:
        out = {
            "event_id": event_id,
            "headline": cached_summary.get("headline"),
            "mergedSummary": cached_summary.get("mergedSummary"),
            "keyFacts": cached_summary.get("keyFacts", []),
            "sources": cached_summary.get("sources", []),
        }
        if include_articles:
            # Articles are not stored with the cached summary, so re-read the cluster.
            cluster = store.get_cluster_by_id(event_id)
            arts = (cluster or {}).get("articles", []) or []
            out["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        return out
    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        # Unknown or expired cluster id: structured error, not an exception.
        return {
            "event_id": event_id,
            "error": "NOT_FOUND",
        }
    articles_out = None
    if include_articles:
        # Capture the compact article list before summarization.
        arts = cluster.get("articles", []) or []
        articles_out = [
            {
                "title": a.get("title"),
                "url": a.get("url"),
                "source": a.get("source"),
                "timestamp": a.get("timestamp"),
            }
            for a in arts
            if isinstance(a, dict)
        ]
    summary = await summarize_cluster_llm(cluster)
    # Persist so subsequent calls within the TTL hit the cache path above.
    store.upsert_cluster_summary(event_id, summary)
    out = {
        "event_id": event_id,
        "headline": summary.get("headline"),
        "mergedSummary": summary.get("mergedSummary"),
        "keyFacts": summary.get("keyFacts", []),
        "sources": summary.get("sources", []),
    }
    if include_articles:
        out["articles"] = articles_out or []
    return out
  414. @mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters.")
  415. async def detect_emerging_topics(limit: int = 10):
  416. limit = max(1, min(int(limit), 20))
  417. store = SQLiteClusterStore(DB_PATH)
  418. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=200)
  419. import re
  420. entity_counts = Counter()
  421. entity_importance_sum = Counter()
  422. # co-occurrence: ent -> other_ent -> count
  423. entity_cooccur = {}
  424. phrase_counts = Counter()
  425. topic_counts = Counter()
  426. # Very light heuristics to reduce “meta entities” dominating emerging topics.
  427. # Keep it conservative: only skip obvious boilerplate.
  428. def _is_generic_entity(ent: str) -> bool:
  429. e = str(ent).strip().lower()
  430. if not e:
  431. return True
  432. if len(e) < 4:
  433. return True
  434. # common outlet-ish / meta-ish tokens
  435. if e in {"news", "latest", "breaking"}:
  436. return True
  437. return False
  438. for c in clusters:
  439. topic_counts[c.get("topic", "other")] += 1
  440. ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
  441. ents_in_cluster_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
  442. for ent in ents_in_cluster_norm:
  443. if _is_generic_entity(ent):
  444. continue
  445. entity_counts[ent] += 1
  446. try:
  447. entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
  448. except Exception:
  449. pass
  450. # update co-occurrence counts
  451. for i in range(len(ents_in_cluster_norm)):
  452. a = ents_in_cluster_norm[i]
  453. if not a:
  454. continue
  455. entity_cooccur.setdefault(a, Counter())
  456. for j in range(len(ents_in_cluster_norm)):
  457. if i == j:
  458. continue
  459. b = ents_in_cluster_norm[j]
  460. if not b:
  461. continue
  462. entity_cooccur[a][b] += 1
  463. text = f"{c.get('headline','')} {c.get('summary','')}"
  464. words = [w for w in re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())]
  465. for i in range(len(words) - 1):
  466. phrase = f"{words[i]} {words[i+1]}"
  467. if len(phrase) > 6:
  468. phrase_counts[phrase] += 1
  469. emerging = []
  470. # Combine frequency with average importance so “big signal” rises over pure repetition.
  471. for ent, count in entity_counts.most_common(limit):
  472. avg_imp = entity_importance_sum[ent] / max(1, count)
  473. # avg_imp is typically 0..~1; keep score bounded.
  474. trend_score = 0.25 + 0.40 * min(1.0, avg_imp) + 0.08 * min(6.0, float(count))
  475. related = []
  476. for other, _cnt in (entity_cooccur.get(ent) or Counter()).most_common(3):
  477. # avoid returning the entity itself (shouldn't happen, but be safe)
  478. if other != ent:
  479. related.append(other)
  480. emerging.append({
  481. "topic": ent,
  482. "trend_score": min(0.99, round(trend_score, 2)),
  483. "related_entities": related if related else [ent],
  484. "signal_type": "entity",
  485. "count": count,
  486. "avg_importance": round(avg_imp, 3),
  487. })
  488. for phrase, count in phrase_counts.most_common(limit * 2):
  489. if any(item["topic"] == phrase for item in emerging):
  490. continue
  491. emerging.append({
  492. "topic": phrase.title(),
  493. "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
  494. "related_entities": [],
  495. "signal_type": "phrase",
  496. "count": count,
  497. })
  498. if len(emerging) >= limit:
  499. break
  500. return emerging[:limit]
  501. @mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.")
  502. async def get_news_sentiment(entity: str, timeframe: str = "24h"):
  503. store = SQLiteClusterStore(DB_PATH)
  504. ent = normalize_query(entity).strip().lower()
  505. resolved = resolve_entity_via_trends(ent)
  506. query_terms = {
  507. ent,
  508. str(resolved.get("normalized") or "").strip().lower(),
  509. str(resolved.get("canonical_label") or "").strip().lower(),
  510. str(resolved.get("mid") or "").strip().lower(),
  511. }
  512. query_terms = {q for q in query_terms if q}
  513. if not ent:
  514. return {
  515. "entity": entity,
  516. "sentiment": "neutral",
  517. "score": 0.0,
  518. "cluster_count": 0,
  519. }
  520. # timeframe: accept '24h' or '24'
  521. tf = str(timeframe).strip().lower()
  522. try:
  523. hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
  524. except Exception:
  525. hours = 24
  526. hours = max(1, min(int(hours), 168))
  527. clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
  528. matched = []
  529. for c in clusters:
  530. haystack = _cluster_entity_haystack(c)
  531. if any(any(term in item for item in haystack) for term in query_terms):
  532. matched.append(c)
  533. if not matched:
  534. return {
  535. "entity": entity,
  536. "sentiment": "neutral",
  537. "score": 0.0,
  538. "cluster_count": 0,
  539. }
  540. scores = []
  541. for c in matched:
  542. s = c.get("sentimentScore")
  543. if s is not None:
  544. try:
  545. scores.append(float(s))
  546. except Exception:
  547. pass
  548. avg_score = sum(scores) / len(scores) if scores else 0.0
  549. # Keep the label aligned with the numeric score.
  550. # Small magnitudes are treated as neutral to avoid noisy label flips.
  551. if avg_score >= 0.15:
  552. sentiment = "positive"
  553. elif avg_score <= -0.15:
  554. sentiment = "negative"
  555. else:
  556. sentiment = "neutral"
  557. return {
  558. "entity": entity,
  559. "sentiment": sentiment,
  560. "score": round(avg_score, 3),
  561. "cluster_count": len(matched),
  562. }
@mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.")
async def get_capabilities():
    """Return the static self-description bundle (tool cards, recipes, tips)."""
    return {
        "server": {
            "name": "news-mcp",
            "purpose": "Recent news clusters, entity drill-down, sentiment, emerging topics, and related-entity expansion.",
            "output_conventions": {
                "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.",
                "sources": "Always preserve and display sources when summarizing a cluster or entity result.",
                "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.",
            },
        },
        # Module-level advisory constants defined above.
        "tools": NEWS_TOOL_CARDS,
        "recipes": NEWS_COMPOSITION_RECIPES,
        "example_chains": NEWS_EXAMPLE_CHAINS,
        "agent_tips": NEWS_AGENT_TIPS,
        "guidance": [
            "Use get_latest_events for a tail, get_events_for_entity for entity deep dives, and get_related_recent_entities for neighborhood expansion.",
            "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.",
            "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.",
        ],
    }
  585. def _parse_timeframe_to_hours(timeframe: str) -> int:
  586. tf = str(timeframe).strip().lower()
  587. try:
  588. if tf.endswith("d"):
  589. days = int(tf[:-1])
  590. return max(1, days * 24)
  591. if tf.endswith("h"):
  592. return max(1, int(tf[:-1]))
  593. return max(1, int(tf))
  594. except Exception:
  595. return 24
# FastAPI shell hosting the MCP server: the SSE transport is mounted at /mcp,
# while the plain HTTP endpoints below expose liveness and configuration info.
app = FastAPI(title="News MCP Server")
logger = logging.getLogger("news_mcp.startup")
app.mount("/mcp", mcp.sse_app())
# Guard so the startup hook schedules the background refresh loop at most once.
_background_task_started = False
  600. @app.on_event("startup")
  601. async def _start_background_refresh():
  602. global _background_task_started
  603. if _background_task_started:
  604. return
  605. if not NEWS_BACKGROUND_REFRESH_ENABLED:
  606. return
  607. _background_task_started = True
  608. logger.info("news-mcp llm config: %s", active_llm_config())
  609. store = SQLiteClusterStore(DB_PATH)
  610. prune_result = store.prune_if_due(
  611. pruning_enabled=NEWS_PRUNING_ENABLED,
  612. retention_days=NEWS_RETENTION_DAYS,
  613. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  614. )
  615. logger.info("startup prune_result=%s", prune_result)
  616. async def _loop():
  617. if not NEWS_BACKGROUND_REFRESH_ON_START:
  618. logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS)
  619. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  620. while True:
  621. try:
  622. logger.info("background refresh tick start")
  623. # Refresh all topics by passing topic=None
  624. await refresh_clusters(topic=None, limit=200)
  625. logger.info("background refresh tick complete")
  626. except Exception:
  627. # Keep the server alive, but do not hide the failure.
  628. logger.exception("background refresh tick failed")
  629. await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
  630. asyncio.create_task(_loop())
@app.get("/")
def root():
    """Landing endpoint: advertise the MCP mount, tool list, and live config."""
    return {
        "status": "ok",
        "transport": "fastmcp+sse",
        "mount": "/mcp",
        # Names only; full tool cards come from the get_capabilities MCP tool.
        "tools": [
            "get_latest_events",
            "get_events_for_entity",
            "get_event_summary",
            "detect_emerging_topics",
            "get_news_sentiment",
            "get_related_recent_entities",
            "get_capabilities",
        ],
        "refresh": {
            "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
            "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
        },
        "retention": {
            "lookback_hours": DEFAULT_LOOKBACK_HOURS,
            "retention_days": NEWS_RETENTION_DAYS,
        },
        "pruning": {
            "enabled": NEWS_PRUNING_ENABLED,
            "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
        },
    }
@app.get("/health")
def health():
    """Liveness probe: DB path, last-refresh marker, feed state, prune state."""
    store = SQLiteClusterStore(DB_PATH)
    return {
        "status": "ok",
        "lookback_hours": DEFAULT_LOOKBACK_HOURS,
        "db": str(DB_PATH),
        "last_refresh_at": store.get_meta("last_refresh_at"),
        # Per-feed refresh state for the primary feed.
        "refresh": store.get_feed_state("breakingthenews"),
        "pruning": store.get_prune_state(
            pruning_enabled=NEWS_PRUNING_ENABLED,
            retention_days=NEWS_RETENTION_DAYS,
            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
        ),
    }