from __future__ import annotations from typing import Any from datetime import timedelta import json from mcp import ClientSession from mcp.client.sse import sse_client def _normalize_url(base_url: str) -> str: url = (base_url or "").strip() if not url: return url if not url.endswith("/mcp/sse"): url = url.rstrip("/") + "/mcp/sse" return url async def _call_tool(base_url: str, tool: str, arguments: dict[str, Any]) -> dict[str, Any]: url = _normalize_url(base_url) if not url: return {} async with sse_client(url, timeout=8.0, sse_read_timeout=8.0) as streams: async with ClientSession(*streams, read_timeout_seconds=timedelta(seconds=8)) as session: await session.initialize() result = await session.call_tool(tool, arguments) content = getattr(result, "content", None) or [] if not content: return {} first = content[0] text = getattr(first, "text", None) if text is None and isinstance(first, dict): text = first.get("text") if text is None: return {} try: payload = json.loads(text) return payload if isinstance(payload, dict) else {} except Exception: return {"raw": text} async def list_strategies(base_url: str) -> list[dict[str, Any]]: payload = await _call_tool(base_url, "list_strategies", {}) strategies = payload.get("strategies", payload.get("configured", [])) or [] return [s for s in strategies if isinstance(s, dict)] async def get_strategy(base_url: str, instance_id: str, *, include_state: bool = True, include_report: bool = True) -> dict[str, Any]: payload = await _call_tool( base_url, "get_strategy", { "instance_id": instance_id, "include_state": include_state, "include_report": include_report, }, ) return payload if isinstance(payload, dict) else {} async def list_accounts(base_url: str) -> list[dict[str, Any]]: payload = await _call_tool(base_url, "list_accounts", {}) accounts = payload.get("accounts", []) or [] return [a for a in accounts if isinstance(a, dict)] async def cancel_all_orders(base_url: str, account_id: str, client_id: str | None = None) -> dict[str, Any]: payload = await _call_tool(base_url, "cancel_all_orders", {"account_id": account_id, "client_id": client_id}) return payload if isinstance(payload, dict) else {} async def control_strategy(base_url: str, instance_id: str, action: str) -> dict[str, Any]: payload = await _call_tool(base_url, "control_strategy", {"instance_id": instance_id, "action": action}) return payload if isinstance(payload, dict) else {} async def apply_control_decision(base_url: str, payload: dict[str, Any]) -> dict[str, Any]: response = await _call_tool(base_url, "apply_control_decision", {"payload": payload}) return response if isinstance(response, dict) else {}