| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- from contextlib import asynccontextmanager
- import asyncio
- from fastapi import FastAPI
- from .dashboard import router as dashboard_router
- from .strategy_engine import reconcile_all, reconcile_instance, run_due_ticks, get_running_strategy, pause_strategy, resume_strategy, tick_strategy
- from .strategy_registry import list_available_strategy_modules
- from .strategy_store import add_strategy_instance, delete_strategy_instance, list_strategy_instances, update_strategy_config, update_strategy_mode, update_strategy_state
- try:
- from mcp.server.fastmcp import FastMCP
- from mcp.server.transport_security import TransportSecuritySettings
- except ImportError: # pragma: no cover
- FastMCP = None
- TransportSecuritySettings = None
async def _tick_loop(stop_event: asyncio.Event) -> None:
    """Run due strategy ticks about once per second until shutdown is requested.

    Args:
        stop_event: Event that is set to ask the loop to exit.

    An exception raised by ``run_due_ticks()`` is logged and the loop keeps
    going, so a single failing tick does not end ticking for the rest of the
    process lifetime.
    """
    import logging

    logger = logging.getLogger(__name__)
    while not stop_event.is_set():
        try:
            run_due_ticks()
        except Exception:
            # Nothing awaits this background task while the app is serving, so
            # an escaping exception would stop the loop without anyone noticing.
            logger.exception("run_due_ticks failed; retrying on the next tick")
        try:
            # Pause for up to one second, but wake at once when stop is requested.
            await asyncio.wait_for(stop_event.wait(), timeout=1)
        except asyncio.TimeoutError:
            continue
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan: reconcile once, then tick in a background task.

    Startup calls ``reconcile_all()`` and starts ``_tick_loop`` as an asyncio
    task. Shutdown sets the stop event, cancels the task, and waits for it to
    finish so no pending task or unretrieved exception is left behind.
    """
    import logging

    stop_event = asyncio.Event()
    reconcile_all()
    tick_task = asyncio.create_task(_tick_loop(stop_event))
    try:
        yield
    finally:
        stop_event.set()
        tick_task.cancel()
        try:
            # cancel() only requests cancellation; awaiting lets the task unwind.
            await tick_task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() call above.
            pass
        except Exception:
            # The loop had already failed; record it instead of failing shutdown.
            logging.getLogger(__name__).exception("tick loop ended with an error")
# FastAPI application object; ``lifespan`` is passed as its lifespan handler.
app = FastAPI(
    title="Trader MCP",
    lifespan=lifespan,
)
# Attach the router imported from the dashboard module.
app.include_router(dashboard_router)
# Root endpoint: reports the service name together with an "ok" status.
@app.get("/")
def landing():
    service_info = {"name": "trader-mcp", "status": "ok"}
    return service_info
# Health endpoint: always answers with a constant "ok" status payload.
@app.get("/health")
def health():
    health_payload = {"status": "ok"}
    return health_payload
@app.get("/strategies")
def strategies_list():
    """Return available strategy modules and configured strategy instances."""
    # Each entry is the attribute dictionary of the object returned by the
    # registry / store helper.
    available_modules = [module.__dict__ for module in list_available_strategy_modules()]
    configured_instances = [instance.__dict__ for instance in list_strategy_instances()]
    return {"available": available_modules, "configured": configured_instances}
@app.post("/strategies")
def strategies_add(payload: dict):
    """Create a new strategy instance from the supplied payload."""
    from fastapi import HTTPException

    # ``id``, ``strategy_type`` and ``account_id`` are read with ``payload[...]``
    # below; report a missing one as a client error instead of letting the
    # KeyError surface as an internal server error.
    required_fields = ("id", "strategy_type", "account_id")
    missing = [field for field in required_fields if field not in payload]
    if missing:
        raise HTTPException(
            status_code=422,
            detail=f"missing required field(s): {', '.join(missing)}",
        )

    record = add_strategy_instance(
        id=payload["id"],
        strategy_type=payload["strategy_type"],
        account_id=payload["account_id"],
        client_id=payload.get("client_id"),
        mode=payload.get("mode", "off"),
        # A missing or empty config is stored as an empty dict.
        config=payload.get("config") or {},
        started_at=payload.get("started_at"),
        activated_at=payload.get("activated_at"),
    )
    # Reconcile the newly stored instance; the reconcile result is not returned.
    reconcile_instance(record.id)
    return record.__dict__
@app.delete("/strategies/{instance_id}")
def strategies_delete(instance_id: str):
    """Delete a strategy instance and reconcile the runtime state."""
    deleted = delete_strategy_instance(instance_id)
    # Reconcile runs whatever the delete returned; its own result is discarded.
    reconcile_instance(instance_id)
    outcome = {"ok": deleted, "id": instance_id}
    return outcome
@app.post("/strategies/{instance_id}/mode")
def strategies_mode(instance_id: str, payload: dict):
    """Update a strategy mode and reconcile it into the runtime."""
    from fastapi import HTTPException

    # ``mode`` is mandatory; reject its absence as a client error rather than
    # letting a KeyError surface as an internal server error.
    if "mode" not in payload:
        raise HTTPException(status_code=422, detail="missing required field: mode")

    updated = update_strategy_mode(
        instance_id,
        payload["mode"],
        started_at=payload.get("started_at"),
        activated_at=payload.get("activated_at"),
    )
    if not updated:
        # The store reported that nothing was updated; skip reconciliation.
        return {"ok": False, "id": instance_id}
    return reconcile_instance(instance_id)
@app.post("/strategies/{instance_id}/config")
def strategies_config(instance_id: str, payload: dict):
    """Replace a strategy config and reconcile it into the runtime."""
    from fastapi import HTTPException

    # ``config`` is mandatory; reject its absence as a client error rather than
    # letting a KeyError surface as an internal server error.
    if "config" not in payload:
        raise HTTPException(status_code=422, detail="missing required field: config")

    updated = update_strategy_config(instance_id, payload["config"])
    if not updated:
        # The store reported that nothing was updated; skip reconciliation.
        return {"ok": False, "id": instance_id}
    return reconcile_instance(instance_id)
@app.post("/strategies/reconcile")
def strategies_reconcile():
    """Reconcile every configured strategy with the live runtime."""
    # The response body is whatever reconcile_all() returns, passed through as-is.
    outcome = reconcile_all()
    return outcome
def list_strategies() -> dict:
    """list_strategies()
    Return the configured strategy instances in a compact, standardized form.
    Each item includes the live runtime summary needed by humans and agents.
    """

    def summarize(record) -> dict:
        # Build the compact entry for one stored record. A missing state is
        # treated as empty, and "status" is derived from the stored mode only.
        snapshot = record.state or {}
        return {
            "id": record.id,
            "name": record.name or record.strategy_type,
            "strategy_type": record.strategy_type,
            "mode": record.mode,
            "status": "stopped" if record.mode == "off" else "running",
            "account_id": record.account_id,
            "market_symbol": record.market_symbol,
            "last_price": snapshot.get("last_price"),
            "last_side": snapshot.get("last_side") or snapshot.get("last_action"),
            "open_order_count": snapshot.get("open_order_count", 0),
        }

    return {"strategies": [summarize(record) for record in list_strategy_instances()]}
def get_strategy(
    instance_id: str,
    include_config: bool = False,
    include_state: bool = False,
    include_render: bool = False,
    include_debug: bool = False,
    include_report: bool = True,
) -> dict:
    """get_strategy(instance_id)
    Return one strategy record with compact live metadata.
    Expanded config, state, render, and debug data are opt-in. Report is included by default.
    """
    # Find the first stored record whose id matches.
    record = None
    for candidate in list_strategy_instances():
        if candidate.id == instance_id:
            record = candidate
            break
    if record is None:
        return {"ok": False, "error": "strategy not found", "id": instance_id}

    runtime = get_running_strategy(instance_id)
    is_live = runtime is not None

    # Start from a copy of the stored state; a running instance's own state
    # (when non-empty) takes its place, plus the scheduler fields.
    state = dict(record.state or {})
    if is_live:
        state = dict(runtime.instance.state or state)
        state["paused"] = runtime.paused
        state["next_tick_at"] = runtime.next_tick_at

    # Render is only attempted for a running instance; failures are reported
    # inline as {"error": ...} rather than raised.
    render = None
    if include_render and is_live:
        try:
            render = runtime.instance.render()
        except Exception as exc:
            render = {"error": str(exc)}

    debug = (state.get("debug_log") or []) if include_debug else None

    report = None
    if include_report:
        try:
            instance = runtime.instance if is_live else None
            if instance is None:
                # No running instance: build one from the record and hand it
                # the state assembled above so it can produce a report.
                from .strategy_engine import _instantiate

                instance = _instantiate(record)
                instance.state = state
            report = instance.report()
        except Exception as exc:
            report = {"error": str(exc)}

    # Without a runtime the strategy is stopped; a paused runtime is "paused";
    # otherwise it is "running" unless the stored mode is "off".
    if not is_live:
        status = "stopped"
    elif runtime.paused:
        status = "paused"
    elif record.mode != "off":
        status = "running"
    else:
        status = "stopped"

    response = {
        "ok": True,
        "id": record.id,
        "name": record.name or record.strategy_type,
        "strategy_type": record.strategy_type,
        "mode": record.mode,
        "status": status,
        "account_id": record.account_id,
        "client_id": record.client_id,
        "market_symbol": record.market_symbol,
        "base_currency": record.base_currency,
        "counter_currency": record.counter_currency,
    }

    # Optional sections are appended in a fixed order.
    if include_config:
        response["config"] = record.config
    if include_state:
        response.update(
            state=state,
            last_price=state.get("last_price"),
            last_side=state.get("last_side") or state.get("last_action"),
            open_order_count=state.get("open_order_count", 0),
            last_error=state.get("last_error", ""),
        )
    if include_report:
        response["report"] = report
    if include_render:
        response["render"] = render
    if include_debug:
        response["debug_log"] = debug
    return response
- def update_strategy(instance_id: str, config: dict | None = None, state: dict | None = None) -> dict:
- """update_strategy(instance_id, config=None, state=None)
- Update the stored config and/or state for a strategy, then reconcile it.
- Use this for edits that should be persisted without changing lifecycle mode.
- """
- changed = False
- if config is not None:
- changed = update_strategy_config(instance_id, config) or changed
- if state is not None:
- changed = update_strategy_state(instance_id, state) or changed
- if changed:
- return reconcile_instance(instance_id)
- return {"ok": False, "id": instance_id}
def set_strategy_policy(instance_id: str, policy: dict) -> dict:
    """set_strategy_policy(instance_id, policy)
    Store a high-level Hermes policy on the strategy and persist it.
    Policy is intentionally abstract, for example: risk_posture and priority.
    """
    # Returns the reconcile result on success, or an {"ok": False, ...} dict
    # carrying an "error" message when the strategy is unknown, the policy is
    # not a dict, or persisting fails.
    record = next((r for r in list_strategy_instances() if r.id == instance_id), None)
    if record is None:
        return {"ok": False, "id": instance_id, "error": "strategy not found"}
    if not isinstance(policy, dict):
        return {"ok": False, "id": instance_id, "error": "policy must be an object"}

    config = dict(record.config or {})

    # The stored policy may be absent, None, or some non-dict value; treat all
    # of those as "no previous policy" so the merge below cannot fail.
    previous = config.get("policy")
    if not isinstance(previous, dict):
        previous = {}

    # Each field comes from the new policy, else the previous policy, else the
    # default listed here.
    defaults = {
        "risk_posture": "normal",
        "priority": "normal",
        "reason": "",
        "decision_id": "",
    }
    merged = {
        key: policy.get(key, previous.get(key, fallback))
        for key, fallback in defaults.items()
    }
    config["policy"] = merged

    state = dict(record.state or {})
    state["policy"] = merged

    # Both writes are attempted even if the first one fails; there is no
    # rollback if only one of them succeeds.
    ok_config = update_strategy_config(instance_id, config)
    ok_state = update_strategy_state(instance_id, state)
    if ok_config and ok_state:
        return reconcile_instance(instance_id)
    return {"ok": False, "id": instance_id, "error": "failed to persist policy"}
- def control_strategy(instance_id: str, action: str) -> dict:
- """control_strategy(instance_id, action)
- Control a strategy with one action: start, pause, resume, stop, reconcile.
- This is the lifecycle entry point for operators and agents.
- """
- action = str(action or "").strip().lower()
- if action == "pause":
- return pause_strategy(instance_id)
- if action == "resume":
- return resume_strategy(instance_id)
- if action == "reconcile":
- return reconcile_instance(instance_id)
- if action == "start":
- ok = update_strategy_mode(instance_id, "active")
- return reconcile_instance(instance_id) if ok else {"ok": False, "id": instance_id}
- if action == "stop":
- ok = update_strategy_mode(instance_id, "off")
- return reconcile_instance(instance_id) if ok else {"ok": False, "id": instance_id}
- return {"ok": False, "id": instance_id, "error": f"unsupported action: {action}"}
def get_capabilities() -> dict:
    """get_capabilities()
    Describe the current public MCP surface and the strategy record shape.
    """
    # Returns a static description: the service name, one entry per tool
    # (with the non-id parameters it accepts), and the strategy field names.
    # The tool entries mirror the signatures of the functions in this module.
    return {
        "name": "trader-mcp",
        "tools": [
            {
                "name": "list_strategies",
                "description": "List configured strategy instances with compact live metadata.",
            },
            {
                "name": "get_strategy",
                "description": (
                    "Return one strategy record with optional config, state, "
                    "render, debug, and report data."
                ),
                "params": {
                    "include_config": "bool",
                    "include_state": "bool",
                    "include_render": "bool",
                    "include_debug": "bool",
                    "include_report": "bool",
                },
            },
            {
                "name": "update_strategy",
                "description": "Update stored strategy config and/or state, then reconcile.",
                "params": {"config": "dict?", "state": "dict?"},
            },
            {
                "name": "control_strategy",
                "description": "Control lifecycle or reconcile with a single action.",
                "params": {"action": "start|pause|resume|stop|reconcile"},
            },
            {
                "name": "set_strategy_policy",
                "description": "Store a high-level Hermes policy on a strategy.",
                "params": {"policy": "{risk_posture, priority, reason?, decision_id?}"},
            },
            {
                "name": "get_capabilities",
                "description": "Describe the public MCP tools and the strategy record fields.",
            },
        ],
        "strategy_summary_fields": [
            "id",
            "name",
            "strategy_type",
            "mode",
            "status",
            "account_id",
            "client_id",
            "market_symbol",
            "base_currency",
            "counter_currency",
            "config",
            "state",
            "last_price",
            "last_side",
            "open_order_count",
            "last_error",
        ],
    }
# MCP (SSE)
# FastMCP mounted at /mcp with SSE at /mcp/sse (when FastMCP is available)
if FastMCP is not None:
    # NOTE(review): DNS-rebinding protection is explicitly switched off here;
    # confirm this is intended for the environment the server is exposed to.
    mcp = FastMCP(
        "trader-mcp",
        transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
    )
    # Register each public function of this module as an MCP tool.
    for _tool in (
        list_strategies,
        get_strategy,
        update_strategy,
        control_strategy,
        set_strategy_policy,
        get_capabilities,
    ):
        mcp.tool()(_tool)
    del _tool  # keep the loop variable out of the module namespace
    app.mount("/mcp", mcp.sse_app())
|