# server.py — Hermes MCP service: FastAPI application, MCP tool surface, and background polling loop.
from __future__ import annotations

import asyncio
import json
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from uuid import uuid4

import anyio
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings

from .config import load_config
from .crypto_client import get_price, get_regime
from .decision_engine import assess_wallet_state, make_decision
from .narrative_engine import build_narrative
from .state_engine import synthesize_state
from .store import get_state, init_db, list_concerns, latest_cycle, latest_cycles, latest_decisions, latest_narratives, latest_regime_samples, prune_older_than, recent_regime_samples, sync_concerns_from_strategies, upsert_cycle, upsert_decision, upsert_narrative, upsert_regime_sample, upsert_state, latest_states
from .trader_client import apply_control_decision as trader_apply_control_decision, get_strategy as trader_get_strategy, list_strategies
  21. mcp = FastMCP(
  22. "hermes-mcp",
  23. transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
  24. )
  25. def _build_trader_control_payload(*, decision_id: str, concern: dict, decision: object) -> dict | None:
  26. action = str(getattr(decision, "action", "") or "").strip()
  27. target_strategy = str(getattr(decision, "target_strategy", "") or "").strip() or None
  28. decision_payload = getattr(decision, "payload", {}) if isinstance(getattr(decision, "payload", {}), dict) else {}
  29. current_primary = str(decision_payload.get("current_primary_strategy") or "").strip() or None
  30. trader_action: str | None = None
  31. risk_mode: str | None = None
  32. if action.startswith("replace_with_") or action.startswith("enable_"):
  33. trader_action = "switch"
  34. elif action == "suspend_grid":
  35. trader_action = "pause"
  36. target_strategy = current_primary
  37. elif action == "set_risk_mode":
  38. trader_action = "set_risk_mode"
  39. risk_mode = str(decision_payload.get("risk_mode") or "").strip() or None
  40. else:
  41. return None
  42. account_id = str(concern.get("account_id") or "").strip()
  43. market_symbol = str(concern.get("market_symbol") or "").strip().lower()
  44. concern_id = str(concern.get("id") or "").strip() or None
  45. reason = str(getattr(decision, "reason_summary", "") or "").strip()
  46. confidence = float(getattr(decision, "confidence", 0.0) or 0.0)
  47. payload = {
  48. "decision_id": decision_id,
  49. "concern_id": concern_id,
  50. "account_id": account_id,
  51. "market_symbol": market_symbol,
  52. "action": trader_action,
  53. "target_strategy_id": target_strategy,
  54. "expected_active_strategy_id": current_primary,
  55. "risk_mode": risk_mode,
  56. "reason": reason,
  57. "confidence": confidence,
  58. "dry_run": False,
  59. "override": False,
  60. "source": "hermes-mcp",
  61. "source_action": action,
  62. }
  63. return payload
  64. async def _maybe_dispatch_trader_action(*, cfg: object, decision_id: str, concern: dict, decision: object) -> dict:
  65. if not bool(getattr(decision, "requires_action", False)):
  66. return {"dispatch": "not_required"}
  67. payload = _build_trader_control_payload(decision_id=decision_id, concern=concern, decision=decision)
  68. if payload is None:
  69. return {
  70. "dispatch": "skipped",
  71. "reason": f"no trader action mapping for {getattr(decision, 'action', 'unknown')}",
  72. }
  73. if not bool(getattr(cfg, "hermes_allow_actions", False)):
  74. return {
  75. "dispatch": "blocked",
  76. "reason": "HERMES_ALLOW_ACTIONS is false",
  77. "payload": payload,
  78. }
  79. try:
  80. result = await trader_apply_control_decision(getattr(cfg, "trader_url"), payload)
  81. return {
  82. "dispatch": "sent",
  83. "payload": payload,
  84. "result": result,
  85. }
  86. except Exception as exc:
  87. return {
  88. "dispatch": "failed",
  89. "payload": payload,
  90. "error": str(exc),
  91. }
  92. @mcp.tool(description="Return Hermes current state, narrative, uncertainty, and a short self-assessment report.")
  93. def report() -> dict:
  94. state = get_state()
  95. return {
  96. "status": state.get("status", "stub"),
  97. "thinking": state.get("thinking", "Hermes scaffold is ready."),
  98. "confidence": state.get("confidence", 0.0),
  99. "uncertainty": state.get("uncertainty", ["no live adapters wired yet"]),
  100. "layers": state.get("layers", []),
  101. }
  102. @asynccontextmanager
  103. async def lifespan(_: FastAPI):
  104. cfg = load_config()
  105. init_db()
  106. try:
  107. sync_concerns_from_strategies(await list_strategies(cfg.trader_url))
  108. except Exception:
  109. pass
  110. try:
  111. prune_older_than(cfg.retention_days)
  112. except Exception:
  113. pass
  114. async def _poll_loop() -> None:
  115. while True:
  116. started = datetime.now(timezone.utc).isoformat()
  117. cycle_id = str(uuid4())
  118. concerns = list_concerns()
  119. try:
  120. strategy_inventory = await list_strategies(cfg.trader_url)
  121. enriched_inventory = []
  122. for strategy in strategy_inventory:
  123. instance_id = str(strategy.get("id") or "").strip()
  124. if not instance_id:
  125. enriched_inventory.append(strategy)
  126. continue
  127. try:
  128. detail = await trader_get_strategy(cfg.trader_url, instance_id, include_state=True, include_report=True)
  129. enriched_inventory.append({**strategy, **detail})
  130. except Exception:
  131. enriched_inventory.append(strategy)
  132. strategy_inventory = enriched_inventory
  133. except Exception:
  134. strategy_inventory = []
  135. upsert_cycle(id=cycle_id, started_at=started, finished_at=None, status="running", trigger="interval", notes=f"polling {len(concerns)} concerns")
  136. for concern in concerns:
  137. symbol = _resolve_regime_symbol(concern)
  138. if not symbol:
  139. continue
  140. account_id = str(concern.get("account_id") or "").strip()
  141. account_info = {}
  142. if account_id:
  143. try:
  144. payload = await _call_exec_tool(cfg.exec_url, "get_account_info", {"account_id": account_id})
  145. account_info = payload if isinstance(payload, dict) else {}
  146. except Exception:
  147. account_info = {}
  148. current_regimes: list[dict] = []
  149. for timeframe in cfg.crypto_timeframes:
  150. regime = await get_regime(cfg.crypto_url, str(symbol), timeframe)
  151. current_regimes.append({**regime, "timeframe": timeframe})
  152. upsert_regime_sample(
  153. id=f"{cycle_id}:{concern['id']}:{timeframe}",
  154. cycle_id=cycle_id,
  155. concern_id=str(concern["id"]),
  156. timeframe=timeframe,
  157. regime_json=json.dumps(regime, ensure_ascii=False),
  158. captured_at=datetime.now(timezone.utc).isoformat(),
  159. )
  160. try:
  161. state = synthesize_state(concern=concern, regimes=current_regimes, account_info=account_info)
  162. upsert_state(
  163. id=f"{cycle_id}:{concern['id']}",
  164. cycle_id=cycle_id,
  165. concern_id=str(concern["id"]),
  166. market_regime=state.market_regime,
  167. volatility_state=state.volatility_state,
  168. liquidity_state=state.liquidity_state,
  169. sentiment_pressure=state.sentiment_pressure,
  170. event_risk=state.event_risk,
  171. execution_quality=state.execution_quality,
  172. confidence=state.confidence,
  173. payload_json=json.dumps(state.payload, ensure_ascii=False),
  174. created_at=state.payload.get("generated_at"),
  175. )
  176. narrative = build_narrative(concern=concern, state_payload=state.payload)
  177. upsert_narrative(
  178. id=f"{cycle_id}:{concern['id']}",
  179. cycle_id=cycle_id,
  180. concern_id=str(concern["id"]),
  181. summary=narrative.summary,
  182. key_drivers_json=json.dumps(narrative.key_drivers, ensure_ascii=False),
  183. risk_flags_json=json.dumps(narrative.risk_flags, ensure_ascii=False),
  184. uncertainties_json=json.dumps(narrative.uncertainties, ensure_ascii=False),
  185. confidence=narrative.confidence,
  186. created_at=narrative.payload.get("generated_at"),
  187. )
  188. latest_price = None
  189. if current_regimes:
  190. latest_price = next((r.get("price") for r in reversed(current_regimes) if r.get("price") is not None), None)
  191. wallet_state = assess_wallet_state(
  192. account_info=account_info,
  193. concern=concern,
  194. price=float(latest_price) if latest_price is not None else None,
  195. strategies=strategy_inventory,
  196. )
  197. decision = make_decision(
  198. concern=concern,
  199. narrative_payload={
  200. **state.payload,
  201. **narrative.payload,
  202. "confidence": narrative.confidence,
  203. },
  204. wallet_state=wallet_state,
  205. strategies=strategy_inventory,
  206. )
  207. decision_id = f"{cycle_id}:{concern['id']}"
  208. dispatch_record = await _maybe_dispatch_trader_action(
  209. cfg=cfg,
  210. decision_id=decision_id,
  211. concern=concern,
  212. decision=decision,
  213. )
  214. decision_payload = {
  215. **decision.payload,
  216. "dispatch": dispatch_record,
  217. }
  218. upsert_decision(
  219. id=decision_id,
  220. cycle_id=cycle_id,
  221. concern_id=str(concern["id"]),
  222. mode=decision.mode,
  223. action=decision.action,
  224. target_strategy=decision.target_strategy,
  225. target_policy_json=json.dumps(decision_payload, ensure_ascii=False),
  226. reason_summary=decision.reason_summary,
  227. confidence=decision.confidence,
  228. requires_action=decision.requires_action,
  229. created_at=decision.payload.get("generated_at"),
  230. )
  231. except Exception:
  232. pass
  233. upsert_cycle(id=cycle_id, started_at=started, finished_at=datetime.now(timezone.utc).isoformat(), status="ok", trigger="interval", notes=f"polled {len(concerns)} concerns over {','.join(cfg.crypto_timeframes)}")
  234. await asyncio.sleep(max(10, cfg.cycle_seconds))
  235. asyncio.create_task(_poll_loop())
  236. yield
  237. app = FastAPI(title="Hermes MCP", lifespan=lifespan)
  238. app.mount("/mcp", mcp.sse_app())
  239. @app.get("/")
  240. def root() -> dict:
  241. return {"status": "ok", "mount": "/mcp/sse", "dashboard": "/dashboard"}
  242. @app.get("/health")
  243. def health() -> dict:
  244. return {"status": "ok", "db": "sqlite", "tool": "report"}
  245. def _strip_sse(url: str) -> str:
  246. root = url.rstrip("/")
  247. return root[:-8] if root.endswith("/mcp/sse") else root
  248. async def _call_exec_tool(exec_url: str, tool: str, arguments: dict) -> object:
  249. async with sse_client(exec_url) as (read_stream, write_stream):
  250. async with ClientSession(read_stream, write_stream) as session:
  251. await session.initialize()
  252. result = await session.call_tool(tool, arguments)
  253. content = getattr(result, "content", None) or []
  254. if not content:
  255. return None
  256. first = content[0]
  257. text = getattr(first, "text", None) if not isinstance(first, dict) else first.get("text")
  258. if text is None:
  259. return None
  260. try:
  261. return json.loads(text)
  262. except Exception:
  263. return text
  264. async def _load_exec_enrichment(exec_url: str, crypto_url: str, concerns: list[dict]) -> tuple[dict[str, dict], dict[str, dict], dict[str, float | None]]:
  265. account_ids = sorted({str(c.get("account_id") or "").strip() for c in concerns if str(c.get("account_id") or "").strip()})
  266. market_symbols = sorted({str(c.get("market_symbol") or "").strip().lower() for c in concerns if str(c.get("market_symbol") or "").strip()})
  267. account_payloads = await asyncio.gather(*[_call_exec_tool(exec_url, "get_account_info", {"account_id": account_id}) for account_id in account_ids])
  268. market_payload = await _call_exec_tool(exec_url, "list_markets", {})
  269. accounts_by_id = {account_id: payload for account_id, payload in zip(account_ids, account_payloads) if isinstance(payload, dict)}
  270. total_values: dict[str, float | None] = {}
  271. for account_id, payload in accounts_by_id.items():
  272. total_values[account_id] = await _live_total_value(crypto_url, payload)
  273. markets_by_symbol: dict[str, dict] = {}
  274. if isinstance(market_payload, list):
  275. for market in market_payload:
  276. if not isinstance(market, dict):
  277. continue
  278. symbol = str(market.get("symbol") or market.get("market_symbol") or "").strip().lower()
  279. if symbol in market_symbols or symbol:
  280. markets_by_symbol[symbol] = market
  281. return accounts_by_id, markets_by_symbol, total_values
  282. async def _live_total_value(crypto_url: str, account_info: dict) -> float | None:
  283. balances = account_info.get("balances")
  284. if not isinstance(balances, list):
  285. return None
  286. total = 0.0
  287. seen = False
  288. for item in balances:
  289. if not isinstance(item, dict):
  290. continue
  291. asset = str(item.get("asset_code") or item.get("asset") or "").strip().lower()
  292. amount = item.get("total")
  293. if not asset or amount is None:
  294. continue
  295. try:
  296. amount_f = float(amount)
  297. except Exception:
  298. continue
  299. seen = True
  300. if asset == "usd":
  301. total += amount_f
  302. continue
  303. price_payload = await get_price(crypto_url, asset)
  304. try:
  305. price = float(price_payload.get("price"))
  306. except Exception:
  307. price = 0.0
  308. total += amount_f * price
  309. return total if seen else None
  310. def _compact_balances(payload: object) -> str:
  311. if not isinstance(payload, list):
  312. return "-"
  313. parts: list[str] = []
  314. for item in payload:
  315. if not isinstance(item, dict):
  316. continue
  317. asset = str(item.get("asset_code") or item.get("asset") or "").upper().strip()
  318. total = item.get("total")
  319. available = item.get("available")
  320. value_usd = item.get("value_usd")
  321. if not asset:
  322. continue
  323. segment = f"{asset} {float(total):.8g}" if total is not None else asset
  324. if available is not None and total is not None and available != total:
  325. segment += f" (avail {float(available):.8g})"
  326. if isinstance(value_usd, (int, float)):
  327. segment += f" ≈ ${float(value_usd):,.2f}"
  328. parts.append(segment)
  329. return " | ".join(parts[:5]) or "-"
  330. def _resolve_regime_symbol(concern: dict) -> str | None:
  331. base = str(concern.get("base_currency") or "").strip().upper()
  332. if base:
  333. return base
  334. market = str(concern.get("market_symbol") or "").strip().upper().replace("/", "").replace("-", "")
  335. for suffix in ("USDT", "USDC", "USD", "EUR", "BTC", "ETH"):
  336. if market.endswith(suffix) and len(market) > len(suffix):
  337. return market[:-len(suffix)]
  338. return market or None
  339. @app.get("/dashboard/data")
  340. def dashboard_data() -> JSONResponse:
  341. cfg = load_config()
  342. concerns = list_concerns()
  343. accounts_by_id: dict[str, dict] = {}
  344. markets_by_symbol: dict[str, dict] = {}
  345. try:
  346. accounts_by_id, markets_by_symbol, total_values = anyio.run(_load_exec_enrichment, cfg.exec_url, cfg.crypto_url, concerns)
  347. except Exception:
  348. total_values = {}
  349. pass
  350. enriched = []
  351. concern_lookup: dict[str, dict] = {}
  352. for concern in concerns:
  353. account_id = str(concern.get("account_id") or "").strip()
  354. market_symbol = str(concern.get("market_symbol") or "").strip().lower()
  355. account_info = accounts_by_id.get(account_id, {})
  356. market_info = markets_by_symbol.get(market_symbol, {})
  357. enriched.append({
  358. **concern,
  359. "account_display": account_info.get("display_name") or account_id,
  360. "balances": account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or [],
  361. "balance_summary": _compact_balances(account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or []),
  362. "total_value_usd": total_values.get(account_id) if total_values.get(account_id) is not None else account_info.get("total_value_usd"),
  363. "market_display": market_info.get("name") or concern.get("market_symbol") or "",
  364. "market_description": market_info.get("description") or "",
  365. })
  366. concern_lookup[str(concern.get("id") or "")] = enriched[-1]
  367. regimes = []
  368. histories_by_key: dict[str, list[dict]] = {}
  369. for sample in recent_regime_samples(1000):
  370. concern_id = str(sample.get("concern_id") or "")
  371. timeframe = str(sample.get("timeframe") or "")
  372. key = f"{concern_id}::{timeframe}"
  373. bucket = histories_by_key.setdefault(key, [])
  374. if len(bucket) < 24:
  375. bucket.append(sample)
  376. for sample in latest_regime_samples(20):
  377. concern_meta = concern_lookup.get(str(sample.get("concern_id") or ""), {})
  378. regimes.append({**sample, **{
  379. "account_display": concern_meta.get("account_display"),
  380. "market_display": concern_meta.get("market_display"),
  381. "market_symbol": concern_meta.get("market_symbol"),
  382. }})
  383. return JSONResponse({
  384. "latest_cycle": latest_cycle(),
  385. "cycles": latest_cycles(10),
  386. "concerns": enriched,
  387. "regime_samples": regimes,
  388. "regime_histories": histories_by_key,
  389. "state_samples": latest_states(20),
  390. "narrative_samples": latest_narratives(20),
  391. "decision_samples": latest_decisions(20),
  392. })