from contextlib import asynccontextmanager import asyncio from fastapi import FastAPI from .dashboard import router as dashboard_router from .strategy_engine import reconcile_all, reconcile_instance, run_due_ticks, get_running_strategy, pause_strategy, resume_strategy, tick_strategy from .strategy_registry import list_available_strategy_modules from .strategy_store import add_strategy_instance, delete_strategy_instance, list_strategy_instances, update_strategy_config, update_strategy_mode, update_strategy_state try: from mcp.server.fastmcp import FastMCP from mcp.server.transport_security import TransportSecuritySettings except ImportError: # pragma: no cover FastMCP = None TransportSecuritySettings = None async def _tick_loop(stop_event: asyncio.Event) -> None: while not stop_event.is_set(): run_due_ticks() await asyncio.sleep(1) @asynccontextmanager async def lifespan(_: FastAPI): stop_event = asyncio.Event() reconcile_all() tick_task = asyncio.create_task(_tick_loop(stop_event)) try: yield finally: stop_event.set() tick_task.cancel() app = FastAPI(title="Trader MCP", lifespan=lifespan) app.include_router(dashboard_router) @app.get("/") def landing(): return {"name": "trader-mcp", "status": "ok"} @app.get("/health") def health(): return {"status": "ok"} @app.get("/strategies") def strategies_list(): """Return available strategy modules and configured strategy instances.""" return { "available": [s.__dict__ for s in list_available_strategy_modules()], "configured": [s.__dict__ for s in list_strategy_instances()], } @app.post("/strategies") def strategies_add(payload: dict): """Create a new strategy instance from the supplied payload.""" record = add_strategy_instance( id=payload["id"], strategy_type=payload["strategy_type"], account_id=payload["account_id"], client_id=payload.get("client_id"), mode=payload.get("mode", "off"), config=payload.get("config") or {}, started_at=payload.get("started_at"), activated_at=payload.get("activated_at"), ) reconcile_instance(record.id) return record.__dict__ @app.delete("/strategies/{instance_id}") def strategies_delete(instance_id: str): """Delete a strategy instance and reconcile the runtime state.""" result = delete_strategy_instance(instance_id) reconcile_instance(instance_id) return {"ok": result, "id": instance_id} @app.post("/strategies/{instance_id}/mode") def strategies_mode(instance_id: str, payload: dict): """Update a strategy mode and reconcile it into the runtime.""" ok = update_strategy_mode(instance_id, payload["mode"], started_at=payload.get("started_at"), activated_at=payload.get("activated_at")) if ok: return reconcile_instance(instance_id) return {"ok": False, "id": instance_id} @app.post("/strategies/{instance_id}/config") def strategies_config(instance_id: str, payload: dict): """Replace a strategy config and reconcile it into the runtime.""" ok = update_strategy_config(instance_id, payload["config"]) if ok: return reconcile_instance(instance_id) return {"ok": False, "id": instance_id} @app.post("/strategies/reconcile") def strategies_reconcile(): """Reconcile every configured strategy with the live runtime.""" return reconcile_all() def list_strategies() -> dict: """list_strategies() Return the configured strategy instances in a compact, standardized form. Each item includes the live runtime summary needed by humans and agents. """ strategies = [] for record in list_strategy_instances(): state = record.state or {} strategies.append( { "id": record.id, "name": record.name or record.strategy_type, "strategy_type": record.strategy_type, "mode": record.mode, "status": "running" if record.mode != "off" else "stopped", "account_id": record.account_id, "market_symbol": record.market_symbol, "last_price": state.get("last_price"), "last_side": state.get("last_side") or state.get("last_action"), "open_order_count": state.get("open_order_count", 0), } ) return {"strategies": strategies} def get_strategy( instance_id: str, include_config: bool = False, include_state: bool = False, include_render: bool = False, include_debug: bool = False, include_report: bool = True, ) -> dict: """get_strategy(instance_id) Return one strategy record with compact live metadata. Expanded config, state, render, and debug data are opt-in. Report is included by default. """ record = next((r for r in list_strategy_instances() if r.id == instance_id), None) if record is None: return {"ok": False, "error": "strategy not found", "id": instance_id} runtime = get_running_strategy(instance_id) state = dict(record.state or {}) if runtime is not None: state = dict(runtime.instance.state or state) state["paused"] = runtime.paused state["next_tick_at"] = runtime.next_tick_at render = None if include_render and runtime is not None: try: render = runtime.instance.render() except Exception as exc: render = {"error": str(exc)} debug = None if include_debug: debug = state.get("debug_log") or [] report = None if include_report: try: instance = runtime.instance if runtime is not None else None if instance is None: from .strategy_engine import _instantiate instance = _instantiate(record) instance.state = state report = instance.report() except Exception as exc: report = {"error": str(exc)} response = { "ok": True, "id": record.id, "name": record.name or record.strategy_type, "strategy_type": record.strategy_type, "mode": record.mode, "status": "running" if runtime is not None and not runtime.paused and record.mode != "off" else "paused" if runtime is not None and runtime.paused else "stopped", "account_id": record.account_id, "client_id": record.client_id, "market_symbol": record.market_symbol, "base_currency": record.base_currency, "counter_currency": record.counter_currency, } if include_config: response["config"] = record.config if include_state: response["state"] = state response["last_price"] = state.get("last_price") response["last_side"] = state.get("last_side") or state.get("last_action") response["open_order_count"] = state.get("open_order_count", 0) response["last_error"] = state.get("last_error", "") if include_report: response["report"] = report if include_render: response["render"] = render if include_debug: response["debug_log"] = debug return response def update_strategy(instance_id: str, config: dict | None = None, state: dict | None = None) -> dict: """update_strategy(instance_id, config=None, state=None) Update the stored config and/or state for a strategy, then reconcile it. Use this for edits that should be persisted without changing lifecycle mode. """ changed = False if config is not None: changed = update_strategy_config(instance_id, config) or changed if state is not None: changed = update_strategy_state(instance_id, state) or changed if changed: return reconcile_instance(instance_id) return {"ok": False, "id": instance_id} def set_strategy_policy(instance_id: str, policy: dict) -> dict: """set_strategy_policy(instance_id, policy) Store a high-level Hermes policy on the strategy and persist it. Policy is intentionally abstract, for example: risk_posture and priority. """ record = next((r for r in list_strategy_instances() if r.id == instance_id), None) if record is None: return {"ok": False, "id": instance_id, "error": "strategy not found"} if not isinstance(policy, dict): return {"ok": False, "id": instance_id, "error": "policy must be an object"} config = dict(record.config or {}) config["policy"] = { "risk_posture": policy.get("risk_posture", config.get("policy", {}).get("risk_posture", "normal")), "priority": policy.get("priority", config.get("policy", {}).get("priority", "normal")), "reason": policy.get("reason", config.get("policy", {}).get("reason", "")), "decision_id": policy.get("decision_id", config.get("policy", {}).get("decision_id", "")), } state = dict(record.state or {}) state["policy"] = config["policy"] ok_config = update_strategy_config(instance_id, config) ok_state = update_strategy_state(instance_id, state) if ok_config and ok_state: return reconcile_instance(instance_id) return {"ok": False, "id": instance_id, "error": "failed to persist policy"} def control_strategy(instance_id: str, action: str) -> dict: """control_strategy(instance_id, action) Control a strategy with one action: start, pause, resume, stop, reconcile. This is the lifecycle entry point for operators and agents. """ action = str(action or "").strip().lower() if action == "pause": return pause_strategy(instance_id) if action == "resume": return resume_strategy(instance_id) if action == "reconcile": return reconcile_instance(instance_id) if action == "start": ok = update_strategy_mode(instance_id, "active") return reconcile_instance(instance_id) if ok else {"ok": False, "id": instance_id} if action == "stop": ok = update_strategy_mode(instance_id, "off") return reconcile_instance(instance_id) if ok else {"ok": False, "id": instance_id} return {"ok": False, "id": instance_id, "error": f"unsupported action: {action}"} def get_capabilities() -> dict: """get_capabilities() Describe the current public MCP surface and the strategy record shape. """ return { "name": "trader-mcp", "tools": [ { "name": "list_strategies", "description": "List configured strategy instances with compact live metadata.", }, { "name": "get_strategy", "description": "Return one strategy record with optional render and debug data.", "params": {"include_render": "bool", "include_debug": "bool"}, }, { "name": "update_strategy", "description": "Update stored strategy config and/or state, then reconcile.", }, { "name": "control_strategy", "description": "Control lifecycle or reconcile with a single action.", "params": {"action": "start|pause|resume|stop|reconcile"}, }, { "name": "set_strategy_policy", "description": "Store a high-level Hermes policy on a strategy.", "params": {"policy": "{risk_posture, priority, reason?, decision_id?}"}, }, ], "strategy_summary_fields": [ "id", "name", "strategy_type", "mode", "status", "account_id", "client_id", "market_symbol", "base_currency", "counter_currency", "config", "state", "last_price", "last_side", "open_order_count", "last_error", ], } # MCP (SSE) # FastMCP mounted at /mcp with SSE at /mcp/sse (when FastMCP is available) if FastMCP is not None: mcp = FastMCP( "trader-mcp", transport_security=TransportSecuritySettings( enable_dns_rebinding_protection=False, ), ) mcp.tool()(list_strategies) mcp.tool()(get_strategy) mcp.tool()(update_strategy) mcp.tool()(control_strategy) mcp.tool()(set_strategy_policy) mcp.tool()(get_capabilities) app.mount("/mcp", mcp.sse_app())