| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- from __future__ import annotations
- from typing import Any
- from datetime import timedelta
- import json
- from mcp import ClientSession
- from mcp.client.sse import sse_client
- def _normalize_url(base_url: str) -> str:
- url = (base_url or "").strip()
- if not url:
- return url
- if not url.endswith("/mcp/sse"):
- url = url.rstrip("/") + "/mcp/sse"
- return url
- async def _call_tool(base_url: str, tool: str, arguments: dict[str, Any]) -> dict[str, Any]:
- url = _normalize_url(base_url)
- if not url:
- return {}
- async with sse_client(url, timeout=8.0, sse_read_timeout=8.0) as streams:
- async with ClientSession(*streams, read_timeout_seconds=timedelta(seconds=8)) as session:
- await session.initialize()
- result = await session.call_tool(tool, arguments)
- content = getattr(result, "content", None) or []
- if not content:
- return {}
- first = content[0]
- text = getattr(first, "text", None)
- if text is None and isinstance(first, dict):
- text = first.get("text")
- if text is None:
- return {}
- try:
- payload = json.loads(text)
- return payload if isinstance(payload, dict) else {}
- except Exception:
- return {"raw": text}
- async def list_strategies(base_url: str) -> list[dict[str, Any]]:
- payload = await _call_tool(base_url, "list_strategies", {})
- strategies = payload.get("strategies", payload.get("configured", [])) or []
- return [s for s in strategies if isinstance(s, dict)]
- async def get_strategy(base_url: str, instance_id: str, *, include_state: bool = True, include_report: bool = True) -> dict[str, Any]:
- payload = await _call_tool(
- base_url,
- "get_strategy",
- {
- "instance_id": instance_id,
- "include_state": include_state,
- "include_report": include_report,
- },
- )
- return payload if isinstance(payload, dict) else {}
- async def list_accounts(base_url: str) -> list[dict[str, Any]]:
- payload = await _call_tool(base_url, "list_accounts", {})
- accounts = payload.get("accounts", []) or []
- return [a for a in accounts if isinstance(a, dict)]
- async def cancel_all_orders(base_url: str, account_id: str, client_id: str | None = None) -> dict[str, Any]:
- payload = await _call_tool(base_url, "cancel_all_orders", {"account_id": account_id, "client_id": client_id})
- return payload if isinstance(payload, dict) else {}
- async def control_strategy(base_url: str, instance_id: str, action: str) -> dict[str, Any]:
- payload = await _call_tool(base_url, "control_strategy", {"instance_id": instance_id, "action": action})
- return payload if isinstance(payload, dict) else {}
- async def apply_control_decision(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
- response = await _call_tool(base_url, "apply_control_decision", {"payload": payload})
- return response if isinstance(response, dict) else {}
|