# server.py
from __future__ import annotations

import asyncio
import json
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from uuid import uuid4

import anyio
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings

from .argus_client import get_regime as argus_get_regime, get_snapshot as argus_get_snapshot
from .config import load_config
from .crypto_client import get_price, get_regime
from .decision_engine import assess_wallet_state, make_decision
from .narrative_engine import build_narrative
from .state_engine import synthesize_state
from .store import delete_concern, get_state, init_db, list_concerns, latest_cycle, latest_cycles, latest_decisions, latest_narratives, latest_observations, latest_regime_samples, prune_older_than, recent_regime_samples, recent_states_for_concern, sync_concerns_from_strategies, upsert_cycle, upsert_decision, upsert_narrative, upsert_observation, upsert_regime_sample, upsert_state, latest_states
from .trader_client import apply_control_decision as trader_apply_control_decision, get_strategy as trader_get_strategy, list_strategies
  22. mcp = FastMCP(
  23. "hermes-mcp",
  24. transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
  25. )
  26. def _build_trader_control_payload(*, decision_id: str, concern: dict, decision: object) -> dict | None:
  27. action = str(getattr(decision, "action", "") or "").strip()
  28. target_strategy = str(getattr(decision, "target_strategy", "") or "").strip() or None
  29. decision_payload = getattr(decision, "payload", {}) if isinstance(getattr(decision, "payload", {}), dict) else {}
  30. current_primary = str(decision_payload.get("current_primary_strategy") or "").strip() or None
  31. trader_action: str | None = None
  32. risk_mode: str | None = None
  33. if action.startswith("replace_with_") or action.startswith("enable_"):
  34. trader_action = "switch"
  35. elif action == "suspend_grid":
  36. trader_action = "pause"
  37. target_strategy = current_primary
  38. elif action == "set_risk_mode":
  39. trader_action = "set_risk_mode"
  40. risk_mode = str(decision_payload.get("risk_mode") or "").strip() or None
  41. else:
  42. return None
  43. account_id = str(concern.get("account_id") or "").strip()
  44. market_symbol = str(concern.get("market_symbol") or "").strip().lower()
  45. concern_id = str(concern.get("id") or "").strip() or None
  46. reason = str(getattr(decision, "reason_summary", "") or "").strip()
  47. confidence = float(getattr(decision, "confidence", 0.0) or 0.0)
  48. payload = {
  49. "decision_id": decision_id,
  50. "concern_id": concern_id,
  51. "account_id": account_id,
  52. "market_symbol": market_symbol,
  53. "action": trader_action,
  54. "target_strategy_id": target_strategy,
  55. "expected_active_strategy_id": current_primary,
  56. "risk_mode": risk_mode,
  57. "reason": reason,
  58. "confidence": confidence,
  59. "dry_run": False,
  60. "override": False,
  61. "source": "hermes-mcp",
  62. "source_action": action,
  63. }
  64. return payload
  65. async def _maybe_dispatch_trader_action(*, cfg: object, decision_id: str, concern: dict, decision: object) -> dict:
  66. if not bool(getattr(decision, "requires_action", False)):
  67. return {"dispatch": "not_required"}
  68. payload = _build_trader_control_payload(decision_id=decision_id, concern=concern, decision=decision)
  69. if payload is None:
  70. return {
  71. "dispatch": "skipped",
  72. "reason": f"no trader action mapping for {getattr(decision, 'action', 'unknown')}",
  73. }
  74. if not bool(getattr(cfg, "hermes_allow_actions", False)):
  75. return {
  76. "dispatch": "blocked",
  77. "reason": "HERMES_ALLOW_ACTIONS is false",
  78. "payload": payload,
  79. }
  80. try:
  81. result = await trader_apply_control_decision(getattr(cfg, "trader_url"), payload)
  82. return {
  83. "dispatch": "sent",
  84. "payload": payload,
  85. "result": result,
  86. }
  87. except Exception as exc:
  88. return {
  89. "dispatch": "failed",
  90. "payload": payload,
  91. "error": str(exc),
  92. }
  93. @mcp.tool(description="Return Hermes current state, narrative, uncertainty, and a short self-assessment report.")
  94. def report() -> dict:
  95. state = get_state()
  96. return {
  97. "status": state.get("status", "stub"),
  98. "thinking": state.get("thinking", "Hermes scaffold is ready."),
  99. "confidence": state.get("confidence", 0.0),
  100. "uncertainty": state.get("uncertainty", ["no live adapters wired yet"]),
  101. "layers": state.get("layers", []),
  102. }
  103. @asynccontextmanager
  104. async def lifespan(_: FastAPI):
  105. cfg = load_config()
  106. init_db()
  107. try:
  108. sync_concerns_from_strategies(await list_strategies(cfg.trader_url))
  109. except Exception:
  110. pass
  111. try:
  112. prune_older_than(cfg.retention_days)
  113. except Exception:
  114. pass
  115. async def _poll_loop() -> None:
  116. while True:
  117. started = datetime.now(timezone.utc).isoformat()
  118. cycle_id = str(uuid4())
  119. concerns = list_concerns()
  120. try:
  121. strategy_inventory = await list_strategies(cfg.trader_url)
  122. enriched_inventory = []
  123. for strategy in strategy_inventory:
  124. instance_id = str(strategy.get("id") or "").strip()
  125. if not instance_id:
  126. enriched_inventory.append(strategy)
  127. continue
  128. try:
  129. detail = await trader_get_strategy(cfg.trader_url, instance_id, include_state=True, include_report=True)
  130. enriched_inventory.append({**strategy, **detail})
  131. except Exception:
  132. enriched_inventory.append(strategy)
  133. strategy_inventory = enriched_inventory
  134. except Exception:
  135. strategy_inventory = []
  136. upsert_cycle(id=cycle_id, started_at=started, finished_at=None, status="running", trigger="interval", notes=f"polling {len(concerns)} concerns")
  137. argus_snapshot: dict = {}
  138. argus_regime: dict = {}
  139. try:
  140. argus_snapshot = await argus_get_snapshot(cfg.argus_url)
  141. except Exception:
  142. argus_snapshot = {}
  143. try:
  144. argus_regime = await argus_get_regime(cfg.argus_url)
  145. except Exception:
  146. argus_regime = {}
  147. if argus_snapshot or argus_regime:
  148. upsert_observation(
  149. id=f"{cycle_id}:argus",
  150. cycle_id=cycle_id,
  151. concern_id=None,
  152. source="argus-mcp",
  153. kind="macro_snapshot",
  154. payload_json=json.dumps({"snapshot": argus_snapshot, "regime": argus_regime}, ensure_ascii=False),
  155. observed_at=datetime.now(timezone.utc).isoformat(),
  156. )
  157. for concern in concerns:
  158. symbol = _resolve_regime_symbol(concern)
  159. if not symbol:
  160. continue
  161. account_id = str(concern.get("account_id") or "").strip()
  162. account_info = {}
  163. if account_id:
  164. try:
  165. payload = await _call_exec_tool(cfg.exec_url, "get_account_info", {"account_id": account_id})
  166. account_info = payload if isinstance(payload, dict) else {}
  167. except Exception:
  168. account_info = {}
  169. current_regimes: list[dict] = []
  170. for timeframe in cfg.crypto_timeframes:
  171. regime = await get_regime(cfg.crypto_url, str(symbol), timeframe)
  172. current_regimes.append({**regime, "timeframe": timeframe})
  173. upsert_regime_sample(
  174. id=f"{cycle_id}:{concern['id']}:{timeframe}",
  175. cycle_id=cycle_id,
  176. concern_id=str(concern["id"]),
  177. timeframe=timeframe,
  178. regime_json=json.dumps(regime, ensure_ascii=False),
  179. captured_at=datetime.now(timezone.utc).isoformat(),
  180. )
  181. try:
  182. state = synthesize_state(
  183. concern=concern,
  184. regimes=current_regimes,
  185. account_info=account_info,
  186. argus_snapshot=argus_snapshot,
  187. argus_regime=argus_regime,
  188. )
  189. upsert_state(
  190. id=f"{cycle_id}:{concern['id']}",
  191. cycle_id=cycle_id,
  192. concern_id=str(concern["id"]),
  193. market_regime=state.market_regime,
  194. volatility_state=state.volatility_state,
  195. liquidity_state=state.liquidity_state,
  196. sentiment_pressure=state.sentiment_pressure,
  197. event_risk=state.event_risk,
  198. execution_quality=state.execution_quality,
  199. confidence=state.confidence,
  200. payload_json=json.dumps(state.payload, ensure_ascii=False),
  201. created_at=state.payload.get("generated_at"),
  202. )
  203. narrative = build_narrative(concern=concern, state_payload=state.payload)
  204. upsert_narrative(
  205. id=f"{cycle_id}:{concern['id']}",
  206. cycle_id=cycle_id,
  207. concern_id=str(concern["id"]),
  208. summary=narrative.summary,
  209. key_drivers_json=json.dumps(narrative.key_drivers, ensure_ascii=False),
  210. risk_flags_json=json.dumps(narrative.risk_flags, ensure_ascii=False),
  211. uncertainties_json=json.dumps(narrative.uncertainties, ensure_ascii=False),
  212. confidence=narrative.confidence,
  213. created_at=narrative.payload.get("generated_at"),
  214. )
  215. latest_price = None
  216. if current_regimes:
  217. latest_price = next((r.get("price") for r in reversed(current_regimes) if r.get("price") is not None), None)
  218. wallet_state = assess_wallet_state(
  219. account_info=account_info,
  220. concern=concern,
  221. price=float(latest_price) if latest_price is not None else None,
  222. strategies=strategy_inventory,
  223. )
  224. breakout_window_seconds = max(300, int(getattr(cfg, "breakout_memory_window_seconds", 900) or 900))
  225. recent_state_rows = recent_states_for_concern(concern_id=str(concern["id"]), since_seconds=breakout_window_seconds, limit=12)
  226. decision = make_decision(
  227. concern=concern,
  228. narrative_payload={
  229. **state.payload,
  230. **narrative.payload,
  231. "confidence": narrative.confidence,
  232. },
  233. wallet_state=wallet_state,
  234. strategies=strategy_inventory,
  235. history_window={
  236. "window_seconds": breakout_window_seconds,
  237. "recent_states": recent_state_rows,
  238. },
  239. )
  240. decision_id = f"{cycle_id}:{concern['id']}"
  241. dispatch_record = await _maybe_dispatch_trader_action(
  242. cfg=cfg,
  243. decision_id=decision_id,
  244. concern=concern,
  245. decision=decision,
  246. )
  247. decision_payload = {
  248. **decision.payload,
  249. "dispatch": dispatch_record,
  250. }
  251. upsert_decision(
  252. id=decision_id,
  253. cycle_id=cycle_id,
  254. concern_id=str(concern["id"]),
  255. mode=decision.mode,
  256. action=decision.action,
  257. target_strategy=decision.target_strategy,
  258. target_policy_json=json.dumps(decision_payload, ensure_ascii=False),
  259. reason_summary=decision.reason_summary,
  260. confidence=decision.confidence,
  261. requires_action=decision.requires_action,
  262. created_at=decision.payload.get("generated_at"),
  263. )
  264. except Exception:
  265. pass
  266. upsert_cycle(id=cycle_id, started_at=started, finished_at=datetime.now(timezone.utc).isoformat(), status="ok", trigger="interval", notes=f"polled {len(concerns)} concerns over {','.join(cfg.crypto_timeframes)}")
  267. await asyncio.sleep(max(10, cfg.cycle_seconds))
  268. asyncio.create_task(_poll_loop())
  269. yield
  270. app = FastAPI(title="Hermes MCP", lifespan=lifespan)
  271. app.mount("/mcp", mcp.sse_app())
  272. @app.get("/")
  273. def root() -> dict:
  274. return {"status": "ok", "mount": "/mcp/sse", "dashboard": "/dashboard"}
  275. @app.get("/health")
  276. def health() -> dict:
  277. return {"status": "ok", "db": "sqlite", "tool": "report"}
  278. @app.delete("/concerns/{concern_id}")
  279. def remove_concern(concern_id: str) -> JSONResponse:
  280. deleted = delete_concern(concern_id=concern_id)
  281. if not deleted.get("concerns"):
  282. return JSONResponse({"ok": False, "error": "concern not found", "deleted": deleted}, status_code=404)
  283. return JSONResponse({"ok": True, "deleted": deleted})
  284. def _strip_sse(url: str) -> str:
  285. root = url.rstrip("/")
  286. return root[:-8] if root.endswith("/mcp/sse") else root
  287. async def _call_exec_tool(exec_url: str, tool: str, arguments: dict) -> object:
  288. async with sse_client(exec_url) as (read_stream, write_stream):
  289. async with ClientSession(read_stream, write_stream) as session:
  290. await session.initialize()
  291. result = await session.call_tool(tool, arguments)
  292. content = getattr(result, "content", None) or []
  293. if not content:
  294. return None
  295. first = content[0]
  296. text = getattr(first, "text", None) if not isinstance(first, dict) else first.get("text")
  297. if text is None:
  298. return None
  299. try:
  300. return json.loads(text)
  301. except Exception:
  302. return text
  303. async def _load_exec_enrichment(exec_url: str, crypto_url: str, concerns: list[dict]) -> tuple[dict[str, dict], dict[str, dict], dict[str, float | None]]:
  304. account_ids = sorted({str(c.get("account_id") or "").strip() for c in concerns if str(c.get("account_id") or "").strip()})
  305. market_symbols = sorted({str(c.get("market_symbol") or "").strip().lower() for c in concerns if str(c.get("market_symbol") or "").strip()})
  306. account_payloads = await asyncio.gather(*[_call_exec_tool(exec_url, "get_account_info", {"account_id": account_id}) for account_id in account_ids])
  307. market_payload = await _call_exec_tool(exec_url, "list_markets", {})
  308. accounts_by_id = {account_id: payload for account_id, payload in zip(account_ids, account_payloads) if isinstance(payload, dict)}
  309. total_values: dict[str, float | None] = {}
  310. for account_id, payload in accounts_by_id.items():
  311. total_values[account_id] = await _live_total_value(crypto_url, payload)
  312. markets_by_symbol: dict[str, dict] = {}
  313. if isinstance(market_payload, list):
  314. for market in market_payload:
  315. if not isinstance(market, dict):
  316. continue
  317. symbol = str(market.get("symbol") or market.get("market_symbol") or "").strip().lower()
  318. if symbol in market_symbols or symbol:
  319. markets_by_symbol[symbol] = market
  320. return accounts_by_id, markets_by_symbol, total_values
  321. async def _live_total_value(crypto_url: str, account_info: dict) -> float | None:
  322. balances = account_info.get("balances")
  323. if not isinstance(balances, list):
  324. return None
  325. total = 0.0
  326. seen = False
  327. for item in balances:
  328. if not isinstance(item, dict):
  329. continue
  330. asset = str(item.get("asset_code") or item.get("asset") or "").strip().lower()
  331. amount = item.get("total")
  332. if not asset or amount is None:
  333. continue
  334. try:
  335. amount_f = float(amount)
  336. except Exception:
  337. continue
  338. seen = True
  339. if asset == "usd":
  340. total += amount_f
  341. continue
  342. price_payload = await get_price(crypto_url, asset)
  343. try:
  344. price = float(price_payload.get("price"))
  345. except Exception:
  346. price = 0.0
  347. total += amount_f * price
  348. return total if seen else None
  349. def _compact_balances(payload: object) -> str:
  350. if not isinstance(payload, list):
  351. return "-"
  352. parts: list[str] = []
  353. for item in payload:
  354. if not isinstance(item, dict):
  355. continue
  356. asset = str(item.get("asset_code") or item.get("asset") or "").upper().strip()
  357. total = item.get("total")
  358. available = item.get("available")
  359. value_usd = item.get("value_usd")
  360. if not asset:
  361. continue
  362. segment = f"{asset} {float(total):.8g}" if total is not None else asset
  363. if available is not None and total is not None and available != total:
  364. segment += f" (avail {float(available):.8g})"
  365. if isinstance(value_usd, (int, float)):
  366. segment += f" ≈ ${float(value_usd):,.2f}"
  367. parts.append(segment)
  368. return " | ".join(parts[:5]) or "-"
  369. def _resolve_regime_symbol(concern: dict) -> str | None:
  370. base = str(concern.get("base_currency") or "").strip().upper()
  371. if base:
  372. return base
  373. market = str(concern.get("market_symbol") or "").strip().upper().replace("/", "").replace("-", "")
  374. for suffix in ("USDT", "USDC", "USD", "EUR", "BTC", "ETH"):
  375. if market.endswith(suffix) and len(market) > len(suffix):
  376. return market[:-len(suffix)]
  377. return market or None
  378. @app.get("/dashboard/data")
  379. def dashboard_data() -> JSONResponse:
  380. cfg = load_config()
  381. concerns = list_concerns()
  382. accounts_by_id: dict[str, dict] = {}
  383. markets_by_symbol: dict[str, dict] = {}
  384. strategy_inventory: list[dict] = []
  385. strategy_inventory_available = True
  386. try:
  387. accounts_by_id, markets_by_symbol, total_values = anyio.run(_load_exec_enrichment, cfg.exec_url, cfg.crypto_url, concerns)
  388. except Exception:
  389. total_values = {}
  390. pass
  391. try:
  392. strategy_inventory = anyio.run(list_strategies, cfg.trader_url)
  393. except Exception:
  394. strategy_inventory = []
  395. strategy_inventory_available = False
  396. live_scopes = {
  397. (str(strategy.get("account_id") or "").strip(), str(strategy.get("market_symbol") or "").strip().lower())
  398. for strategy in strategy_inventory
  399. if str(strategy.get("account_id") or "").strip() and str(strategy.get("market_symbol") or "").strip()
  400. }
  401. enriched = []
  402. concern_lookup: dict[str, dict] = {}
  403. for concern in concerns:
  404. account_id = str(concern.get("account_id") or "").strip()
  405. market_symbol = str(concern.get("market_symbol") or "").strip().lower()
  406. account_info = accounts_by_id.get(account_id, {})
  407. market_info = markets_by_symbol.get(market_symbol, {})
  408. enriched.append({
  409. **concern,
  410. "account_display": account_info.get("display_name") or account_id,
  411. "balances": account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or [],
  412. "balance_summary": _compact_balances(account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or []),
  413. "total_value_usd": total_values.get(account_id) if total_values.get(account_id) is not None else account_info.get("total_value_usd"),
  414. "market_display": market_info.get("name") or concern.get("market_symbol") or "",
  415. "market_description": market_info.get("description") or "",
  416. "orphaned": strategy_inventory_available and (account_id, market_symbol) not in live_scopes,
  417. })
  418. concern_lookup[str(concern.get("id") or "")] = enriched[-1]
  419. regimes = []
  420. histories_by_key: dict[str, list[dict]] = {}
  421. for sample in recent_regime_samples(1000):
  422. concern_id = str(sample.get("concern_id") or "")
  423. timeframe = str(sample.get("timeframe") or "")
  424. key = f"{concern_id}::{timeframe}"
  425. bucket = histories_by_key.setdefault(key, [])
  426. if len(bucket) < 24:
  427. bucket.append(sample)
  428. for sample in latest_regime_samples(20):
  429. concern_meta = concern_lookup.get(str(sample.get("concern_id") or ""), {})
  430. regimes.append({**sample, **{
  431. "account_display": concern_meta.get("account_display"),
  432. "market_display": concern_meta.get("market_display"),
  433. "market_symbol": concern_meta.get("market_symbol"),
  434. }})
  435. return JSONResponse({
  436. "latest_cycle": latest_cycle(),
  437. "cycles": latest_cycles(10),
  438. "argus_observations": latest_observations(20, source="argus-mcp"),
  439. "concerns": enriched,
  440. "regime_samples": regimes,
  441. "regime_histories": histories_by_key,
  442. "state_samples": latest_states(20),
  443. "state_history": latest_states(100),
  444. "narrative_samples": latest_narratives(20),
  445. "decision_samples": latest_decisions(20),
  446. "decision_history": latest_decisions(100),
  447. })