remote_sparql_client.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. from __future__ import annotations
  2. import os
  3. from typing import Any, Awaitable, Callable
  4. from mcp import ClientSession
  5. from mcp.client.sse import sse_client
# Type of an async "call a tool" function: takes a tool name and a dict payload
# and resolves to a dict result. RemoteSparqlClient.call_tool has this shape;
# presumably this alias is what callers annotate an injected replacement with
# (e.g. a fake in unit tests) -- verify against the call sites.
CallToolFn = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]
  7. class RemoteSparqlClient:
  8. """Thin MCP->tool bridge for remote sparql_query / sparql_update.
  9. We intentionally keep this injectable so unit tests can bypass real network.
  10. """
  11. def __init__(
  12. self,
  13. *,
  14. sse_url: str | None = None,
  15. timeout_s: float | None = None,
  16. sse_read_timeout_s: float | None = None,
  17. ):
  18. self.sse_url = sse_url or os.getenv("REMOTE_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
  19. self.timeout_s = float(timeout_s or os.getenv("REMOTE_MCP_TIMEOUT", "10"))
  20. self.sse_read_timeout_s = float(sse_read_timeout_s or os.getenv("REMOTE_MCP_SSE_READ_TIMEOUT", "300"))
  21. async def call_tool(self, tool_name: str, payload: dict[str, Any]) -> dict[str, Any]:
  22. async with sse_client(
  23. self.sse_url,
  24. timeout=self.timeout_s,
  25. sse_read_timeout=self.sse_read_timeout_s,
  26. ) as (read_stream, write_stream):
  27. async with ClientSession(read_stream, write_stream) as session:
  28. await session.initialize()
  29. result = await session.call_tool(tool_name, {"input": payload})
  30. if result.isError:
  31. raise RuntimeError(f"MCP tool {tool_name} failed: {result}")
  32. data = result.structuredContent if result.structuredContent is not None else result.content
  33. # We expect dict-like data for sparql_*.
  34. if isinstance(data, dict):
  35. return data
  36. return {"raw": data}