server.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. from contextlib import asynccontextmanager
  2. import asyncio
  3. from fastapi import FastAPI
  4. from .dashboard import router as dashboard_router
  5. from .strategy_engine import reconcile_all, reconcile_instance, run_due_ticks, get_running_strategy, pause_strategy, resume_strategy, tick_strategy
  6. from .strategy_registry import list_available_strategy_modules
  7. from .strategy_store import add_strategy_instance, delete_strategy_instance, list_strategy_instances, update_strategy_config, update_strategy_mode, update_strategy_state
  8. try:
  9. from mcp.server.fastmcp import FastMCP
  10. from mcp.server.transport_security import TransportSecuritySettings
  11. except ImportError: # pragma: no cover
  12. FastMCP = None
  13. TransportSecuritySettings = None
  async def _tick_loop(stop_event: asyncio.Event) -> None:
      """Run due strategy ticks roughly once per second until ``stop_event`` is set.

      A failure raised by ``run_due_ticks`` is logged and the loop carries on,
      so one bad pass cannot silently end ticking for the rest of the process
      lifetime. The pause between passes waits on ``stop_event`` itself, so the
      loop returns as soon as a stop is signalled instead of after a full sleep.

      Args:
          stop_event: Event that asks the loop to finish when set.
      """
      import logging

      logger = logging.getLogger(__name__)
      while not stop_event.is_set():
          try:
              run_due_ticks()
          except Exception:
              # asyncio.CancelledError derives from BaseException, so task
              # cancellation is never swallowed by this handler.
              logger.exception("run_due_ticks failed; retrying on the next pass")
          try:
              await asyncio.wait_for(stop_event.wait(), timeout=1)
          except asyncio.TimeoutError:
              # Normal case: nobody asked us to stop, run the next pass.
              pass
  @asynccontextmanager
  async def lifespan(_: FastAPI):
      """Manage startup and shutdown of the background tick task.

      On startup every configured strategy is reconciled and the tick loop is
      scheduled as a task. On shutdown the stop event is set, the task is
      cancelled, and the task is then awaited so it is fully finished before
      the application exits (a cancelled-but-never-awaited task can be reported
      as destroyed while pending, and its exception is never retrieved).
      """
      stop_event = asyncio.Event()
      reconcile_all()
      tick_task = asyncio.create_task(_tick_loop(stop_event))
      try:
          yield
      finally:
          stop_event.set()
          tick_task.cancel()
          # return_exceptions=True collects the task's CancelledError (or any
          # earlier failure) as a result, so shutdown itself does not raise.
          await asyncio.gather(tick_task, return_exceptions=True)
  # ASGI application object; ``lifespan`` is handed to FastAPI as the
  # startup/shutdown context manager.
  app = FastAPI(
      title="Trader MCP",
      lifespan=lifespan,
  )
  # Attach the routes exported by the package's ``dashboard`` module.
  app.include_router(dashboard_router)
  @app.get("/")
  def landing():
      """Return the service name together with a static ``ok`` status."""
      body = {"name": "trader-mcp", "status": "ok"}
      return body
  @app.get("/health")
  def health():
      """Return a static ``{"status": "ok"}`` body for health probes."""
      body = {"status": "ok"}
      return body
  @app.get("/strategies")
  def strategies_list():
      """List the available strategy modules and the configured instances.

      Each entry is the ``__dict__`` of the object returned by the registry
      (``available``) or by the store (``configured``).
      """
      available = []
      for module in list_available_strategy_modules():
          available.append(module.__dict__)

      configured = []
      for instance in list_strategy_instances():
          configured.append(instance.__dict__)

      return {"available": available, "configured": configured}
  @app.post("/strategies")
  def strategies_add(payload: dict):
      """Create a new strategy instance from ``payload`` and reconcile it.

      Required payload keys: ``id``, ``strategy_type``, ``account_id``.
      Optional keys: ``client_id``, ``mode`` (defaults to ``"off"``),
      ``config`` (defaults to an empty dict), ``started_at``, ``activated_at``.

      Returns:
          The ``__dict__`` of the record returned by ``add_strategy_instance``.

      Raises:
          HTTPException: 422 when a required key is missing. Previously this
              surfaced as an unhandled ``KeyError`` (HTTP 500).
      """
      from fastapi import HTTPException

      required_keys = ("id", "strategy_type", "account_id")
      missing = [key for key in required_keys if key not in payload]
      if missing:
          raise HTTPException(
              status_code=422,
              detail=f"missing required field(s): {', '.join(missing)}",
          )

      record = add_strategy_instance(
          id=payload["id"],
          strategy_type=payload["strategy_type"],
          account_id=payload["account_id"],
          client_id=payload.get("client_id"),
          mode=payload.get("mode", "off"),
          config=payload.get("config") or {},
          started_at=payload.get("started_at"),
          activated_at=payload.get("activated_at"),
      )
      # Bring the runtime in line with the freshly stored record.
      reconcile_instance(record.id)
      return record.__dict__
  @app.delete("/strategies/{instance_id}")
  def strategies_delete(instance_id: str):
      """Remove a strategy instance from the store, then reconcile it.

      Reconciliation runs regardless of what the delete call returned; the
      delete result is reported back under ``ok``.
      """
      deleted = delete_strategy_instance(instance_id)
      reconcile_instance(instance_id)
      outcome = {"ok": deleted, "id": instance_id}
      return outcome
  @app.post("/strategies/{instance_id}/mode")
  def strategies_mode(instance_id: str, payload: dict):
      """Update a strategy's mode and reconcile it into the runtime.

      Required payload key: ``mode``. Optional keys: ``started_at``,
      ``activated_at``.

      Returns:
          The result of ``reconcile_instance`` when the store accepted the
          update, otherwise ``{"ok": False, "id": instance_id}``.

      Raises:
          HTTPException: 422 when ``mode`` is missing. Previously this surfaced
              as an unhandled ``KeyError`` (HTTP 500).
      """
      from fastapi import HTTPException

      if "mode" not in payload:
          raise HTTPException(status_code=422, detail="missing required field: mode")

      ok = update_strategy_mode(
          instance_id,
          payload["mode"],
          started_at=payload.get("started_at"),
          activated_at=payload.get("activated_at"),
      )
      if ok:
          return reconcile_instance(instance_id)
      return {"ok": False, "id": instance_id}
  @app.post("/strategies/{instance_id}/config")
  def strategies_config(instance_id: str, payload: dict):
      """Replace a strategy's config and reconcile it into the runtime.

      Required payload key: ``config``.

      Returns:
          The result of ``reconcile_instance`` when the store accepted the
          update, otherwise ``{"ok": False, "id": instance_id}``.

      Raises:
          HTTPException: 422 when ``config`` is missing. Previously this
              surfaced as an unhandled ``KeyError`` (HTTP 500).
      """
      from fastapi import HTTPException

      if "config" not in payload:
          raise HTTPException(status_code=422, detail="missing required field: config")

      ok = update_strategy_config(instance_id, payload["config"])
      if ok:
          return reconcile_instance(instance_id)
      return {"ok": False, "id": instance_id}
  @app.post("/strategies/reconcile")
  def strategies_reconcile():
      """Run a full reconcile pass and return whatever ``reconcile_all`` reports."""
      outcome = reconcile_all()
      return outcome
  def list_strategies() -> dict:
      """list_strategies()
      Return the configured strategy instances in a compact, standardized form.

      ``status`` follows the same rule as ``get_strategy``: ``"paused"`` when a
      runtime exists and is paused, ``"running"`` when a runtime exists, is not
      paused and the stored mode is not ``"off"``, otherwise ``"stopped"``.
      Previously the status was derived from the stored mode alone, so a paused
      or not-loaded strategy was listed as running.

      The price/side/order fields are read from the stored record state.

      Returns:
          ``{"strategies": [...]}`` with one summary dict per stored instance.
      """
      strategies = []
      for record in list_strategy_instances():
          state = record.state or {}
          runtime = get_running_strategy(record.id)
          if runtime is None:
              status = "stopped"
          elif runtime.paused:
              status = "paused"
          elif record.mode != "off":
              status = "running"
          else:
              status = "stopped"
          strategies.append(
              {
                  "id": record.id,
                  "name": record.name or record.strategy_type,
                  "strategy_type": record.strategy_type,
                  "mode": record.mode,
                  "status": status,
                  "account_id": record.account_id,
                  "market_symbol": record.market_symbol,
                  "last_price": state.get("last_price"),
                  # Fall back to ``last_action`` when no ``last_side`` is stored.
                  "last_side": state.get("last_side") or state.get("last_action"),
                  "open_order_count": state.get("open_order_count", 0),
              }
          )
      return {"strategies": strategies}
  def get_strategy(
      instance_id: str,
      include_config: bool = False,
      include_state: bool = False,
      include_render: bool = False,
      include_debug: bool = False,
      include_report: bool = True,
  ) -> dict:
      """get_strategy(instance_id)
      Return one strategy record with compact live metadata.

      Config, state, render and debug output are only attached on request; the
      report is attached unless ``include_report`` is turned off. When a runtime
      exists for the strategy its instance state is preferred over the stored
      state, and ``paused`` / ``next_tick_at`` are added to that state copy.

      Returns:
          A dict with ``ok: True`` and the strategy summary, or an error dict
          with ``ok: False`` when no stored instance has ``instance_id``.
      """
      record = None
      for candidate in list_strategy_instances():
          if candidate.id == instance_id:
              record = candidate
              break
      if record is None:
          return {"ok": False, "error": "strategy not found", "id": instance_id}

      runtime = get_running_strategy(instance_id)
      is_loaded = runtime is not None

      # Work on a copy so the annotations below never touch the stored state.
      state = dict(record.state or {})
      if is_loaded:
          state = dict(runtime.instance.state or state)
          state["paused"] = runtime.paused
          state["next_tick_at"] = runtime.next_tick_at

      # Rendering is only possible for a loaded runtime; errors are reported
      # in-band instead of failing the whole call.
      render = None
      if include_render and is_loaded:
          try:
              render = runtime.instance.render()
          except Exception as exc:
              render = {"error": str(exc)}

      debug = (state.get("debug_log") or []) if include_debug else None

      report = None
      if include_report:
          try:
              if is_loaded:
                  instance = runtime.instance
              else:
                  instance = None
              if instance is None:
                  # No live runtime: build a throwaway instance from the stored
                  # record and give it the state assembled above.
                  from .strategy_engine import _instantiate

                  instance = _instantiate(record)
                  instance.state = state
              report = instance.report()
          except Exception as exc:
              report = {"error": str(exc)}

      if not is_loaded:
          status = "stopped"
      elif runtime.paused:
          status = "paused"
      elif record.mode != "off":
          status = "running"
      else:
          status = "stopped"

      response = {
          "ok": True,
          "id": record.id,
          "name": record.name or record.strategy_type,
          "strategy_type": record.strategy_type,
          "mode": record.mode,
          "status": status,
          "account_id": record.account_id,
          "client_id": record.client_id,
          "market_symbol": record.market_symbol,
          "base_currency": record.base_currency,
          "counter_currency": record.counter_currency,
      }
      if include_config:
          response["config"] = record.config
      if include_state:
          response["state"] = state
      response.update(
          {
              "last_price": state.get("last_price"),
              "last_side": state.get("last_side") or state.get("last_action"),
              "open_order_count": state.get("open_order_count", 0),
              "last_error": state.get("last_error", ""),
          }
      )
      if include_report:
          response["report"] = report
      if include_render:
          response["render"] = render
      if include_debug:
          response["debug_log"] = debug
      return response
  def update_strategy(instance_id: str, config: dict | None = None, state: dict | None = None) -> dict:
      """update_strategy(instance_id, config=None, state=None)
      Persist a new config and/or state for a strategy, then reconcile it.

      Only the arguments that are not ``None`` are written. Reconciliation runs
      when at least one write reported success; otherwise a failure dict is
      returned. The lifecycle mode is not touched here.
      """
      config_saved = update_strategy_config(instance_id, config) if config is not None else False
      state_saved = update_strategy_state(instance_id, state) if state is not None else False
      if config_saved or state_saved:
          return reconcile_instance(instance_id)
      return {"ok": False, "id": instance_id}
  def set_strategy_policy(instance_id: str, policy: dict) -> dict:
      """set_strategy_policy(instance_id, policy)
      Store a high-level Hermes policy on the strategy and persist it.

      Policy is intentionally abstract, for example: risk_posture and priority.
      Fields missing from ``policy`` keep their previously stored value, or fall
      back to ``"normal"`` (risk_posture, priority) / ``""`` (reason,
      decision_id).

      A stored ``config["policy"]`` that is ``None`` or not a dict is treated as
      empty; previously that case raised ``AttributeError``. Config and state
      each receive their own copy of the merged policy, so a later change to
      one cannot leak into the other.

      Returns:
          The result of ``reconcile_instance`` when both writes succeeded,
          otherwise a dict with ``ok: False`` and an ``error`` message.
      """
      record = next((r for r in list_strategy_instances() if r.id == instance_id), None)
      if record is None:
          return {"ok": False, "id": instance_id, "error": "strategy not found"}
      if not isinstance(policy, dict):
          return {"ok": False, "id": instance_id, "error": "policy must be an object"}

      config = dict(record.config or {})
      previous = config.get("policy")
      if not isinstance(previous, dict):
          previous = {}

      # Field name -> fallback used when neither the new nor the stored policy
      # provides a value. Dict order fixes the key order of the merged policy.
      fallbacks = {
          "risk_posture": "normal",
          "priority": "normal",
          "reason": "",
          "decision_id": "",
      }
      merged = {
          field: policy.get(field, previous.get(field, fallback))
          for field, fallback in fallbacks.items()
      }
      config["policy"] = merged

      state = dict(record.state or {})
      state["policy"] = dict(merged)

      ok_config = update_strategy_config(instance_id, config)
      ok_state = update_strategy_state(instance_id, state)
      if ok_config and ok_state:
          return reconcile_instance(instance_id)
      return {"ok": False, "id": instance_id, "error": "failed to persist policy"}
  def control_strategy(instance_id: str, action: str) -> dict:
      """control_strategy(instance_id, action)
      Apply one lifecycle action to a strategy: start, pause, resume, stop or
      reconcile.

      The action is trimmed and lower-cased before matching. ``start`` and
      ``stop`` write mode ``"active"`` / ``"off"`` and reconcile when the write
      succeeded; the other actions are forwarded to the matching engine call.
      Unknown actions return a failure dict naming the normalized action.
      """
      verb = str(action or "").strip().lower()

      # Actions handled by a single engine call.
      direct_handlers = {
          "pause": pause_strategy,
          "resume": resume_strategy,
          "reconcile": reconcile_instance,
      }
      if verb in direct_handlers:
          return direct_handlers[verb](instance_id)

      # Actions that amount to a mode change followed by a reconcile.
      target_mode = {"start": "active", "stop": "off"}.get(verb)
      if target_mode is None:
          return {"ok": False, "id": instance_id, "error": f"unsupported action: {verb}"}

      if update_strategy_mode(instance_id, target_mode):
          return reconcile_instance(instance_id)
      return {"ok": False, "id": instance_id}
  def get_capabilities() -> dict:
      """get_capabilities()
      Describe the current public MCP surface and the strategy record shape.

      The tool list covers every function this module registers as an MCP
      tool, including ``get_capabilities`` itself, and the advertised params
      follow the actual function signatures. As in the existing entries, the
      ``instance_id`` argument is not listed.

      Returns:
          A dict with the server ``name``, the ``tools`` descriptions and the
          ``strategy_summary_fields`` names.
      """
      return {
          "name": "trader-mcp",
          "tools": [
              {
                  "name": "list_strategies",
                  "description": "List configured strategy instances with compact live metadata.",
              },
              {
                  "name": "get_strategy",
                  "description": (
                      "Return one strategy record with optional config, state, "
                      "report, render and debug data."
                  ),
                  "params": {
                      "include_config": "bool",
                      "include_state": "bool",
                      "include_render": "bool",
                      "include_debug": "bool",
                      "include_report": "bool",
                  },
              },
              {
                  "name": "update_strategy",
                  "description": "Update stored strategy config and/or state, then reconcile.",
                  "params": {"config": "object?", "state": "object?"},
              },
              {
                  "name": "control_strategy",
                  "description": "Control lifecycle or reconcile with a single action.",
                  "params": {"action": "start|pause|resume|stop|reconcile"},
              },
              {
                  "name": "set_strategy_policy",
                  "description": "Store a high-level Hermes policy on a strategy.",
                  "params": {"policy": "{risk_posture, priority, reason?, decision_id?}"},
              },
              {
                  "name": "get_capabilities",
                  "description": "Describe the public MCP tools and the strategy record shape.",
              },
          ],
          "strategy_summary_fields": [
              "id",
              "name",
              "strategy_type",
              "mode",
              "status",
              "account_id",
              "client_id",
              "market_symbol",
              "base_currency",
              "counter_currency",
              "config",
              "state",
              "last_price",
              "last_side",
              "open_order_count",
              "last_error",
          ],
      }
  # MCP over SSE.
  # When the optional FastMCP import succeeded, the MCP app is mounted at /mcp
  # (SSE endpoint at /mcp/sse); otherwise no MCP surface is exposed.
  if FastMCP is not None:
      # NOTE(review): DNS-rebinding protection is explicitly switched off here;
      # confirm this is intended for the deployment this server runs in.
      mcp = FastMCP(
          "trader-mcp",
          transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
      )
      # Register the public tool functions in a fixed order.
      for _tool_fn in (
          list_strategies,
          get_strategy,
          update_strategy,
          control_strategy,
          set_strategy_policy,
          get_capabilities,
      ):
          mcp.tool()(_tool_fn)
      del _tool_fn  # keep the loop variable out of the module namespace
      app.mount("/mcp", mcp.sse_app())