|
@@ -1,4 +1,8 @@
|
|
|
-"""Atlas persistence/read service via virtuoso-mcp (MCP transport)."""
|
|
|
|
|
|
|
+"""Atlas persistence/read service via virtuoso-mcp (MCP transport).
|
|
|
|
|
+
|
|
|
|
|
+We intentionally use the MCP SSE transport ("/mcp/sse") to match the standard across
|
|
|
|
|
+our MCP servers and avoid legacy direct "/rpc" calls.
|
|
|
|
|
+"""
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
from __future__ import annotations
|
|
|
|
|
|
|
@@ -6,7 +10,9 @@ import json
|
|
|
import logging
|
|
import logging
|
|
|
import os
|
|
import os
|
|
|
from typing import Any, Awaitable, Callable
|
|
from typing import Any, Awaitable, Callable
|
|
|
-from urllib.request import Request, urlopen
|
|
|
|
|
|
|
+
|
|
|
|
|
+from mcp import ClientSession
|
|
|
|
|
+from mcp.client.sse import sse_client
|
|
|
|
|
|
|
|
from app.models import AtlasEntity
|
|
from app.models import AtlasEntity
|
|
|
from app.triple_export import entity_to_turtle
|
|
from app.triple_export import entity_to_turtle
|
|
@@ -14,8 +20,9 @@ from app.triple_export import entity_to_turtle
|
|
|
logger = logging.getLogger(__name__)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
|
|
ATLAS_GRAPH_IRI = os.getenv("ATLAS_GRAPH_IRI", "http://world.eu.org/atlas_data#")
|
|
|
-VIRTUOSO_RPC_URL = os.getenv("ATLAS_VIRTUOSO_RPC_URL", "http://192.168.0.249:8501/rpc")
|
|
|
|
|
-VIRTUOSO_RPC_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_RPC_TIMEOUT", "20"))
|
|
|
|
|
|
|
+VIRTUOSO_MCP_SSE_URL = os.getenv("ATLAS_VIRTUOSO_MCP_SSE_URL", "http://192.168.0.249:8501/mcp/sse")
|
|
|
|
|
+VIRTUOSO_MCP_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_TIMEOUT", "10"))
|
|
|
|
|
+VIRTUOSO_MCP_SSE_READ_TIMEOUT = float(os.getenv("ATLAS_VIRTUOSO_MCP_SSE_READ_TIMEOUT", str(60 * 5)))
|
|
|
|
|
|
|
|
CallToolFn = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]
|
|
CallToolFn = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]
|
|
|
|
|
|
|
@@ -44,19 +51,20 @@ class AtlasStorageService:
|
|
|
if self._call_tool_override:
|
|
if self._call_tool_override:
|
|
|
return await self._call_tool_override(tool_name, payload)
|
|
return await self._call_tool_override(tool_name, payload)
|
|
|
|
|
|
|
|
- request = Request(
|
|
|
|
|
- VIRTUOSO_RPC_URL,
|
|
|
|
|
- data=json.dumps({"tool": tool_name, "input": payload}).encode("utf-8"),
|
|
|
|
|
- headers={"Content-Type": "application/json"},
|
|
|
|
|
- method="POST",
|
|
|
|
|
- )
|
|
|
|
|
- with urlopen(request, timeout=VIRTUOSO_RPC_TIMEOUT) as response:
|
|
|
|
|
- data = json.loads(response.read().decode("utf-8"))
|
|
|
|
|
- if isinstance(data, dict) and data.get("error"):
|
|
|
|
|
- raise RuntimeError(f"Tool {tool_name} failed: {data['error']}")
|
|
|
|
|
- if isinstance(data, dict) and "result" in data:
|
|
|
|
|
- return data["result"]
|
|
|
|
|
- return data
|
|
|
|
|
|
|
+ try:
|
|
|
|
|
+ async with sse_client(
|
|
|
|
|
+ VIRTUOSO_MCP_SSE_URL,
|
|
|
|
|
+ timeout=VIRTUOSO_MCP_TIMEOUT,
|
|
|
|
|
+ sse_read_timeout=VIRTUOSO_MCP_SSE_READ_TIMEOUT,
|
|
|
|
|
+ ) as (read_stream, write_stream):
|
|
|
|
|
+ async with ClientSession(read_stream, write_stream) as session:
|
|
|
|
|
+ await session.initialize()
|
|
|
|
|
+ result = await session.call_tool(tool_name, {"input": payload})
|
|
|
|
|
+ if result.isError:
|
|
|
|
|
+ raise RuntimeError(f"Tool {tool_name} failed: {result.error}")
|
|
|
|
|
+ return result.structuredContent if result.structuredContent is not None else result.content
|
|
|
|
|
+ except Exception as exc:
|
|
|
|
|
+ raise RuntimeError(f"Virtuoso MCP call failed for {tool_name}: {exc}")
|
|
|
|
|
|
|
|
async def write_entity(self, entity: AtlasEntity) -> dict[str, Any]:
|
|
async def write_entity(self, entity: AtlasEntity) -> dict[str, Any]:
|
|
|
ttl = entity_to_turtle(entity)
|
|
ttl = entity_to_turtle(entity)
|