from __future__ import annotations

import os
from typing import Any, Awaitable, Callable

from mcp import ClientSession
from mcp.client.sse import sse_client

# Shape of an injectable "call a tool" coroutine: (tool_name, payload) -> dict.
# RemoteSparqlClient.call_tool has this signature.
CallToolFn = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]


class RemoteSparqlClient:
    """Thin MCP->tool bridge for remote sparql_query / sparql_update.

    We intentionally keep this injectable so unit tests can bypass real network.

    Configuration is resolved once, at construction time, in this order:
    explicit keyword argument, then environment variable, then built-in
    default. A falsy argument (``None``, ``""``, ``0``) or an unset/empty
    environment variable falls through to the next source.
    """

    def __init__(
        self,
        *,
        sse_url: str | None = None,
        timeout_s: float | None = None,
        sse_read_timeout_s: float | None = None,
    ):
        """Resolve the endpoint URL and timeouts.

        Args:
            sse_url: SSE endpoint URL. Falls back to ``REMOTE_MCP_SSE_URL``.
            timeout_s: Value passed to ``sse_client`` as ``timeout``.
                Falls back to ``REMOTE_MCP_TIMEOUT`` (default ``10``).
            sse_read_timeout_s: Value passed to ``sse_client`` as
                ``sse_read_timeout``. Falls back to
                ``REMOTE_MCP_SSE_READ_TIMEOUT`` (default ``300``).

        Raises:
            ValueError: If a timeout taken from the environment is not a
                valid float literal.
        """
        # `os.getenv(name) or default` (rather than `os.getenv(name, default)`)
        # so that a variable that is set but empty also yields the default
        # instead of an empty URL or a `float("")` ValueError.
        self.sse_url = (
            sse_url
            or os.getenv("REMOTE_MCP_SSE_URL")
            or "http://192.168.0.249:8501/mcp/sse"
        )
        self.timeout_s = float(
            timeout_s or os.getenv("REMOTE_MCP_TIMEOUT") or "10"
        )
        self.sse_read_timeout_s = float(
            sse_read_timeout_s
            or os.getenv("REMOTE_MCP_SSE_READ_TIMEOUT")
            or "300"
        )

    @staticmethod
    def _error_detail(result: Any) -> str:
        """Build a human-readable failure description from a tool result."""
        # Honour an explicit `error` attribute when the result object has one
        # (e.g. a test double); the MCP result model itself does not define it.
        explicit = getattr(result, "error", None)
        if explicit:
            return str(explicit)
        # Otherwise use the text of the returned content items, if any.
        content = getattr(result, "content", None) or []
        texts = [
            str(text)
            for text in (getattr(item, "text", None) for item in content)
            if text
        ]
        if texts:
            return "\n".join(texts)
        return repr(content)

    async def call_tool(self, tool_name: str, payload: dict[str, Any]) -> dict[str, Any]:
        """Invoke ``tool_name`` on the remote MCP server and return its data.

        A new SSE connection and client session are opened for each call and
        closed when the call finishes.

        Args:
            tool_name: Name of the remote tool to call.
            payload: Tool input; sent wrapped as ``{"input": payload}``.

        Returns:
            The result's ``structuredContent`` when it is not ``None``,
            otherwise its ``content``. A dict is returned as-is; anything
            else is wrapped as ``{"raw": data}``.

        Raises:
            RuntimeError: If the tool result is flagged as an error.
        """
        async with sse_client(
            self.sse_url,
            timeout=self.timeout_s,
            sse_read_timeout=self.sse_read_timeout_s,
        ) as (read_stream, write_stream):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()
                result = await session.call_tool(tool_name, {"input": payload})
                if result.isError:
                    raise RuntimeError(
                        f"MCP tool {tool_name} failed: {self._error_detail(result)}"
                    )
                data = (
                    result.structuredContent
                    if result.structuredContent is not None
                    else result.content
                )
                # We expect dict-like data for sparql_*.
                if isinstance(data, dict):
                    return data
                return {"raw": data}