import logging import os import re from typing import Any, Dict import requests from fastapi import FastAPI, HTTPException from pydantic import BaseModel logging.basicConfig(level=logging.INFO) logger = logging.getLogger("virtuoso_mcp") app = FastAPI(title="MCP Server") # --- CONFIG --- VIRTUOSO_SPARQL = os.getenv("VIRTUOSO_SPARQL", "http://localhost:8891/sparql") SPARQL_TIMEOUT = float(os.getenv("SPARQL_TIMEOUT", 10.0)) SESSION = requests.Session() # --- MODELS --- class SparqlQueryRequest(BaseModel): query: str class ToolRequest(BaseModel): tool: str input: Dict[str, Any] = {} # --- CORE SPARQL FUNCTION --- def run_sparql(query: str) -> Dict[str, Any]: """Execute a SPARQL query against Virtuoso and return the JSON payload.""" logger.debug("Sending SPARQL query: %s", query) try: response = SESSION.post( VIRTUOSO_SPARQL, data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=SPARQL_TIMEOUT, ) response.raise_for_status() return response.json() except Exception as exc: # pragma: no cover - propagate for FastAPI logger.warning("SPARQL request failed: %s", exc) raise HTTPException(status_code=500, detail=str(exc)) # --- TOOL HELPERS --- def sanitize_term(term: str) -> str: """Escape quotes inside label searches so we can safely interpolate strings.""" return re.sub(r"\"", "\\\"", term) # --- MCP TOOL IMPLEMENTATIONS --- def tool_sparql_query(input_data: Dict[str, Any]) -> Dict[str, Any]: query = input_data.get("query") if not query: raise ValueError("Missing 'query' field") return run_sparql(query) def tool_list_graphs(_input: Dict[str, Any]) -> Dict[str, Any]: query = """ SELECT DISTINCT ?g WHERE { GRAPH ?g { ?s ?p ?o } } LIMIT 50 """ return run_sparql(query) def tool_search_label(input_data: Dict[str, Any]) -> Dict[str, Any]: term = input_data.get("term", "") sanitized = sanitize_term(term) query = f""" SELECT ?s ?label WHERE {{ ?s rdfs:label ?label . FILTER(CONTAINS(LCASE(?label), LCASE(\"{sanitized}\"))) }} LIMIT 20 """ return run_sparql(query) # --- TOOL REGISTRY --- TOOLS = { "sparql_query": tool_sparql_query, "list_graphs": tool_list_graphs, "search_label": tool_search_label, } TOOL_DOCS = { "sparql_query": "Execute arbitrary SPARQL and return the JSON result.", "list_graphs": "List up to 50 active graph URIs.", "search_label": "Search rdfs:label values that contain a term (case-insensitive).", } # --- MCP ENDPOINT --- @app.post("/mcp") def handle_mcp(request: ToolRequest): tool_name = request.tool input_data = request.input or {} if tool_name not in TOOLS: raise HTTPException(status_code=400, detail=f"Unknown tool: {tool_name}") try: result = TOOLS[tool_name](input_data) return { "status": "ok", "tool": tool_name, "description": TOOL_DOCS.get(tool_name, ""), "result": result, } except Exception as exc: logger.error("Tool %s failed: %s", tool_name, exc) raise HTTPException(status_code=500, detail=str(exc)) # --- HEALTH CHECK --- @app.get("/") def root(): return { "status": "MCP server running", "tools": list(TOOLS.keys()), "virtuoso": VIRTUOSO_SPARQL, } @app.get("/health") def health(): return root()