| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- from __future__ import annotations
- from typing import Any
- import json
- import anyio
- from mcp import ClientSession
- from mcp.client.sse import sse_client
- async def get_regime(base_url: str, symbol: str, timeframe: str = "1h") -> dict[str, Any]:
- async with sse_client(base_url) as (read_stream, write_stream):
- async with ClientSession(read_stream, write_stream) as session:
- await session.initialize()
- result = await session.call_tool("get_regime", {"symbol": symbol, "timeframe": timeframe})
- content = getattr(result, "content", None)
- if not content:
- return {"error": "EMPTY_RESULT", "symbol": symbol, "timeframe": timeframe}
- # FastMCP commonly returns tool results as text content blocks.
- first = content[0]
- text = getattr(first, "text", None)
- if text is None and isinstance(first, dict):
- text = first.get("text")
- if text is None:
- return {"error": "UNPARSEABLE_RESULT", "symbol": symbol, "timeframe": timeframe}
- try:
- return json.loads(text)
- except Exception:
- return {"raw": text, "symbol": symbol, "timeframe": timeframe}
- async def get_price(base_url: str, symbol: str) -> dict[str, Any]:
- async with sse_client(base_url) as (read_stream, write_stream):
- async with ClientSession(read_stream, write_stream) as session:
- await session.initialize()
- result = await session.call_tool("get_price", {"symbol": symbol})
- content = getattr(result, "content", None)
- if not content:
- return {"error": "EMPTY_RESULT", "symbol": symbol}
- first = content[0]
- text = getattr(first, "text", None)
- if text is None and isinstance(first, dict):
- text = first.get("text")
- if text is None:
- return {"error": "UNPARSEABLE_RESULT", "symbol": symbol}
- try:
- return json.loads(text)
- except Exception:
- return {"raw": text, "symbol": symbol}
|