| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import logging
- import os
- import re
- from typing import Any, Dict
- import requests
- from fastapi import FastAPI, HTTPException
- from pydantic import BaseModel
# Root logger emits INFO and above; this module logs under "virtuoso_mcp".
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("virtuoso_mcp")

app = FastAPI(title="MCP Server")

# --- CONFIG ---
# Both settings are read once at import time; environment variables override
# the defaults shown here.
VIRTUOSO_SPARQL = os.environ.get("VIRTUOSO_SPARQL", "http://localhost:8891/sparql")
# Handed to requests as the per-call timeout in run_sparql.
SPARQL_TIMEOUT = float(os.environ.get("SPARQL_TIMEOUT", "10.0"))
# One shared Session object used for every SPARQL request.
SESSION = requests.Session()
- # --- MODELS ---
class SparqlQueryRequest(BaseModel):
    """Request model holding a single SPARQL query string."""

    query: str  # raw SPARQL text
class ToolRequest(BaseModel):
    """Body accepted by ``POST /mcp``: a tool name plus that tool's arguments."""

    tool: str  # key looked up in the TOOLS registry by handle_mcp
    # Tool-specific arguments; an omitted field defaults to an empty mapping.
    # NOTE(review): pydantic is expected to copy this mutable default per
    # instance rather than share it -- confirm for the pinned pydantic version.
    input: Dict[str, Any] = {}
- # --- CORE SPARQL FUNCTION ---
def run_sparql(query: str) -> Dict[str, Any]:
    """Execute a SPARQL query against Virtuoso and return the JSON payload.

    Args:
        query: SPARQL text, POSTed to ``VIRTUOSO_SPARQL`` as the ``query``
            form field.

    Returns:
        The response body decoded from JSON.

    Raises:
        HTTPException: Status 500 when the request fails, the endpoint answers
            with an error status, or the body cannot be decoded as JSON. The
            underlying exception is attached as ``__cause__``.
    """
    logger.debug("Sending SPARQL query: %s", query)
    try:
        response = SESSION.post(
            VIRTUOSO_SPARQL,
            data={"query": query},
            headers={"Accept": "application/sparql-results+json"},
            timeout=SPARQL_TIMEOUT,
        )
        response.raise_for_status()
        # Decoding stays inside the try so a malformed body is reported the
        # same way as a transport failure.
        return response.json()
    except Exception as exc:  # pragma: no cover - propagate for FastAPI
        logger.warning("SPARQL request failed: %s", exc)
        # Chain explicitly so the root failure is recorded as the cause
        # instead of an incidental "during handling" context.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
- # --- TOOL HELPERS ---
# Characters that may not appear unescaped inside a double-quoted SPARQL
# string literal, mapped to their backslash escapes. The backslash itself must
# be included: otherwise an input such as \" becomes \\" and the quote ends
# the literal early.
_SPARQL_STRING_ESCAPES = str.maketrans(
    {
        "\\": "\\\\",
        '"': '\\"',
        "\n": "\\n",
        "\r": "\\r",
    }
)


def sanitize_term(term: str) -> str:
    """Escape *term* for interpolation inside a double-quoted SPARQL literal.

    Backslashes, double quotes, line feeds and carriage returns are replaced
    by their backslash escapes in a single pass, so the result cannot close
    the surrounding string literal. All other characters pass through
    unchanged.

    Args:
        term: Untrusted text to embed between double quotes in a query.

    Returns:
        The escaped text (without surrounding quotes).
    """
    return term.translate(_SPARQL_STRING_ESCAPES)
