|
|
@@ -1,5 +1,7 @@
|
|
|
from __future__ import annotations
|
|
|
|
|
|
+import json
|
|
|
+import os
|
|
|
from dataclasses import dataclass
|
|
|
from typing import Any, Optional
|
|
|
|
|
|
@@ -10,50 +12,35 @@ from fastmcp.client.transports.sse import SSETransport
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
|
class McpSseConfig:
|
|
|
- base_url: str # e.g. http://host:port/mcp/sse
|
|
|
+ base_url: str
|
|
|
|
|
|
|
|
|
class GenericMcpClient:
|
|
|
- def __init__(
|
|
|
- self,
|
|
|
- sse: McpSseConfig,
|
|
|
- *,
|
|
|
- timeout: int | float = 30,
|
|
|
- ) -> None:
|
|
|
+ def __init__(self, sse: McpSseConfig, *, timeout: int | float = 30) -> None:
|
|
|
self._sse = sse
|
|
|
self._timeout = timeout
|
|
|
|
|
|
def _build_client(self) -> Client:
|
|
|
- transport = SSETransport(self._sse.base_url)
|
|
|
- return Client(transport, auto_initialize=True, timeout=self._timeout)
|
|
|
-
|
|
|
- async def _call_tool_async(
|
|
|
- self,
|
|
|
- tool_name: str,
|
|
|
- arguments: Optional[dict[str, Any]] = None,
|
|
|
- ) -> Any:
|
|
|
+ return Client(SSETransport(self._sse.base_url), auto_initialize=True, timeout=self._timeout)
|
|
|
+
|
|
|
+ async def _call_tool_async(self, tool_name: str, arguments: Optional[dict[str, Any]] = None) -> Any:
|
|
|
async with self._build_client() as client:
|
|
|
result = await client.call_tool(tool_name, arguments or {})
|
|
|
-
|
|
|
- # FastMCP may already return parsed structured content.
|
|
|
if hasattr(result, "structuredContent") and result.structuredContent is not None:
|
|
|
return result.structuredContent
|
|
|
|
|
|
payload = getattr(result, "content", None)
|
|
|
if hasattr(payload, "text"):
|
|
|
- import json
|
|
|
-
|
|
|
payload = json.loads(payload.text)
|
|
|
-
|
|
|
if payload is None:
|
|
|
payload = result
|
|
|
-
|
|
|
if isinstance(payload, list) and payload and hasattr(payload[0], "text"):
|
|
|
- import json
|
|
|
-
|
|
|
payload = json.loads(payload[0].text)
|
|
|
-
|
|
|
return payload
|
|
|
|
|
|
def call_tool(self, tool_name: str, arguments: Optional[dict[str, Any]] = None) -> Any:
|
|
|
return anyio.run(self._call_tool_async, tool_name, arguments)
|
|
|
+
|
|
|
+
|
|
|
+def sse_url_from_env(var_name: str, default: str) -> str:
|
|
|
+ return os.getenv(var_name, default)
|