virtuoso_mcp.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import logging
  2. import os
  3. import re
  4. from typing import Any, Dict
  5. import requests
  6. from fastapi import FastAPI, HTTPException
  7. from pydantic import BaseModel
  # Configure root logging at INFO and grab this service's named logger.
  logging.basicConfig(level=logging.INFO)
  logger = logging.getLogger("virtuoso_mcp")

  # Application object that the route decorators in this module attach to.
  app = FastAPI(title="MCP Server")

  # --- CONFIG ---
  # SPARQL endpoint URL; the VIRTUOSO_SPARQL environment variable overrides it.
  VIRTUOSO_SPARQL = os.environ.get("VIRTUOSO_SPARQL", "http://localhost:8891/sparql")
  # Timeout passed to every SPARQL request; SPARQL_TIMEOUT overrides it.
  SPARQL_TIMEOUT = float(os.environ.get("SPARQL_TIMEOUT", 10.0))
  # Single session object shared by all outgoing SPARQL requests.
  SESSION = requests.Session()
  15. # --- MODELS ---
  class SparqlQueryRequest(BaseModel):
      """Request body that carries a single SPARQL query string."""

      # Raw SPARQL text.
      query: str
  class ToolRequest(BaseModel):
      """Body of a POST to ``/mcp``: the tool to run plus its arguments."""

      # Name used to look the tool up in the TOOLS registry.
      tool: str
      # Tool-specific arguments; defaults to an empty mapping when omitted.
      input: Dict[str, Any] = {}
  21. # --- CORE SPARQL FUNCTION ---
  def run_sparql(query: str) -> Dict[str, Any]:
      """Execute a SPARQL query against Virtuoso and return the JSON payload.

      Args:
          query: SPARQL text, POSTed form-encoded as the ``query`` field to
              ``VIRTUOSO_SPARQL``.

      Returns:
          The response body decoded from ``application/sparql-results+json``.

      Raises:
          HTTPException: Status 500 when the request cannot be completed,
              times out, comes back with an error status, or the body is not
              valid JSON. The original exception is attached as the cause.
      """
      logger.debug("Sending SPARQL query: %s", query)
      try:
          response = SESSION.post(
              VIRTUOSO_SPARQL,
              data={"query": query},
              headers={"Accept": "application/sparql-results+json"},
              timeout=SPARQL_TIMEOUT,
          )
          response.raise_for_status()
          return response.json()
      # RequestException covers transport, timeout and HTTP-status failures;
      # ValueError covers JSON decoding on requests versions whose decode
      # error is not a RequestException. Anything else is a programming error
      # and is left to propagate untouched.
      except (requests.RequestException, ValueError) as exc:  # pragma: no cover - propagate for FastAPI
          logger.warning("SPARQL request failed: %s", exc)
          raise HTTPException(status_code=500, detail=str(exc)) from exc
  37. # --- TOOL HELPERS ---
  # Characters that must not appear raw inside a double-quoted SPARQL string
  # literal, mapped to their backslash escape sequences.
  _SPARQL_STRING_ESCAPES = str.maketrans(
      {
          "\\": "\\\\",
          '"': '\\"',
          "\n": "\\n",
          "\r": "\\r",
      }
  )


  def sanitize_term(term: str) -> str:
      """Escape ``term`` for interpolation into a double-quoted SPARQL literal.

      Backslashes, double quotes, line feeds and carriage returns are replaced
      by their backslash escapes. Escaping the backslash itself matters: if it
      were left alone, a backslash placed before a quote (or at the very end
      of the term) would neutralise the escaping and let the input terminate
      the literal early.

      Args:
          term: Untrusted search text.

      Returns:
          The escaped text, without surrounding quotes.
      """
      # translate() rewrites each character exactly once, so the backslashes
      # introduced by the escapes are never escaped a second time.
      return term.translate(_SPARQL_STRING_ESCAPES)
  41. # --- MCP TOOL IMPLEMENTATIONS ---
  def tool_sparql_query(input_data: Dict[str, Any]) -> Dict[str, Any]:
      """Run the caller-supplied SPARQL text and return the endpoint's JSON.

      Args:
          input_data: Tool arguments; must contain a non-blank string under
              the ``"query"`` key.

      Returns:
          Whatever ``run_sparql`` returns for the query.

      Raises:
          ValueError: If ``"query"`` is absent, not a string, or contains only
              whitespace.
      """
      query = input_data.get("query")
      # Reject non-strings and blank text here instead of forwarding a request
      # that the endpoint can only answer with an error.
      if not isinstance(query, str) or not query.strip():
          raise ValueError("Missing 'query' field")
      return run_sparql(query)
  def tool_list_graphs(_input: Dict[str, Any]) -> Dict[str, Any]:
      """Ask the endpoint for up to 50 distinct graphs that hold a triple.

      ``_input`` is not read; the parameter exists because the dispatcher
      hands every tool its input mapping.
      """
      graphs_query = """
      SELECT DISTINCT ?g WHERE {
      GRAPH ?g { ?s ?p ?o }
      }
      LIMIT 50
      """
      return run_sparql(graphs_query)
  def tool_search_label(input_data: Dict[str, Any]) -> Dict[str, Any]:
      """Find up to 20 subjects whose ``rdfs:label`` contains a search term.

      The comparison lower-cases both the label and the term, so matching is
      case-insensitive.

      Args:
          input_data: Tool arguments; ``"term"`` is the text to look for and
              defaults to the empty string when absent.

      Returns:
          Whatever ``run_sparql`` returns for the generated query.

      Raises:
          ValueError: If ``"term"`` is present but is not a string.
      """
      term = input_data.get("term", "")
      # A non-string would otherwise fail inside the escaping helper with an
      # unhelpful TypeError; report it as bad input instead.
      if not isinstance(term, str):
          raise ValueError("'term' must be a string")
      sanitized = sanitize_term(term)
      # The prefix is declared explicitly so the query does not depend on the
      # server having ``rdfs:`` predefined.
      query = f"""
      PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
      SELECT ?s ?label WHERE {{
      ?s rdfs:label ?label .
      FILTER(CONTAINS(LCASE(?label), LCASE("{sanitized}")))
      }}
      LIMIT 20
      """
      return run_sparql(query)
  66. # --- TOOL REGISTRY ---
  # Dispatch table: tool name as sent by the client -> implementing function.
  TOOLS = dict(
      sparql_query=tool_sparql_query,
      list_graphs=tool_list_graphs,
      search_label=tool_search_label,
  )
  # One-line description per tool, keyed by the same names as the registry.
  TOOL_DOCS = dict(
      sparql_query="Execute arbitrary SPARQL and return the JSON result.",
      list_graphs="List up to 50 active graph URIs.",
      search_label="Search rdfs:label values that contain a term (case-insensitive).",
  )
  77. # --- MCP ENDPOINT ---
  @app.post("/mcp")
  def handle_mcp(request: ToolRequest):
      """Dispatch an MCP tool call and wrap its result in a response envelope.

      Args:
          request: Parsed request body naming the tool and carrying its input.

      Returns:
          A dict with ``status`` ``"ok"``, the tool name, its description from
          ``TOOL_DOCS`` and the tool's ``result``.

      Raises:
          HTTPException: 400 for an unknown tool or for input the tool rejects
              with ``ValueError``; 500 for any other failure. An
              ``HTTPException`` raised by the tool itself is passed through
              unchanged.
      """
      tool_name = request.tool
      input_data = request.input or {}
      if tool_name not in TOOLS:
          raise HTTPException(status_code=400, detail=f"Unknown tool: {tool_name}")
      try:
          result = TOOLS[tool_name](input_data)
      except HTTPException:
          # Already carries the right status and detail; re-wrapping it would
          # turn the detail into the exception's string form.
          raise
      except ValueError as exc:
          # Tools signal invalid arguments with ValueError: a client error.
          logger.warning("Tool %s failed: %s", tool_name, exc)
          raise HTTPException(status_code=400, detail=str(exc)) from exc
      except Exception as exc:
          logger.error("Tool %s failed: %s", tool_name, exc)
          raise HTTPException(status_code=500, detail=str(exc)) from exc
      return {
          "status": "ok",
          "tool": tool_name,
          "description": TOOL_DOCS.get(tool_name, ""),
          "result": result,
      }
  95. # --- HEALTH CHECK ---
  @app.get("/")
  def root():
      """Report the server status, the registered tool names and the endpoint URL."""
      tool_names = list(TOOLS)
      return dict(
          status="MCP server running",
          tools=tool_names,
          virtuoso=VIRTUOSO_SPARQL,
      )
  @app.get("/health")
  def health():
      """Serve the same payload as ``root`` under ``/health``."""
      payload = root()
      return payload