| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
- from __future__ import annotations
- import os
- from typing import Any, Awaitable, Callable
- from mcp import ClientSession
- from mcp.client.sse import sse_client
# Shape of an injectable async tool-call function: it receives a tool name and
# a payload dict and resolves to a result dict (the same shape as
# RemoteSparqlClient.call_tool).
_ToolPayload = dict[str, Any]
CallToolFn = Callable[[str, _ToolPayload], Awaitable[_ToolPayload]]
class RemoteSparqlClient:
    """Thin MCP->tool bridge for remote sparql_query / sparql_update.

    We intentionally keep this injectable so unit tests can bypass real network.

    Configuration is resolved once, at construction time. For each setting an
    explicit constructor argument wins, then the matching environment
    variable, then the class-level default:

    * ``sse_url``            / ``REMOTE_MCP_SSE_URL``
    * ``timeout_s``          / ``REMOTE_MCP_TIMEOUT``
    * ``sse_read_timeout_s`` / ``REMOTE_MCP_SSE_READ_TIMEOUT``
    """

    DEFAULT_SSE_URL = "http://192.168.0.249:8501/mcp/sse"
    DEFAULT_TIMEOUT_S = 10.0
    DEFAULT_SSE_READ_TIMEOUT_S = 300.0

    def __init__(
        self,
        *,
        sse_url: str | None = None,
        timeout_s: float | None = None,
        sse_read_timeout_s: float | None = None,
    ) -> None:
        """Resolve the endpoint URL and both timeouts.

        Args:
            sse_url: SSE endpoint URL. ``None`` or an empty string falls back
                to ``REMOTE_MCP_SSE_URL`` and then ``DEFAULT_SSE_URL``.
            timeout_s: Value forwarded as ``timeout`` to ``sse_client``.
                ``None`` falls back to ``REMOTE_MCP_TIMEOUT``.
            sse_read_timeout_s: Value forwarded as ``sse_read_timeout`` to
                ``sse_client``. ``None`` falls back to
                ``REMOTE_MCP_SSE_READ_TIMEOUT``.

        Raises:
            ValueError: If a timeout environment variable is set to a
                non-numeric value.
        """
        # An empty URL is never usable, so truthiness is the right test here.
        self.sse_url = sse_url or os.getenv("REMOTE_MCP_SSE_URL", self.DEFAULT_SSE_URL)
        self.timeout_s = self._resolve_timeout(
            timeout_s, "REMOTE_MCP_TIMEOUT", self.DEFAULT_TIMEOUT_S
        )
        self.sse_read_timeout_s = self._resolve_timeout(
            sse_read_timeout_s,
            "REMOTE_MCP_SSE_READ_TIMEOUT",
            self.DEFAULT_SSE_READ_TIMEOUT_S,
        )

    @staticmethod
    def _resolve_timeout(explicit: float | None, env_name: str, default: float) -> float:
        """Return *explicit* if given, else the env var *env_name*, else *default*."""
        # Compare against None rather than relying on truthiness so that an
        # explicit 0 / 0.0 is honoured instead of being treated as "unset".
        if explicit is not None:
            return float(explicit)
        raw = os.getenv(env_name, "").strip()
        if not raw:
            # Unset, empty, or whitespace-only variable: use the default.
            return default
        try:
            return float(raw)
        except ValueError as exc:
            raise ValueError(
                f"{env_name} must be a number of seconds, got {raw!r}"
            ) from exc

    async def call_tool(self, tool_name: str, payload: dict[str, Any]) -> dict[str, Any]:
        """Open an SSE session, invoke *tool_name* once, and return its result.

        A fresh connection and ``ClientSession`` are created for every call.
        The payload is sent wrapped as ``{"input": payload}``.

        Args:
            tool_name: Name of the remote MCP tool to call.
            payload: Arguments for the tool.

        Returns:
            The tool's ``structuredContent`` when it is not ``None``, otherwise
            its ``content``. Non-dict data is wrapped as ``{"raw": data}``.

        Raises:
            RuntimeError: If the tool result has ``isError`` set.
        """
        async with sse_client(
            self.sse_url,
            timeout=self.timeout_s,
            sse_read_timeout=self.sse_read_timeout_s,
        ) as (read_stream, write_stream):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()
                result = await session.call_tool(tool_name, {"input": payload})

        # Inspect the result only after both contexts have closed, so the
        # error below is raised from plain code and the connection is already
        # released. NOTE(review): the transport/session contexts may wrap
        # exceptions raised inside their bodies (e.g. in an exception group);
        # confirm against the installed mcp/anyio versions.
        if result.isError:
            raise RuntimeError(f"MCP tool {tool_name} failed: {result}")

        data = result.structuredContent if result.structuredContent is not None else result.content
        # We expect dict-like data for sparql_*; anything else is wrapped.
        if isinstance(data, dict):
            return data
        return {"raw": data}
|