import json
import logging
import os
import re
from datetime import datetime, timezone
from importlib import import_module
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

import requests
from requests.auth import HTTPDigestAuth
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel

LOG_LEVEL = os.getenv("MCP_LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO))
logger = logging.getLogger("virtuoso_mcp")

app = FastAPI(title="MCP Server")

# --- CONFIG ---
VIRTUOSO_ENDPOINT = os.getenv("VIRTUOSO_ENDPOINT") or os.getenv(
    "VIRTUOSO_SPARQL", "http://localhost:8891/sparql"
)
VIRTUOSO_USER = os.getenv("VIRTUOSO_USER")
VIRTUOSO_PASS = os.getenv("VIRTUOSO_PASS")
SPARQL_TIMEOUT = float(os.getenv("SPARQL_TIMEOUT", 30.0))
SPARQL_UPDATE_TIMEOUT = float(os.getenv("SPARQL_UPDATE_TIMEOUT", 30.0))
SPARQL_DEFAULT_LIMIT = int(os.getenv("SPARQL_DEFAULT_LIMIT", 100))
SPARQL_MAX_LIMIT = int(os.getenv("SPARQL_MAX_LIMIT", 500))
GRAPH_URI = os.getenv("GRAPH_URI", "http://example.org/catalog#")
EXAMPLES_DIR = Path(__file__).resolve().parent / "examples"
EXAMPLE_GRAPH = os.getenv(
    "EXAMPLE_GRAPH", "http://example.org/catalog#test"
)
ALLOW_EXAMPLE_LOAD = os.getenv("MCP_ALLOW_EXAMPLE_LOAD", "false").lower() == "true"

SESSION = requests.Session()

LOGS_DIR = Path(__file__).resolve().parent / "logs"
LOGS_DIR.mkdir(parents=True, exist_ok=True)
tool_logger = logging.getLogger("virtuoso_mcp.tools")
tool_handler = logging.FileHandler(LOGS_DIR / "tool_usage.log")
tool_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
tool_logger.addHandler(tool_handler)
tool_logger.setLevel(logging.INFO)
tool_logger.propagate = False

PREFIXES = f"""
PREFIX : <{GRAPH_URI}>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX dc: <http://purl.org/dc/elements/1.1/>
""".strip()


# --- MODELS ---
class SparqlQueryRequest(BaseModel):
    query: str


class ToolRequest(BaseModel):
    tool: str
    input: Dict[str, Any] = {}


# --- MCP (minimal JSON-RPC 2.0) models ---
class JsonRpcRequest(BaseModel):
    jsonrpc: str = "2.0"
    id: Optional[Any] = None
    method: str
    params: Dict[str, Any] = {}


def mcp_error(id_value: Any, message: str, code: int = -32000) -> Dict[str, Any]:
    return {
        "jsonrpc": "2.0",
        "id": id_value,
        "error": {
            "code": code,
            "message": message,
        },
    }


def mcp_result(id_value: Any, result: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "jsonrpc": "2.0",
        "id": id_value,
        "result": result,
    }


def _mcp_tool_definition(name: str) -> Dict[str, Any]:
    if name in TOOL_SCHEMAS:
        schema = TOOL_SCHEMAS[name]
        # schema is expected to be an MCP-compatible inputSchema object.
        return {
            "name": name,
            "description": TOOL_DOCS.get(name, ""),
            "inputSchema": schema,
        }
    # Incremental compliance step: add explicit input schemas for the most-used tools.
    # We still keep `additionalProperties: True` so we don't break existing clients.
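    # A minimal sketch of what this fallback produces, e.g. for "search_label"
    # (values illustrative, assuming the default SPARQL_MAX_LIMIT of 500):
    #
    #   {
    #       "name": "search_label",
    #       "description": "Search rdfs:label values that contain a term (case-insensitive).",
    #       "inputSchema": {
    #           "type": "object",
    #           "additionalProperties": True,
    #           "properties": {
    #               "term": {"type": "string", "description": "Substring to search in rdfs:label"},
    #               "limit": {"type": "integer", "minimum": 1, "maximum": 500, "description": "Max results"},
    #           },
    #           "required": ["term"],
    #       },
    #   }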
    base: Dict[str, Any] = {
        "name": name,
        "description": TOOL_DOCS.get(name, ""),
        "inputSchema": {
            "type": "object",
            "additionalProperties": True,
            "properties": {},
            "required": [],
        },
    }
    if name == "sparql_query":
        base["inputSchema"]["properties"] = {
            "query": {"type": "string", "description": "SPARQL SELECT query (bounded + guardrailed)"}
        }
        base["inputSchema"]["required"] = ["query"]
        return base
    if name == "sparql_update":
        base["inputSchema"]["properties"] = {
            "query": {"type": "string", "description": "SPARQL UPDATE query (INSERT/DELETE only, guardrailed)"},
            "require_update_keyword": {
                "type": "boolean",
                "description": "Reject queries that do not contain INSERT or DELETE",
                "default": True,
            },
        }
        base["inputSchema"]["required"] = ["query"]
        return base
    if name == "search_label":
        base["inputSchema"]["properties"] = {
            "term": {"type": "string", "description": "Substring to search in rdfs:label"},
            "limit": {"type": "integer", "minimum": 1, "maximum": SPARQL_MAX_LIMIT, "description": "Max results"},
        }
        base["inputSchema"]["required"] = ["term"]
        return base
    if name == "get_entities_by_type":
        base["inputSchema"]["properties"] = {
            "type_uri": {"type": "string", "description": "RDF type URI"},
            "limit": {"type": "integer", "minimum": 1, "maximum": SPARQL_MAX_LIMIT, "description": "Max subjects"},
        }
        base["inputSchema"]["required"] = ["type_uri"]
        return base
    if name == "list_graphs":
        # No inputs.
        return base
    if name == "list_classes":
        base["inputSchema"]["properties"] = {
            "term": {"type": "string", "description": "Optional substring to match labels/comments"},
            "limit": {"type": "integer", "minimum": 1, "maximum": SPARQL_MAX_LIMIT, "description": "Max results"},
        }
        base["inputSchema"]["required"] = []
        return base
    if name == "list_properties":
        base["inputSchema"]["properties"] = {
            "term": {"type": "string", "description": "Optional substring to match labels/comments"},
            "domain_uri": {"type": "string", "description": "Optional rdfs:domain class URI"},
            "range_uri": {"type": "string", "description": "Optional rdfs:range class URI"},
            "limit": {"type": "integer", "minimum": 1, "maximum": SPARQL_MAX_LIMIT, "description": "Max results"},
        }
        base["inputSchema"]["required"] = []
        return base
    if name == "describe_class":
        base["inputSchema"]["properties"] = {
            "class_uri": {"type": "string", "description": "Class URI to describe"},
        }
        base["inputSchema"]["required"] = ["class_uri"]
        return base
    if name == "describe_property":
        base["inputSchema"]["properties"] = {
            "property_uri": {"type": "string", "description": "Property URI to describe"},
            "usage_limit": {
                "type": "integer",
                "minimum": 1,
                "maximum": SPARQL_MAX_LIMIT,
                "description": "How many usage examples to include",
            },
        }
        base["inputSchema"]["required"] = ["property_uri"]
        return base
    # ---- Entity navigation batch (B) ----
    if name == "get_predicates_for_subject":
        base["inputSchema"]["properties"] = {
            "subject_uri": {"type": "string", "description": "Subject URI"},
            "limit": {"type": "integer", "minimum": 1, "maximum": SPARQL_MAX_LIMIT, "description": "Max predicates"},
        }
        base["inputSchema"]["required"] = ["subject_uri"]
        return base
    if name == "get_labels_for_subject":
        base["inputSchema"]["properties"] = {
            "subject_uri": {"type": "string", "description": "Subject URI"},
        }
        base["inputSchema"]["required"] = ["subject_uri"]
        return base
    if name == "traverse_property":
        base["inputSchema"]["properties"] = {
            "subject_uri": {"type": "string", "description": "Starting subject URI"},
            "property_uri": {"type": "string", "description": "Predicate URI to traverse"},
            "direction": {
                "type": "string",
                "enum": ["outgoing", "incoming"],
                "description": "Traversal direction",
            },
            "limit": {"type": "integer", "minimum": 1, "maximum": SPARQL_MAX_LIMIT, "description": "Max neighbors"},
        }
        base["inputSchema"]["required"] = ["subject_uri", "property_uri"]
        return base
    if name == "describe_subject":
        base["inputSchema"]["properties"] = {
            "subject_uri": {"type": "string", "description": "Subject URI"},
            "limit": {
                "type": "integer",
                "minimum": 1,
                "maximum": SPARQL_MAX_LIMIT,
                "description": "Max outgoing predicate/object pairs",
            },
        }
        base["inputSchema"]["required"] = ["subject_uri"]
        return base
    if name == "path_traverse":
        base["inputSchema"]["properties"] = {
            "subject_uri": {"type": "string", "description": "Starting subject URI"},
            "property_path": {
                "type": "string",
                "description": "Comma-separated list of predicate URIs (alternative to 'properties')",
            },
            "properties": {
                "type": "array",
                "items": {"type": "string"},
                "description": "List of predicate URIs",
            },
            "direction": {
                "type": "string",
                "enum": ["outgoing", "incoming"],
                "description": "Traversal direction",
            },
            "limit": {"type": "integer", "minimum": 1, "maximum": SPARQL_MAX_LIMIT, "description": "Max results"},
        }
        base["inputSchema"]["required"] = ["subject_uri"]
        return base
    # ---- Relationship analytics batch (C) ----
    if name == "property_usage_statistics":
        base["inputSchema"]["properties"] = {
            "property_uri": {"type": "string", "description": "Predicate URI"},
            "examples_limit": {
                "type": "integer",
                "minimum": 1,
                "maximum": SPARQL_MAX_LIMIT,
                "description": "How many usage examples to include",
            },
        }
        base["inputSchema"]["required"] = ["property_uri"]
        return base
    return base


# --- CORE SPARQL FUNCTION ---
def _build_auth() -> Optional[HTTPDigestAuth]:
    if VIRTUOSO_USER and VIRTUOSO_PASS:
        return HTTPDigestAuth(VIRTUOSO_USER, VIRTUOSO_PASS)
    return None


def _with_prefixes(query: str) -> str:
    if re.search(r"^\s*prefix\b", query, re.IGNORECASE):
        return query
    return f"{PREFIXES}\n{query}"


def run_sparql(query: str) -> Dict[str, Any]:
    """Execute a SPARQL query against Virtuoso and return the JSON payload."""
    logger.debug("Sending SPARQL query: %s", query)
    try:
        response = SESSION.post(
            VIRTUOSO_ENDPOINT,
            data=_with_prefixes(query).encode("utf-8"),
            headers={
                "Accept": "application/sparql-results+json",
                "Content-Type": "application/sparql-query",
            },
            timeout=SPARQL_TIMEOUT,
            auth=_build_auth(),
        )
        if not response.ok:
            logger.warning("SPARQL request failed: %s", response.status_code)
            response.raise_for_status()
        return response.json()
    except Exception as exc:  # pragma: no cover - propagate for FastAPI
        logger.warning("SPARQL request failed: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc))


def run_sparql_update(query: str) -> Dict[str, Any]:
    """Execute a SPARQL UPDATE (INSERT/DELETE) against Virtuoso."""
    logger.debug("Sending SPARQL update: %s", query)
    try:
        response = SESSION.post(
            VIRTUOSO_ENDPOINT,
            data=_with_prefixes(query).encode("utf-8"),
            headers={"Content-Type": "application/sparql-update"},
            timeout=SPARQL_UPDATE_TIMEOUT,
            auth=_build_auth(),
        )
        if not response.ok:
            detail = (response.text or "").strip()
            logger.warning("SPARQL update failed: %s", response.status_code)
            raise HTTPException(
                status_code=500,
                detail=detail or f"SPARQL update failed with {response.status_code}",
            )
        return {"status": "ok"}
    except HTTPException:
        raise
    except Exception as exc:  # pragma: no cover - propagate for FastAPI
        logger.warning("SPARQL update failed: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc))


# --- TOOL HELPERS ---
def escape_sparql_string(value: str) -> str:
    """Escape a string for SPARQL literal usage."""
    if value is None:
        return ""
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', "\\\"")
        .replace("\n", "\\n")
        .replace("\r", "")
    )


def sanitize_term(term: str) -> str:
    """Escape quotes inside label searches so we can safely interpolate strings."""
    return escape_sparql_string(term)


def _extract_limit(query: str) -> Optional[int]:
    match = re.search(r"\blimit\s+(\d+)\b", query, re.IGNORECASE)
    if not match:
        return None
    try:
        return int(match.group(1))
    except ValueError:
        return None


def _apply_limit(query: str, default_limit: int, max_limit: int) -> str:
    limit = _extract_limit(query)
    if limit is None:
        return f"{query.strip()}\nLIMIT {default_limit}"
    if limit > max_limit:
        return re.sub(
            r"\blimit\s+\d+\b",
            f"LIMIT {max_limit}",
            query,
            flags=re.IGNORECASE,
        )
    return query


def guard_select_query(query: str) -> str:
    """Enforce that raw queries are read-only and bounded by LIMIT."""
    lowered = query.lower()
    if re.search(r"\b(insert|delete|load|clear|drop|create|move|copy|add)\b", lowered):
        raise HTTPException(status_code=400, detail="SPARQL update operations are not allowed")
    if "select" not in lowered:
        raise HTTPException(status_code=400, detail="Only SELECT queries are allowed")
    return _apply_limit(query, SPARQL_DEFAULT_LIMIT, SPARQL_MAX_LIMIT)


def guard_update_query(query: str, require_update_keyword: bool = True) -> str:
    """Allow only SPARQL UPDATE statements that actually mutate data."""
    lowered = query.lower()
    if re.search(r"\b(select|ask|construct|describe)\b", lowered):
        raise HTTPException(status_code=400, detail="Only SPARQL UPDATE statements are allowed")
    if not re.search(r"\b(insert|delete)\b", lowered):
        if require_update_keyword:
            raise HTTPException(status_code=400, detail="SPARQL UPDATE must contain INSERT or DELETE")
    if re.search(r"\b(load|clear|drop|create|move|copy|add)\b", lowered):
        raise HTTPException(status_code=400, detail="This update tool only allows INSERT or DELETE operations")
    return query


def ttl_to_sparql_insert(ttl_text: str, graph: Optional[str]) -> str:
    prefix_lines: List[str] = []
    body_lines: List[str] = []
    for raw_line in ttl_text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        prefix_match = re.match(r"@prefix\s+([\w-]+):\s*<([^>]+)>\s*\.", line)
        if prefix_match:
            prefix_lines.append(
                f"PREFIX {prefix_match.group(1)}: <{prefix_match.group(2)}>"
            )
            continue
        if line.startswith("@base"):
            # Skip @base entries for now; they are rare in our exports.
            continue
        body_lines.append(raw_line)
    if not body_lines:
        raise HTTPException(status_code=400, detail="No RDF triples found in input")
    prefixes = "\n".join(prefix_lines)
    body = "\n".join(body_lines)
    if graph:
        insert_body = f"GRAPH <{graph}> {{\n{body}\n}}"
    else:
        insert_body = body
    return f"{prefixes}\nINSERT DATA {{\n{insert_body}\n}}"


# --- MCP TOOL IMPLEMENTATIONS ---
def tool_sparql_query(input_data: Dict[str, Any]) -> Dict[str, Any]:
    query = input_data.get("query")
    if not query:
        raise ValueError("Missing 'query' field")
    guarded = guard_select_query(query)
    return run_sparql(guarded)


def tool_list_graphs(_input: Dict[str, Any]) -> Dict[str, Any]:
    query = """
    SELECT DISTINCT ?g WHERE {
        GRAPH ?g { ?s ?p ?o }
    }
    LIMIT 50
    """
    return run_sparql(query)


def tool_search_label(input_data: Dict[str, Any]) -> Dict[str, Any]:
    term = input_data.get("term", "")
    sanitized = sanitize_term(term)
    limit = int(input_data.get("limit", 20))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    query = f"""
    SELECT ?s ?label WHERE {{
        ?s rdfs:label ?label .
        FILTER(CONTAINS(LCASE(?label), LCASE("{sanitized}")))
    }}
    LIMIT {limit}
    """
    return run_sparql(query)


def tool_get_entities_by_type(input_data: Dict[str, Any]) -> Dict[str, Any]:
    type_uri = input_data.get("type_uri")
    if not type_uri:
        raise ValueError("Missing 'type_uri' field")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    query = f"""
    SELECT ?s WHERE {{
        ?s rdf:type <{type_uri}> .
    }}
    LIMIT {limit}
    """
    return run_sparql(query)


def tool_get_predicates_for_subject(input_data: Dict[str, Any]) -> Dict[str, Any]:
    subject_uri = input_data.get("subject_uri")
    if not subject_uri:
        raise ValueError("Missing 'subject_uri' field")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    query = f"""
    SELECT DISTINCT ?p WHERE {{
        <{subject_uri}> ?p ?o .
    }}
    LIMIT {limit}
    """
    return run_sparql(query)


def tool_get_labels_for_subject(input_data: Dict[str, Any]) -> Dict[str, Any]:
    subject_uri = input_data.get("subject_uri")
    if not subject_uri:
        raise ValueError("Missing 'subject_uri' field")
    query = f"""
    SELECT ?label WHERE {{
        <{subject_uri}> rdfs:label ?label .
    }}
    LIMIT 20
    """
    return run_sparql(query)


def tool_traverse_property(input_data: Dict[str, Any]) -> Dict[str, Any]:
    subject_uri = input_data.get("subject_uri")
    property_uri = input_data.get("property_uri")
    if not subject_uri or not property_uri:
        raise ValueError("Missing 'subject_uri' or 'property_uri'")
    direction = input_data.get("direction", "outgoing")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    if direction not in {"outgoing", "incoming"}:
        raise ValueError("direction must be 'outgoing' or 'incoming'")
    if direction == "outgoing":
        triple = f"<{subject_uri}> <{property_uri}> ?neighbor ."
    else:
        triple = f"?neighbor <{property_uri}> <{subject_uri}> ."
    query = f"""
    SELECT ?neighbor ?label ?description WHERE {{
        {triple}
        OPTIONAL {{ ?neighbor rdfs:label ?label }}
        OPTIONAL {{ ?neighbor dc:description ?description }}
    }}
    LIMIT {limit}
    """
    return run_sparql(query)


def tool_list_classes(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """List ontology classes (rdfs:Class and owl:Class) with optional term filtering."""
    term = sanitize_term(input_data.get("term", ""))
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    term_filter = ""
    if term:
        term_filter = f"""
        FILTER(
            CONTAINS(LCASE(COALESCE(STR(?label), STR(?class))), LCASE("{term}"))
            || CONTAINS(LCASE(COALESCE(STR(?comment), "")), LCASE("{term}"))
        )
        """
    query = f"""
    SELECT DISTINCT ?class ?label ?comment WHERE {{
        {{ ?class rdf:type rdfs:Class . }}
        UNION
        {{ ?class rdf:type <http://www.w3.org/2002/07/owl#Class> . }}
        OPTIONAL {{ ?class rdfs:label ?label }}
        OPTIONAL {{ ?class rdfs:comment ?comment }}
        {term_filter}
    }}
    LIMIT {limit}
    """
    return run_sparql(query)


def tool_list_properties(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """List ontology properties with optional term/domain/range filtering."""
    term = sanitize_term(input_data.get("term", ""))
    domain_uri = input_data.get("domain_uri")
    range_uri = input_data.get("range_uri")
    limit = int(input_data.get("limit", 100))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    filters = []
    if term:
        filters.append(
            f"""
        FILTER(
            CONTAINS(LCASE(COALESCE(STR(?label), STR(?property))), LCASE("{term}"))
            || CONTAINS(LCASE(COALESCE(STR(?comment), "")), LCASE("{term}"))
        )
        """
        )
    if domain_uri:
        filters.append(f"FILTER(?domain = <{domain_uri}>)")
    if range_uri:
        filters.append(f"FILTER(?range = <{range_uri}>)")
    query = f"""
    SELECT DISTINCT ?property ?label ?comment ?domain ?range WHERE {{
        {{ ?property rdf:type rdf:Property . }}
        UNION
        {{ ?property rdf:type <http://www.w3.org/2002/07/owl#ObjectProperty> . }}
        UNION
        {{ ?property rdf:type <http://www.w3.org/2002/07/owl#DatatypeProperty> . }}
        OPTIONAL {{ ?property rdfs:label ?label }}
        OPTIONAL {{ ?property rdfs:comment ?comment }}
        OPTIONAL {{ ?property rdfs:domain ?domain }}
        OPTIONAL {{ ?property rdfs:range ?range }}
        {' '.join(filters)}
    }}
    LIMIT {limit}
    """
    return run_sparql(query)


def tool_describe_class(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Describe a class and include properties that declare it as rdfs:domain."""
    class_uri = input_data.get("class_uri")
    if not class_uri:
        raise ValueError("Missing 'class_uri' field")
    query = f"""
    SELECT ?label ?comment ?property ?propertyLabel ?propertyComment ?range WHERE {{
        OPTIONAL {{ <{class_uri}> rdfs:label ?label }}
        OPTIONAL {{ <{class_uri}> rdfs:comment ?comment }}
        OPTIONAL {{
            ?property rdfs:domain <{class_uri}> .
            OPTIONAL {{ ?property rdfs:label ?propertyLabel }}
            OPTIONAL {{ ?property rdfs:comment ?propertyComment }}
            OPTIONAL {{ ?property rdfs:range ?range }}
        }}
    }}
    LIMIT {SPARQL_MAX_LIMIT}
    """
    return run_sparql(query)


def tool_describe_property(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Describe a property and include usage examples from the graph."""
    property_uri = input_data.get("property_uri")
    if not property_uri:
        raise ValueError("Missing 'property_uri' field")
    usage_limit = int(input_data.get("usage_limit", 10))
    usage_limit = min(max(usage_limit, 1), SPARQL_MAX_LIMIT)
    metadata_query = f"""
    SELECT ?label ?comment ?domain ?range ?type WHERE {{
        OPTIONAL {{ <{property_uri}> rdfs:label ?label }}
        OPTIONAL {{ <{property_uri}> rdfs:comment ?comment }}
        OPTIONAL {{ <{property_uri}> rdfs:domain ?domain }}
        OPTIONAL {{ <{property_uri}> rdfs:range ?range }}
        OPTIONAL {{ <{property_uri}> rdf:type ?type }}
    }}
    LIMIT {SPARQL_MAX_LIMIT}
    """
    usage_query = f"""
    SELECT ?subject ?subjectLabel ?object ?objectLabel WHERE {{
        ?subject <{property_uri}> ?object .
        OPTIONAL {{ ?subject rdfs:label ?subjectLabel }}
        OPTIONAL {{ ?object rdfs:label ?objectLabel }}
    }}
    LIMIT {usage_limit}
    """
    return {
        "metadata": run_sparql(metadata_query),
        "usage": run_sparql(usage_query),
    }


def tool_describe_subject(input_data: Dict[str, Any]) -> Dict[str, Any]:
    subject_uri = input_data.get("subject_uri")
    if not subject_uri:
        raise ValueError("Missing 'subject_uri' field")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    query = f"""
    SELECT ?predicate ?object ?objectLabel WHERE {{
        <{subject_uri}> ?predicate ?object .
        OPTIONAL {{ ?object rdfs:label ?objectLabel }}
    }}
    LIMIT {limit}
    """
    return run_sparql(query)


def tool_path_traverse(input_data: Dict[str, Any]) -> Dict[str, Any]:
    subject_uri = input_data.get("subject_uri")
    property_path = input_data.get("property_path") or input_data.get("properties")
    if not subject_uri or not property_path:
        raise ValueError("Missing 'subject_uri' or 'property_path'")
    if isinstance(property_path, str):
        property_path = [p.strip() for p in property_path.split(",") if p.strip()]
    if not isinstance(property_path, list) or not property_path:
        raise ValueError("'property_path' must be a non-empty list of property URIs")
    direction = input_data.get("direction", "outgoing")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    statements = []
    optional_lines = []
    select_terms = []
    prev_subject = f"<{subject_uri}>"
    for idx, prop_uri in enumerate(property_path, start=1):
        step_var = f"?n{idx}"
        if direction == "outgoing":
            statements.append(f"{prev_subject} <{prop_uri}> {step_var} .")
        else:
            statements.append(f"{step_var} <{prop_uri}> {prev_subject} .")
        select_terms.append(step_var)
        optional_lines.append(f"OPTIONAL {{ {step_var} rdfs:label {step_var}Label }}")
        optional_lines.append(f"OPTIONAL {{ {step_var} dc:description {step_var}Description }}")
        prev_subject = step_var
    select_clause = " ".join(select_terms)
    # Join the generated lines outside the f-string; backslashes are not allowed
    # inside f-string expressions before Python 3.12.
    statements_block = "\n        ".join(statements)
    optional_block = "\n        ".join(optional_lines)
    query = f"""
    SELECT {select_clause} WHERE {{
        {statements_block}
        {optional_block}
    }}
    LIMIT {limit}
    """
    return {
        "property_path": property_path,
        "direction": direction,
        "result": run_sparql(query),
    }


def tool_property_usage_statistics(input_data: Dict[str, Any]) -> Dict[str, Any]:
    property_uri = input_data.get("property_uri")
    if not property_uri:
        raise ValueError("Missing 'property_uri' field")
    examples_limit = int(input_data.get("examples_limit", 5))
    examples_limit = min(max(examples_limit, 1), SPARQL_MAX_LIMIT)
    count_query = f"""
    SELECT (COUNT(DISTINCT ?subject) AS ?usageCount) WHERE {{
        ?subject <{property_uri}> ?object .
    }}
    LIMIT {SPARQL_MAX_LIMIT}
    """
    usage_query = f"""
    SELECT ?subject ?subjectLabel ?object ?objectLabel WHERE {{
        ?subject <{property_uri}> ?object .
        OPTIONAL {{ ?subject rdfs:label ?subjectLabel }}
        OPTIONAL {{ ?object rdfs:label ?objectLabel }}
    }}
    LIMIT {examples_limit}
    """
    return {
        "count": run_sparql(count_query),
        "examples": run_sparql(usage_query),
    }


def tool_sparql_update(input_data: Dict[str, Any]) -> Dict[str, Any]:
    query = input_data.get("query")
    if not query:
        raise ValueError("Missing 'query' field")
    require_update_keyword = input_data.get("require_update_keyword", True)
    guarded = guard_update_query(query, require_update_keyword=require_update_keyword)
    result = run_sparql_update(guarded)
    return {**result, "query": guarded}


def tool_batch_insert(input_data: Dict[str, Any]) -> Dict[str, Any]:
    ttl_text = input_data.get("ttl")
    triples = input_data.get("triples")
    graph = input_data.get("graph") or GRAPH_URI
    if not ttl_text and not triples:
        raise ValueError("Provide either 'ttl' text or 'triples' list")

    def _format_object(obj_value: Any, obj_type: str, datatype: Optional[str], lang: Optional[str]) -> str:
        if obj_type == "uri":
            return f"<{obj_value}>"
        if obj_type == "literal":
            return f'"{escape_sparql_string(obj_value)}"'
        if obj_type == "typed_literal":
            if not datatype:
                raise ValueError("Missing datatype for typed_literal")
            return f'"{escape_sparql_string(obj_value)}"^^<{datatype}>'
        if obj_type == "lang_literal":
            if not lang:
                raise ValueError("Missing lang for lang_literal")
            return f'"{escape_sparql_string(obj_value)}"@{lang}'
        raise ValueError(f"Unknown object_type: {obj_type}")

    if ttl_text:
        query = ttl_to_sparql_insert(ttl_text, graph)
    else:
        lines = []
        for triple in triples:
            subj = triple.get("subject")
            pred = triple.get("predicate")
            obj_value = triple.get("object")
            if not subj or not pred or obj_value is None:
                raise ValueError("Each triple must provide subject, predicate, and object")
            obj_type = triple.get("object_type", "uri")
            datatype = triple.get("datatype")
            lang = triple.get("lang")
            obj_text = _format_object(obj_value, obj_type, datatype, lang)
            lines.append(f"<{subj}> <{pred}> {obj_text} .")
        ttl_bulk = "\n".join(lines)
        query = ttl_to_sparql_insert(ttl_bulk, graph)
    result = run_sparql_update(query)
    return {**result, "query": query}


def tool_load_examples(input_data: Dict[str, Any]) -> Dict[str, Any]:
    if not ALLOW_EXAMPLE_LOAD:
        raise HTTPException(status_code=403, detail="Example loading is disabled")
    requested = input_data.get("files") or []
    if isinstance(requested, str):
        requested = [requested]
    graph = input_data.get("graph") or EXAMPLE_GRAPH
    files_to_load = []
    if requested:
        files_to_load = requested
    else:
        files_to_load = sorted(p.name for p in EXAMPLES_DIR.glob("*.ttl"))
    if not files_to_load:
        raise HTTPException(status_code=404, detail="No example files available")
    results = []
    for filename in files_to_load:
        file_path = (EXAMPLES_DIR / filename).resolve()
        if not file_path.exists():
            raise HTTPException(status_code=400, detail=f"Missing example file: {filename}")
        if EXAMPLES_DIR not in file_path.parents and file_path != EXAMPLES_DIR:
            raise HTTPException(status_code=400, detail="Invalid example file path")
        ttl_text = file_path.read_text(encoding="utf-8")
        update_query = ttl_to_sparql_insert(ttl_text, graph)
        run_sparql_update(update_query)
        results.append({"file": filename, "graph": graph})
    return {"loaded": results}


def tool_insert_triple(input_data: Dict[str, Any]) -> Dict[str, Any]:
    subject = input_data.get("subject")
    predicate = input_data.get("predicate")
    obj = input_data.get("object")
    obj_type = input_data.get("object_type", "uri")
    graph = input_data.get("graph")
    if not subject or not predicate or obj is None:
        raise ValueError("Missing 'subject', 'predicate', or 'object' field")
    if obj_type == "uri":
        obj_value = f"<{obj}>"
    elif obj_type == "literal":
        obj_value = f"\"{escape_sparql_string(obj)}\""
    elif obj_type == "typed_literal":
        datatype = input_data.get("datatype")
        if not datatype:
            raise ValueError("Missing 'datatype' for typed_literal")
        obj_value = f"\"{escape_sparql_string(obj)}\"^^<{datatype}>"
    elif obj_type == "lang_literal":
        lang = input_data.get("lang")
        if not lang:
            raise ValueError("Missing 'lang' for lang_literal")
        obj_value = f"\"{escape_sparql_string(obj)}\"@{lang}"
    else:
        raise ValueError("Unknown object_type")
    ttl = f"<{subject}> <{predicate}> {obj_value} ."
    update_query = ttl_to_sparql_insert(ttl, graph)
    try:
        result = run_sparql_update(update_query)
    except HTTPException as exc:
        detail = f"{exc.detail}\n\nSPARQL:\n{update_query}"
        raise HTTPException(status_code=exc.status_code, detail=detail)
    return {**result, "query": update_query}


# --- TOOL REGISTRY ---
TOOLS = {
    "sparql_query": tool_sparql_query,
    "sparql_update": tool_sparql_update,
    "list_graphs": tool_list_graphs,
    "search_label": tool_search_label,
    "get_entities_by_type": tool_get_entities_by_type,
    "get_predicates_for_subject": tool_get_predicates_for_subject,
    "get_labels_for_subject": tool_get_labels_for_subject,
    "traverse_property": tool_traverse_property,
    "list_classes": tool_list_classes,
    "list_properties": tool_list_properties,
    "describe_class": tool_describe_class,
    "describe_property": tool_describe_property,
    "describe_subject": tool_describe_subject,
    "path_traverse": tool_path_traverse,
    "property_usage_statistics": tool_property_usage_statistics,
    "batch_insert": tool_batch_insert,
    "insert_triple": tool_insert_triple,
    "load_examples": tool_load_examples,
}

# Tool input schemas registered by domain layers (e.g., garden_layer).
TOOL_SCHEMAS: Dict[str, Any] = {}


def load_domain_layers(
    tools: Dict[str, Callable[[Dict[str, Any]], Any]],
    tool_schemas: Dict[str, Any],
) -> None:
    raw = os.getenv("DOMAIN_LAYERS", "garden_layer.plugin")
    modules = [item.strip() for item in raw.split(",") if item.strip()]
    if not modules:
        return
    for module_name in modules:
        module = None
        try:
            module = import_module(module_name)
        except ImportError as exc:
            base = module_name.split(".", 1)[0]
            if base != module_name:
                try:
                    module = import_module(base)
                    logger.info("Falling back to base module '%s' for domain layer '%s'", base, module_name)
                except ImportError:
                    logger.warning(
                        "Domain layer '%s' could not be imported and base module '%s' is missing: %s",
                        module_name,
                        base,
                        exc,
                    )
            else:
                logger.warning("Domain layer '%s' could not be imported: %s", module_name, exc)
        if module is None:
            continue
        register = getattr(module, "register_layer", None)
        if not callable(register):
            logger.warning("Domain layer '%s' does not expose register_layer", module_name)
            continue
        try:
            # Domain layer may optionally register input schemas.
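            # A domain layer module is expected to expose a `register_layer` hook.
            # A minimal sketch of such a layer (hypothetical module, names illustrative):
            #
            #   def register_layer(tools, tool_schemas=None):
            #       tools["my_tool"] = lambda input_data: {"ok": True}
            #       if tool_schemas is not None:
            #           tool_schemas["my_tool"] = {"type": "object", "properties": {}}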
            try:
                if register.__code__.co_argcount >= 2:
                    register(tools, tool_schemas)
                else:
                    register(tools)
            except Exception:
                register(tools)
            logger.info("Loaded domain layer '%s'", module_name)
        except Exception as exc:
            logger.exception("Domain layer '%s' failed to register: %s", module_name, exc)


load_domain_layers(TOOLS, TOOL_SCHEMAS)

TOOL_DOCS = {
    "sparql_query": "Execute a bounded SELECT query and return the JSON result.",
    "sparql_update": "Execute a guarded SPARQL UPDATE query limited to INSERT/DELETE operations.",
    "list_graphs": "List up to 50 active graph URIs.",
    "search_label": "Search rdfs:label values that contain a term (case-insensitive).",
    "get_entities_by_type": "List subjects of a given rdf:type.",
    "get_predicates_for_subject": "List distinct predicates used by a subject.",
    "get_labels_for_subject": "Fetch rdfs:label values for a subject.",
    "traverse_property": "Traverse a property (incoming or outgoing) for a subject and return labels/descriptions.",
    "list_classes": "List ontology classes with optional label/comment term filtering.",
    "list_properties": "List ontology properties with optional term/domain/range filters.",
    "describe_class": "Describe a class and list properties that use it as rdfs:domain.",
    "describe_property": "Describe a property (label/comment/domain/range/type) and sample usage.",
    "describe_subject": "Return subject predicates/objects (with labels) to inspect an individual node.",
    "path_traverse": "Follow a property path (list of predicates) from a subject, returning each step's nodes.",
    "property_usage_statistics": "Count how often a property is used and sample subjects/objects.",
    "batch_insert": "Insert multiple triples or TTL at once with a single guarded update.",
    "insert_triple": "Insert a single triple (useful for debugging updates).",
    "load_examples": "Load Turtle fixtures from the `examples/` directory when MCP_ALLOW_EXAMPLE_LOAD=true.",
}


# --- MCP ENDPOINT ---
@app.post("/mcp")
async def handle_mcp(http_request: Request):
    """Minimal MCP-ish JSON-RPC 2.0 endpoint on POST /mcp.

    Backward compatible legacy mode:
        {"tool": "search_label", "input": {...}}

    Minimal JSON-RPC mode (first step towards MCP compliance):
        {"jsonrpc":"2.0","id":1,"method":"initialize","params":{...}}
        {"jsonrpc":"2.0","id":2,"method":"tools/list","params":{...}}
        {"jsonrpc":"2.0","id":3,"method":"tools/call","params":{ "tool": "...", "params": {...} }}
    """
    body = None
    try:
        body = await http_request.json()
    except Exception:
        body = None

    # ---- Legacy mode ----
    if isinstance(body, dict) and "tool" in body:
        legacy = ToolRequest(**body)
        tool_name = legacy.tool
        input_data = legacy.input or {}
        client_host = http_request.client.host if http_request.client else "unknown"
        trimmed_input = json.dumps(input_data, ensure_ascii=False, default=str)
        if len(trimmed_input) > 1024:
            trimmed_input = f"{trimmed_input[:1024]}…"
        timestamp = datetime.now(timezone.utc).isoformat()
        tool_logger.info(
            "tool=%s client=%s time=%s input=%s",
            tool_name,
            client_host,
            timestamp,
            trimmed_input,
        )
        if tool_name not in TOOLS:
            raise HTTPException(status_code=400, detail=f"Unknown tool: {tool_name}")
        try:
            result = TOOLS[tool_name](input_data)
            return {
                "status": "ok",
                "tool": tool_name,
                "description": TOOL_DOCS.get(tool_name, ""),
                "result": result,
            }
        except Exception as exc:
            logger.error("Tool %s failed: %s", tool_name, exc)
            raise HTTPException(status_code=500, detail=str(exc))

    # ---- JSON-RPC 2.0 mode ----
    if not isinstance(body, dict):
        return mcp_error(None, "Invalid JSON-RPC request", code=-32600)
    try:
        rpc_req = JsonRpcRequest(**body)
    except Exception as exc:
        # If body is malformed, still surface the id if present.
        rpc_id = body.get("id")
        logger.warning("Invalid JSON-RPC request: %s", exc)
        return mcp_error(rpc_id, "Invalid JSON-RPC request", code=-32600)

    method = rpc_req.method
    rpc_id = rpc_req.id
    params = rpc_req.params or {}

    if method == "initialize":
        tools = [_mcp_tool_definition(name) for name in sorted(TOOLS.keys())]
        return mcp_result(
            rpc_id,
            {
                "protocolVersion": "0.1",
                "capabilities": {
                    "tools": True,
                    "list": True,
                    "call": True,
                },
                "tools": tools,
            },
        )

    if method in {"tools/list", "tools/listTools"}:
        tools = [_mcp_tool_definition(name) for name in sorted(TOOLS.keys())]
        return mcp_result(rpc_id, {"tools": tools})

    if method in {"tools/call", "tools/callTool"}:
        # Different clients sometimes wrap the call slightly differently.
        tool_name = (
            params.get("tool")
            or params.get("name")
            or params.get("toolName")
        )
        input_data = (
            params.get("params")
            or params.get("input")
            or params.get("arguments")
            or {}
        )
        if not tool_name:
            return mcp_error(rpc_id, "Missing tool name", code=-32602)
        if tool_name not in TOOLS:
            return mcp_error(rpc_id, f"Unknown tool: {tool_name}", code=-32601)
        try:
            result = TOOLS[tool_name](input_data)
            return mcp_result(rpc_id, {"result": result})
        except HTTPException as exc:
            return mcp_error(rpc_id, str(exc.detail), code=exc.status_code)
        except Exception as exc:
            logger.error("Tool %s failed: %s", tool_name, exc)
            return mcp_error(rpc_id, str(exc), code=-32000)

    return mcp_error(rpc_id, f"Method not found: {method}", code=-32601)


# --- HEALTH CHECK ---
@app.get("/")
def root():
    return {
        "status": "MCP server running",
        "tools": list(TOOLS.keys()),
        "virtuoso": VIRTUOSO_ENDPOINT,
        "guardrails": {
            "default_limit": SPARQL_DEFAULT_LIMIT,
            "max_limit": SPARQL_MAX_LIMIT,
            "allow_example_load": ALLOW_EXAMPLE_LOAD,
            "turtle_examples": True,
        },
    }


@app.get("/health")
def health():
    return root()
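
# Example request bodies the /mcp endpoint accepts (illustrative values):
#
#   Legacy mode:
#     {"tool": "search_label", "input": {"term": "sensor", "limit": 10}}
#
#   JSON-RPC mode:
#     {"jsonrpc": "2.0", "id": 1, "method": "tools/call",
#      "params": {"tool": "search_label", "params": {"term": "sensor"}}}


if __name__ == "__main__":
    # Optional convenience runner; a sketch that assumes uvicorn is installed,
    # the usual ASGI server for FastAPI deployments.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)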