- # --- MCP TOOL IMPLEMENTATIONS ---
def tool_sparql_query(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run the caller-supplied SPARQL found under ``input_data["query"]``.

    Args:
        input_data: Tool arguments; only the ``query`` key is read.

    Returns:
        Whatever ``run_sparql`` returns for that query.

    Raises:
        ValueError: If ``query`` is absent or its value is falsy.
    """
    sparql_text = input_data.get("query")
    if sparql_text:
        return run_sparql(sparql_text)
    raise ValueError("Missing 'query' field")
# Selects distinct graph names that hold at least one triple, capped at 50.
_LIST_GRAPHS_QUERY = """
SELECT DISTINCT ?g WHERE {
GRAPH ?g { ?s ?p ?o }
}
LIMIT 50
"""


def tool_list_graphs(_input: Dict[str, Any]) -> Dict[str, Any]:
    """Return the SPARQL result listing up to 50 distinct graph names.

    Args:
        _input: Ignored; accepted so the function can be called the same way
            as the other entries in the tool registry.

    Returns:
        Whatever ``run_sparql`` returns for the fixed listing query.
    """
    return run_sparql(_LIST_GRAPHS_QUERY)
# Template for the label search; ``{term}`` receives the already-escaped
# search text, doubled braces are literal SPARQL braces.
# NOTE(review): ``rdfs:`` is used without a PREFIX declaration, so this relies
# on the endpoint predefining that prefix -- confirm for the target server.
_SEARCH_LABEL_QUERY = """
SELECT ?s ?label WHERE {{
?s rdfs:label ?label .
FILTER(CONTAINS(LCASE(?label), LCASE("{term}")))
}}
LIMIT 20
"""


def tool_search_label(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Find up to 20 subjects whose ``rdfs:label`` contains a term.

    The comparison lower-cases both the label and the term inside the query.

    Args:
        input_data: Tool arguments; only the ``term`` key is read.

    Returns:
        Whatever ``run_sparql`` returns for the generated query.

    Raises:
        ValueError: If ``term`` is absent, ``None`` or empty (an empty term
            would match every label), or if it is not a string.
    """
    term = input_data.get("term")
    if term is None or term == "":
        raise ValueError("Missing 'term' field")
    if not isinstance(term, str):
        # Reject here with a clear message instead of failing inside the
        # escaping helper with a TypeError.
        raise ValueError("'term' must be a string")
    sanitized = sanitize_term(term)
    return run_sparql(_SEARCH_LABEL_QUERY.format(term=sanitized))
- # --- TOOL REGISTRY ---
# One row per tool: (name, handler, description). Both public registries are
# derived from this table so their keys always match.
_TOOL_TABLE = (
    (
        "sparql_query",
        tool_sparql_query,
        "Execute arbitrary SPARQL and return the JSON result.",
    ),
    (
        "list_graphs",
        tool_list_graphs,
        "List up to 50 active graph URIs.",
    ),
    (
        "search_label",
        tool_search_label,
        "Search rdfs:label values that contain a term (case-insensitive).",
    ),
)

# Tool name -> callable taking the request's input dict.
TOOLS = {name: handler for name, handler, _doc in _TOOL_TABLE}
# Tool name -> human-readable description echoed back in /mcp responses.
TOOL_DOCS = {name: doc for name, _handler, doc in _TOOL_TABLE}
- # --- MCP ENDPOINT ---
@app.post("/mcp")
def handle_mcp(request: ToolRequest):
    """Dispatch a tool call and wrap its result in a status envelope.

    Args:
        request: Parsed body naming the tool and carrying its input mapping.

    Returns:
        A dict with ``status`` ("ok"), ``tool``, ``description`` and
        ``result`` keys.

    Raises:
        HTTPException: 400 for an unknown tool or when the tool rejects its
            input with ``ValueError``; an ``HTTPException`` raised by the tool
            is passed through unchanged; any other failure becomes a 500.
    """
    tool_name = request.tool
    input_data = request.input or {}

    handler = TOOLS.get(tool_name)
    if handler is None:
        raise HTTPException(status_code=400, detail=f"Unknown tool: {tool_name}")

    try:
        result = handler(input_data)
    except HTTPException:
        # Already a well-formed HTTP error (e.g. from run_sparql); re-wrapping
        # it would replace its detail with str(exc).
        raise
    except ValueError as exc:
        # Tools signal bad or missing arguments with ValueError: client error.
        logger.warning("Tool %s rejected its input: %s", tool_name, exc)
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        logger.error("Tool %s failed: %s", tool_name, exc)
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    return {
        "status": "ok",
        "tool": tool_name,
        "description": TOOL_DOCS.get(tool_name, ""),
        "result": result,
    }
- # --- HEALTH CHECK ---
@app.get("/")
def root():
    """Report that the server is up, its tool names, and the SPARQL endpoint URL."""
    tool_names = list(TOOLS)
    return {
        "status": "MCP server running",
        "tools": tool_names,
        "virtuoso": VIRTUOSO_SPARQL,
    }
@app.get("/health")
def health():
    """Health probe; serves the same payload as ``GET /``."""
    status_payload = root()
    return status_payload
|