| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094 |
- import json
- import logging
- import os
- import re
- from datetime import datetime, timezone
- from importlib import import_module
- from pathlib import Path
- from typing import Any, Callable, Dict, List, Optional
- import requests
- from requests.auth import HTTPDigestAuth
- from fastapi import FastAPI, HTTPException, Request
- from pydantic import BaseModel
# Logging level comes from MCP_LOG_LEVEL; unknown names fall back to INFO.
LOG_LEVEL = os.getenv("MCP_LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO))
logger = logging.getLogger("virtuoso_mcp")
app = FastAPI(title="MCP Server")
# --- CONFIG ---
# SPARQL endpoint: VIRTUOSO_ENDPOINT wins, then VIRTUOSO_SPARQL, then localhost.
VIRTUOSO_ENDPOINT = os.getenv("VIRTUOSO_ENDPOINT") or os.getenv(
    "VIRTUOSO_SPARQL", "http://localhost:8891/sparql"
)
# Optional HTTP digest credentials; requests go unauthenticated when unset.
VIRTUOSO_USER = os.getenv("VIRTUOSO_USER")
VIRTUOSO_PASS = os.getenv("VIRTUOSO_PASS")
# Request timeouts (seconds) and row-count bounds applied to guarded queries.
SPARQL_TIMEOUT = float(os.getenv("SPARQL_TIMEOUT", 30.0))
SPARQL_UPDATE_TIMEOUT = float(os.getenv("SPARQL_UPDATE_TIMEOUT", 30.0))
SPARQL_DEFAULT_LIMIT = int(os.getenv("SPARQL_DEFAULT_LIMIT", 100))
SPARQL_MAX_LIMIT = int(os.getenv("SPARQL_MAX_LIMIT", 500))
# Default graph URI used for inserts when the caller supplies none.
GRAPH_URI = os.getenv("GRAPH_URI", "http://example.org/catalog#")
EXAMPLES_DIR = Path(__file__).resolve().parent / "examples"
EXAMPLE_GRAPH = os.getenv(
    "EXAMPLE_GRAPH", "http://example.org/catalog#test"
)
# Loading the bundled example TTL files is opt-in (MCP_ALLOW_EXAMPLE_LOAD=true).
ALLOW_EXAMPLE_LOAD = os.getenv("MCP_ALLOW_EXAMPLE_LOAD", "false").lower() == "true"
# Shared session so HTTP connections to Virtuoso are pooled and reused.
SESSION = requests.Session()
# Tool-usage audit trail: a dedicated file logger that does not propagate
# to the root logger, so tool calls are recorded separately from app logs.
LOGS_DIR = Path(__file__).resolve().parent / "logs"
LOGS_DIR.mkdir(parents=True, exist_ok=True)
tool_logger = logging.getLogger("virtuoso_mcp.tools")
tool_handler = logging.FileHandler(LOGS_DIR / "tool_usage.log")
tool_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
tool_logger.addHandler(tool_handler)
tool_logger.setLevel(logging.INFO)
tool_logger.propagate = False
# PREFIX header prepended to queries that do not declare their own prefixes
# (see _with_prefixes below).
PREFIXES = f"""
PREFIX : <{GRAPH_URI}>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX dc: <http://purl.org/dc/elements/1.1/>
""".strip()
- # --- MODELS ---
class SparqlQueryRequest(BaseModel):
    """Request body carrying a raw SPARQL query string."""

    query: str
class ToolRequest(BaseModel):
    """Request body for generic tool dispatch: a tool name plus its input dict."""

    tool: str
    # Arbitrary keyword arguments forwarded to the tool implementation.
    input: Dict[str, Any] = {}
- # --- MCP (minimal JSON-RPC 2.0) models ---
class JsonRpcRequest(BaseModel):
    """Minimal JSON-RPC 2.0 request envelope used by the MCP transport."""

    jsonrpc: str = "2.0"
    # By JSON-RPC convention a missing/None id marks a notification.
    id: Optional[Any] = None
    method: str
    params: Dict[str, Any] = {}
def mcp_error(id_value: Any, message: str, code: int = -32000) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error envelope for the request with *id_value*."""
    error_body = {"code": code, "message": message}
    return {"jsonrpc": "2.0", "id": id_value, "error": error_body}
def mcp_result(id_value: Any, result: Dict[str, Any]) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 success envelope wrapping *result*."""
    return {"jsonrpc": "2.0", "id": id_value, "result": result}
def _mcp_tool_definition(name: str) -> Dict[str, Any]:
    """Return the MCP ``tools/list`` definition for the tool *name*.

    Schemas registered by domain layers (TOOL_SCHEMAS) take precedence.
    Built-in tools get an explicit input schema from a lookup table; any
    other tool falls back to an open object schema.  All schemas keep
    ``additionalProperties: True`` so existing clients are not broken.

    Refactor: the original was ~140 lines of copy-pasted ``if name == ...``
    branches that each mutated and returned the same base dict; the
    per-tool (properties, required) pairs are now data, not control flow.
    Outputs are unchanged.
    """
    if name in TOOL_SCHEMAS:
        # Schema is expected to be an MCP-compatible inputSchema object.
        return {
            "name": name,
            "description": TOOL_DOCS.get(name, ""),
            "inputSchema": TOOL_SCHEMAS[name],
        }

    def _limit(description: str) -> Dict[str, Any]:
        # Shared shape for bounded integer "limit"-style fields.
        return {"type": "integer", "minimum": 1, "maximum": SPARQL_MAX_LIMIT, "description": description}

    _uri = lambda description: {"type": "string", "description": description}
    _direction = {
        "type": "string",
        "enum": ["outgoing", "incoming"],
        "description": "Traversal direction",
    }

    # name -> (properties, required).  Tools without inputs (e.g. list_graphs)
    # and unknown tools both fall through to the empty default.
    builtin: Dict[str, Any] = {
        "sparql_query": (
            {"query": _uri("SPARQL SELECT query (bounded + guardrailed)")},
            ["query"],
        ),
        "sparql_update": (
            {
                "query": _uri("SPARQL UPDATE query (INSERT/DELETE only, guardrailed)"),
                "require_update_keyword": {"type": "boolean", "description": "Reject queries that do not contain INSERT or DELETE", "default": True},
            },
            ["query"],
        ),
        "search_label": (
            {"term": _uri("Substring to search in rdfs:label"), "limit": _limit("Max results")},
            ["term"],
        ),
        "get_entities_by_type": (
            {"type_uri": _uri("RDF type URI"), "limit": _limit("Max subjects")},
            ["type_uri"],
        ),
        "list_classes": (
            {"term": _uri("Optional substring to match labels/comments"), "limit": _limit("Max results")},
            [],
        ),
        "list_properties": (
            {
                "term": _uri("Optional substring to match labels/comments"),
                "domain_uri": _uri("Optional rdfs:domain class URI"),
                "range_uri": _uri("Optional rdfs:range class URI"),
                "limit": _limit("Max results"),
            },
            [],
        ),
        "describe_class": (
            {"class_uri": _uri("Class URI to describe")},
            ["class_uri"],
        ),
        "describe_property": (
            {
                "property_uri": _uri("Property URI to describe"),
                "usage_limit": _limit("How many usage examples to include"),
            },
            ["property_uri"],
        ),
        # ---- Entity navigation batch (B) ----
        "get_predicates_for_subject": (
            {"subject_uri": _uri("Subject URI"), "limit": _limit("Max predicates")},
            ["subject_uri"],
        ),
        "get_labels_for_subject": (
            {"subject_uri": _uri("Subject URI")},
            ["subject_uri"],
        ),
        "traverse_property": (
            {
                "subject_uri": _uri("Starting subject URI"),
                "property_uri": _uri("Predicate URI to traverse"),
                "direction": _direction,
                "limit": _limit("Max neighbors"),
            },
            ["subject_uri", "property_uri"],
        ),
        "describe_subject": (
            {
                "subject_uri": _uri("Subject URI"),
                "limit": _limit("Max outgoing predicate/object pairs"),
            },
            ["subject_uri"],
        ),
        "path_traverse": (
            {
                "subject_uri": _uri("Starting subject URI"),
                "property_path": _uri("Comma-separated list of predicate URIs (alternative to 'properties')"),
                "properties": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "List of predicate URIs",
                },
                "direction": _direction,
                "limit": _limit("Max results"),
            },
            ["subject_uri"],
        ),
        # ---- Relationship analytics batch (C) ----
        "property_usage_statistics": (
            {
                "property_uri": _uri("Predicate URI"),
                "examples_limit": _limit("How many usage examples to include"),
            },
            ["property_uri"],
        ),
    }

    properties, required = builtin.get(name, ({}, []))
    return {
        "name": name,
        "description": TOOL_DOCS.get(name, ""),
        "inputSchema": {
            "type": "object",
            "additionalProperties": True,
            "properties": properties,
            "required": required,
        },
    }
- # --- CORE SPARQL FUNCTION ---
def _build_auth() -> Optional[HTTPDigestAuth]:
    """Return digest-auth credentials when both env values are set, else None."""
    if not (VIRTUOSO_USER and VIRTUOSO_PASS):
        return None
    return HTTPDigestAuth(VIRTUOSO_USER, VIRTUOSO_PASS)
def _with_prefixes(query: str) -> str:
    """Prepend the shared PREFIX header unless *query* declares its own."""
    declares_own = re.search(r"^\s*prefix\b", query, re.IGNORECASE) is not None
    return query if declares_own else f"{PREFIXES}\n{query}"
def run_sparql(query: str) -> Dict[str, Any]:
    """Execute a SPARQL query against Virtuoso and return the JSON payload.

    The query is POSTed as ``application/sparql-query`` with the shared
    PREFIX header prepended when missing.  Any failure (non-2xx status or
    transport error) is re-raised as a 500 HTTPException so FastAPI
    returns a structured error to the caller.
    """
    logger.debug("Sending SPARQL query: %s", query)
    try:
        response = SESSION.post(
            VIRTUOSO_ENDPOINT,
            data=_with_prefixes(query).encode("utf-8"),
            headers={
                "Accept": "application/sparql-results+json",
                "Content-Type": "application/sparql-query",
            },
            timeout=SPARQL_TIMEOUT,
            auth=_build_auth(),
        )
        if not response.ok:
            # Log the status before raise_for_status turns it into an exception.
            logger.warning("SPARQL request failed: %s", response.status_code)
            response.raise_for_status()
        return response.json()
    except Exception as exc:  # pragma: no cover - propagate for FastAPI
        logger.warning("SPARQL request failed: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc))
def run_sparql_update(query: str) -> Dict[str, Any]:
    """Execute a SPARQL UPDATE (INSERT/DELETE) against Virtuoso.

    Returns ``{"status": "ok"}`` on success.  On an HTTP error the
    response body text (Virtuoso's error message) is surfaced as the
    HTTPException detail; transport errors become a generic 500.
    """
    logger.debug("Sending SPARQL update: %s", query)
    try:
        response = SESSION.post(
            VIRTUOSO_ENDPOINT,
            data=_with_prefixes(query).encode("utf-8"),
            headers={"Content-Type": "application/sparql-update"},
            timeout=SPARQL_UPDATE_TIMEOUT,
            auth=_build_auth(),
        )
        if not response.ok:
            detail = (response.text or "").strip()
            logger.warning("SPARQL update failed: %s", response.status_code)
            raise HTTPException(
                status_code=500,
                detail=detail or f"SPARQL update failed with {response.status_code}",
            )
        return {"status": "ok"}
    except HTTPException:
        # Already shaped for FastAPI; don't re-wrap.
        raise
    except Exception as exc:  # pragma: no cover - propagate for FastAPI
        logger.warning("SPARQL update failed: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc))
- # --- TOOL HELPERS ---
def escape_sparql_string(value: str) -> str:
    """Escape *value* for safe embedding in a double-quoted SPARQL literal.

    Backslash is escaped first so it does not re-escape the sequences
    added afterwards.  ``None`` maps to the empty string.

    Fix: carriage returns were previously stripped outright (silent data
    loss); per the SPARQL grammar they must be escaped as ``\\r``.  Tabs
    are now escaped as ``\\t`` for the same reason.
    """
    if value is None:
        return ""
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', "\\\"")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
        .replace("\t", "\\t")
    )
def sanitize_term(term: str) -> str:
    """Escape quotes inside label searches so we can safely interpolate strings.

    Thin alias over escape_sparql_string, kept for readability at call sites.
    """
    return escape_sparql_string(term)
- def _extract_limit(query: str) -> Optional[int]:
- match = re.search(r"\blimit\s+(\d+)\b", query, re.IGNORECASE)
- if not match:
- return None
- try:
- return int(match.group(1))
- except ValueError:
- return None
- def _apply_limit(query: str, default_limit: int, max_limit: int) -> str:
- limit = _extract_limit(query)
- if limit is None:
- return f"{query.strip()}\nLIMIT {default_limit}"
- if limit > max_limit:
- return re.sub(
- r"\blimit\s+\d+\b",
- f"LIMIT {max_limit}",
- query,
- flags=re.IGNORECASE,
- )
- return query
def guard_select_query(query: str) -> str:
    """Enforce that raw queries are read-only and bounded by LIMIT."""
    lowered = query.lower()
    mutating = re.search(r"\b(insert|delete|load|clear|drop|create|move|copy|add)\b", lowered)
    if mutating:
        raise HTTPException(status_code=400, detail="SPARQL update operations are not allowed")
    if "select" not in lowered:
        raise HTTPException(status_code=400, detail="Only SELECT queries are allowed")
    # Always hand back a bounded query.
    return _apply_limit(query, SPARQL_DEFAULT_LIMIT, SPARQL_MAX_LIMIT)
def guard_update_query(query: str, require_update_keyword: bool = True) -> str:
    """Allow only SPARQL UPDATE statements that actually mutate data.

    Rejects read forms (SELECT/ASK/CONSTRUCT/DESCRIBE) and graph-management
    verbs (LOAD/CLEAR/DROP/...); when *require_update_keyword* is True the
    query must contain INSERT or DELETE.  Returns the query unchanged.
    Keyword checks are substring/word matches on the lowered query, so a
    keyword inside a literal would also trip them (conservative by design).
    """
    lowered = query.lower()
    if re.search(r"\b(select|ask|construct|describe)\b", lowered):
        raise HTTPException(status_code=400, detail="Only SPARQL UPDATE statements are allowed")
    if not re.search(r"\b(insert|delete)\b", lowered):
        if require_update_keyword:
            raise HTTPException(status_code=400, detail="SPARQL UPDATE must contain INSERT or DELETE")
    if re.search(r"\b(load|clear|drop|create|move|copy|add)\b", lowered):
        raise HTTPException(status_code=400, detail="This update tool only allows INSERT or DELETE operations")
    return query
def ttl_to_sparql_insert(ttl_text: str, graph: Optional[str]) -> str:
    """Convert a small Turtle document into a SPARQL ``INSERT DATA`` update.

    ``@prefix`` directives become PREFIX declarations; ``@base`` lines are
    skipped (rare in our exports); everything else is treated as triple
    data.  When *graph* is given the triples are wrapped in a GRAPH block.
    Raises a 400 HTTPException when no triple lines remain.
    """
    prefix_lines: List[str] = []
    body_lines: List[str] = []
    for raw_line in ttl_text.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        declared = re.match(r"@prefix\s+([\w-]+):\s*<([^>]+)>\s*\.", stripped)
        if declared is not None:
            prefix_lines.append(
                f"PREFIX {declared.group(1)}: <{declared.group(2)}>"
            )
            continue
        if stripped.startswith("@base"):
            # Skip @base entries for now; they are rare in our exports.
            continue
        body_lines.append(raw_line)
    if not body_lines:
        raise HTTPException(status_code=400, detail="No RDF triples found in input")
    prefixes = "\n".join(prefix_lines)
    body = "\n".join(body_lines)
    insert_body = f"GRAPH <{graph}> {{\n{body}\n}}" if graph else body
    return f"{prefixes}\nINSERT DATA {{\n{insert_body}\n}}"
- # --- MCP TOOL IMPLEMENTATIONS ---
def tool_sparql_query(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Validate, bound, and execute a caller-supplied SELECT query."""
    raw_query = input_data.get("query")
    if not raw_query:
        raise ValueError("Missing 'query' field")
    return run_sparql(guard_select_query(raw_query))
def tool_list_graphs(_input: Dict[str, Any]) -> Dict[str, Any]:
    """Return up to 50 named graphs that contain at least one triple."""
    graph_query = """
    SELECT DISTINCT ?g WHERE {
    GRAPH ?g { ?s ?p ?o }
    }
    LIMIT 50
    """
    return run_sparql(graph_query)
def tool_search_label(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Case-insensitive substring search over rdfs:label values."""
    sanitized = sanitize_term(input_data.get("term", ""))
    # Clamp the caller's limit into [1, SPARQL_MAX_LIMIT]; default 20.
    bound = min(max(int(input_data.get("limit", 20)), 1), SPARQL_MAX_LIMIT)
    label_query = f"""
    SELECT ?s ?label WHERE {{
    ?s rdfs:label ?label .
    FILTER(CONTAINS(LCASE(?label), LCASE("{sanitized}")))
    }}
    LIMIT {bound}
    """
    return run_sparql(label_query)
def tool_get_entities_by_type(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """List subjects whose rdf:type is the given class URI."""
    type_uri = input_data.get("type_uri")
    if not type_uri:
        raise ValueError("Missing 'type_uri' field")
    bound = min(max(int(input_data.get("limit", 50)), 1), SPARQL_MAX_LIMIT)
    type_query = f"""
    SELECT ?s WHERE {{
    ?s rdf:type <{type_uri}> .
    }}
    LIMIT {bound}
    """
    return run_sparql(type_query)
def tool_get_predicates_for_subject(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """List the distinct predicates that appear on a subject.

    input_data keys: ``subject_uri`` (required) and ``limit``
    (clamped to 1..SPARQL_MAX_LIMIT, default 50).
    """
    subject_uri = input_data.get("subject_uri")
    if not subject_uri:
        raise ValueError("Missing 'subject_uri' field")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    query = f"""
    SELECT DISTINCT ?p WHERE {{
    <{subject_uri}> ?p ?o .
    }}
    LIMIT {limit}
    """
    return run_sparql(query)
def tool_get_labels_for_subject(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch up to 20 rdfs:label values for a subject URI.

    input_data keys: ``subject_uri`` (required).  The 20-row cap is fixed.
    """
    subject_uri = input_data.get("subject_uri")
    if not subject_uri:
        raise ValueError("Missing 'subject_uri' field")
    query = f"""
    SELECT ?label WHERE {{
    <{subject_uri}> rdfs:label ?label .
    }}
    LIMIT 20
    """
    return run_sparql(query)
def tool_traverse_property(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Follow one predicate from a subject and return labeled neighbors.

    ``direction`` chooses whether the subject is on the left ("outgoing",
    default) or right ("incoming") of the triple pattern.
    """
    subject_uri = input_data.get("subject_uri")
    property_uri = input_data.get("property_uri")
    if not subject_uri or not property_uri:
        raise ValueError("Missing 'subject_uri' or 'property_uri'")
    direction = input_data.get("direction", "outgoing")
    bound = min(max(int(input_data.get("limit", 50)), 1), SPARQL_MAX_LIMIT)
    if direction not in {"outgoing", "incoming"}:
        raise ValueError("direction must be 'outgoing' or 'incoming'")
    pattern = (
        f"<{subject_uri}> <{property_uri}> ?neighbor ."
        if direction == "outgoing"
        else f"?neighbor <{property_uri}> <{subject_uri}> ."
    )
    neighbor_query = f"""
    SELECT ?neighbor ?label ?description WHERE {{
    {pattern}
    OPTIONAL {{ ?neighbor rdfs:label ?label }}
    OPTIONAL {{ ?neighbor dc:description ?description }}
    }}
    LIMIT {bound}
    """
    return run_sparql(neighbor_query)
def tool_list_classes(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """List ontology classes (rdfs:Class and owl:Class) with optional term filtering.

    input_data keys: ``term`` (optional substring matched against label,
    comment, or the class URI itself) and ``limit`` (clamped to
    1..SPARQL_MAX_LIMIT, default 50).
    """
    term = sanitize_term(input_data.get("term", ""))
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    term_filter = ""
    if term:
        # COALESCE falls back to the class URI / empty string so classes
        # without a label or comment can still match.
        term_filter = f"""
        FILTER(
        CONTAINS(LCASE(COALESCE(STR(?label), STR(?class))), LCASE(\"{term}\")) ||
        CONTAINS(LCASE(COALESCE(STR(?comment), \"\")), LCASE(\"{term}\"))
        )
        """
    query = f"""
    SELECT DISTINCT ?class ?label ?comment WHERE {{
    {{ ?class rdf:type rdfs:Class . }}
    UNION
    {{ ?class rdf:type <http://www.w3.org/2002/07/owl#Class> . }}
    OPTIONAL {{ ?class rdfs:label ?label }}
    OPTIONAL {{ ?class rdfs:comment ?comment }}
    {term_filter}
    }}
    LIMIT {limit}
    """
    return run_sparql(query)
def tool_list_properties(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """List ontology properties with optional term/domain/range filtering.

    Matches rdf:Property plus owl Object/Datatype properties.
    input_data keys: ``term`` (substring over label/comment/URI),
    ``domain_uri`` / ``range_uri`` (exact class URI filters), and
    ``limit`` (clamped to 1..SPARQL_MAX_LIMIT, default 100).
    """
    term = sanitize_term(input_data.get("term", ""))
    domain_uri = input_data.get("domain_uri")
    range_uri = input_data.get("range_uri")
    limit = int(input_data.get("limit", 100))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    filters = []
    if term:
        filters.append(
            f"""
            FILTER(
            CONTAINS(LCASE(COALESCE(STR(?label), STR(?property))), LCASE(\"{term}\")) ||
            CONTAINS(LCASE(COALESCE(STR(?comment), \"\")), LCASE(\"{term}\"))
            )
            """
        )
    # NOTE: ?domain/?range come from OPTIONAL blocks, so these filters only
    # match properties that actually declare rdfs:domain / rdfs:range.
    if domain_uri:
        filters.append(f"FILTER(?domain = <{domain_uri}>)")
    if range_uri:
        filters.append(f"FILTER(?range = <{range_uri}>)")
    query = f"""
    SELECT DISTINCT ?property ?label ?comment ?domain ?range WHERE {{
    {{ ?property rdf:type rdf:Property . }}
    UNION
    {{ ?property rdf:type <http://www.w3.org/2002/07/owl#ObjectProperty> . }}
    UNION
    {{ ?property rdf:type <http://www.w3.org/2002/07/owl#DatatypeProperty> . }}
    OPTIONAL {{ ?property rdfs:label ?label }}
    OPTIONAL {{ ?property rdfs:comment ?comment }}
    OPTIONAL {{ ?property rdfs:domain ?domain }}
    OPTIONAL {{ ?property rdfs:range ?range }}
    {' '.join(filters)}
    }}
    LIMIT {limit}
    """
    return run_sparql(query)
def tool_describe_class(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Describe a class and include properties that declare it as rdfs:domain.

    input_data keys: ``class_uri`` (required).  Results are bounded by the
    global SPARQL_MAX_LIMIT.
    """
    class_uri = input_data.get("class_uri")
    if not class_uri:
        raise ValueError("Missing 'class_uri' field")
    query = f"""
    SELECT ?label ?comment ?property ?propertyLabel ?propertyComment ?range WHERE {{
    OPTIONAL {{ <{class_uri}> rdfs:label ?label }}
    OPTIONAL {{ <{class_uri}> rdfs:comment ?comment }}
    OPTIONAL {{
    ?property rdfs:domain <{class_uri}> .
    OPTIONAL {{ ?property rdfs:label ?propertyLabel }}
    OPTIONAL {{ ?property rdfs:comment ?propertyComment }}
    OPTIONAL {{ ?property rdfs:range ?range }}
    }}
    }}
    LIMIT {SPARQL_MAX_LIMIT}
    """
    return run_sparql(query)
def tool_describe_property(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Describe a property and include usage examples from the graph.

    Runs two queries: one for schema metadata (label/comment/domain/range/
    type) and one for up to ``usage_limit`` concrete subject/object pairs.
    Returns ``{"metadata": ..., "usage": ...}``.
    """
    property_uri = input_data.get("property_uri")
    if not property_uri:
        raise ValueError("Missing 'property_uri' field")
    usage_limit = int(input_data.get("usage_limit", 10))
    usage_limit = min(max(usage_limit, 1), SPARQL_MAX_LIMIT)
    metadata_query = f"""
    SELECT ?label ?comment ?domain ?range ?type WHERE {{
    OPTIONAL {{ <{property_uri}> rdfs:label ?label }}
    OPTIONAL {{ <{property_uri}> rdfs:comment ?comment }}
    OPTIONAL {{ <{property_uri}> rdfs:domain ?domain }}
    OPTIONAL {{ <{property_uri}> rdfs:range ?range }}
    OPTIONAL {{ <{property_uri}> rdf:type ?type }}
    }}
    LIMIT {SPARQL_MAX_LIMIT}
    """
    usage_query = f"""
    SELECT ?subject ?subjectLabel ?object ?objectLabel WHERE {{
    ?subject <{property_uri}> ?object .
    OPTIONAL {{ ?subject rdfs:label ?subjectLabel }}
    OPTIONAL {{ ?object rdfs:label ?objectLabel }}
    }}
    LIMIT {usage_limit}
    """
    return {
        "metadata": run_sparql(metadata_query),
        "usage": run_sparql(usage_query),
    }
def tool_describe_subject(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a subject's outgoing predicate/object pairs with object labels."""
    subject_uri = input_data.get("subject_uri")
    if not subject_uri:
        raise ValueError("Missing 'subject_uri' field")
    bound = min(max(int(input_data.get("limit", 50)), 1), SPARQL_MAX_LIMIT)
    describe_query = f"""
    SELECT ?predicate ?object ?objectLabel WHERE {{
    <{subject_uri}> ?predicate ?object .
    OPTIONAL {{ ?object rdfs:label ?objectLabel }}
    }}
    LIMIT {bound}
    """
    return run_sparql(describe_query)
def tool_path_traverse(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Follow a chain of predicates from a starting subject.

    ``property_path`` may be a comma-separated string or a list of
    predicate URIs (the ``properties`` key is an accepted alias).  Each
    hop binds a fresh ?nN variable; labels/descriptions are fetched via
    OPTIONAL blocks.  Returns the path, direction, and the SPARQL result.

    Fix: the query was assembled with ``{'\\n '.join(...)}`` inside an
    f-string expression — a backslash there is a SyntaxError on every
    Python release before 3.12 (PEP 701).  The joins are now hoisted into
    plain variables, which is valid on all supported versions.
    """
    subject_uri = input_data.get("subject_uri")
    property_path = input_data.get("property_path") or input_data.get("properties")
    if not subject_uri or not property_path:
        raise ValueError("Missing 'subject_uri' or 'property_path'")
    if isinstance(property_path, str):
        property_path = [p.strip() for p in property_path.split(",") if p.strip()]
    if not isinstance(property_path, list) or not property_path:
        raise ValueError("'property_path' must be a non-empty list of property URIs")
    direction = input_data.get("direction", "outgoing")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    statements = []
    optional_lines = []
    select_terms = []
    prev_subject = f"<{subject_uri}>"
    for idx, prop_uri in enumerate(property_path, start=1):
        step_var = f"?n{idx}"
        # "outgoing" walks subject -> object; anything else walks object -> subject.
        if direction == "outgoing":
            statements.append(f"{prev_subject} <{prop_uri}> {step_var} .")
        else:
            statements.append(f"{step_var} <{prop_uri}> {prev_subject} .")
        select_terms.append(step_var)
        optional_lines.append(f"OPTIONAL {{ {step_var} rdfs:label {step_var}Label }}")
        optional_lines.append(f"OPTIONAL {{ {step_var} dc:description {step_var}Description }}")
        prev_subject = step_var
    select_clause = " ".join(select_terms)
    # Join outside the f-string: backslashes are not allowed inside
    # f-string replacement fields before Python 3.12.
    statements_block = "\n    ".join(statements)
    optionals_block = "\n    ".join(optional_lines)
    query = f"""
    SELECT {select_clause} WHERE {{
    {statements_block}
    {optionals_block}
    }}
    LIMIT {limit}
    """
    return {
        "property_path": property_path,
        "direction": direction,
        "result": run_sparql(query),
    }
def tool_property_usage_statistics(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Count distinct subjects using a predicate and sample example triples.

    Returns ``{"count": ..., "examples": ...}``.  The LIMIT on the count
    query is redundant (an aggregate yields one row) but harmless.
    """
    property_uri = input_data.get("property_uri")
    if not property_uri:
        raise ValueError("Missing 'property_uri' field")
    examples_limit = int(input_data.get("examples_limit", 5))
    examples_limit = min(max(examples_limit, 1), SPARQL_MAX_LIMIT)
    count_query = f"""
    SELECT (COUNT(DISTINCT ?subject) AS ?usageCount) WHERE {{
    ?subject <{property_uri}> ?object .
    }}
    LIMIT {SPARQL_MAX_LIMIT}
    """
    usage_query = f"""
    SELECT ?subject ?subjectLabel ?object ?objectLabel WHERE {{
    ?subject <{property_uri}> ?object .
    OPTIONAL {{ ?subject rdfs:label ?subjectLabel }}
    OPTIONAL {{ ?object rdfs:label ?objectLabel }}
    }}
    LIMIT {examples_limit}
    """
    return {
        "count": run_sparql(count_query),
        "examples": run_sparql(usage_query),
    }
def tool_sparql_update(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run a guarded INSERT/DELETE update and echo back the guarded query."""
    raw_query = input_data.get("query")
    if not raw_query:
        raise ValueError("Missing 'query' field")
    require_kw = input_data.get("require_update_keyword", True)
    guarded = guard_update_query(raw_query, require_update_keyword=require_kw)
    response = run_sparql_update(guarded)
    return dict(response, query=guarded)
def tool_batch_insert(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert many triples at once, from raw TTL text or a structured list.

    input_data keys: ``ttl`` (Turtle text) OR ``triples`` (list of dicts
    with subject/predicate/object, plus optional object_type, datatype,
    lang); ``graph`` defaults to GRAPH_URI.  Returns the update status
    plus the generated SPARQL for auditability.
    """
    ttl_text = input_data.get("ttl")
    triples = input_data.get("triples")
    graph = input_data.get("graph") or GRAPH_URI
    if not ttl_text and not triples:
        raise ValueError("Provide either 'ttl' text or 'triples' list")

    def _format_object(obj_value: Any, obj_type: str, datatype: Optional[str], lang: Optional[str]) -> str:
        # Render one object term: URI, plain literal, ^^typed, or @lang literal.
        if obj_type == "uri":
            return f"<{obj_value}>"
        if obj_type == "literal":
            return f'"{escape_sparql_string(obj_value)}"'
        if obj_type == "typed_literal":
            if not datatype:
                raise ValueError("Missing datatype for typed_literal")
            return f'"{escape_sparql_string(obj_value)}"^^<{datatype}>'
        if obj_type == "lang_literal":
            if not lang:
                raise ValueError("Missing lang for lang_literal")
            return f'"{escape_sparql_string(obj_value)}"@{lang}'
        raise ValueError(f"Unknown object_type: {obj_type}")

    # 'ttl' wins when both inputs are supplied.
    if ttl_text:
        query = ttl_to_sparql_insert(ttl_text, graph)
    else:
        lines = []
        for triple in triples:
            subj = triple.get("subject")
            pred = triple.get("predicate")
            obj_value = triple.get("object")
            # 'is None' (not falsiness) so empty-string literals are allowed.
            if not subj or not pred or obj_value is None:
                raise ValueError("Each triple must provide subject, predicate, and object")
            obj_type = triple.get("object_type", "uri")
            datatype = triple.get("datatype")
            lang = triple.get("lang")
            obj_text = _format_object(obj_value, obj_type, datatype, lang)
            lines.append(f"<{subj}> <{pred}> {obj_text} .")
        ttl_bulk = "\n".join(lines)
        query = ttl_to_sparql_insert(ttl_bulk, graph)
    result = run_sparql_update(query)
    return {**result, "query": query}
def tool_load_examples(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Load bundled example TTL files into a graph (opt-in feature).

    input_data keys: ``files`` (one name or a list of names under the
    examples/ directory; defaults to every ``*.ttl`` file) and ``graph``
    (defaults to EXAMPLE_GRAPH).  Raises 403 when MCP_ALLOW_EXAMPLE_LOAD
    is off, 400 for invalid/missing paths, 404 when nothing is available.
    """
    if not ALLOW_EXAMPLE_LOAD:
        raise HTTPException(status_code=403, detail="Example loading is disabled")
    requested = input_data.get("files") or []
    if isinstance(requested, str):
        requested = [requested]
    graph = input_data.get("graph") or EXAMPLE_GRAPH
    if requested:
        files_to_load = requested
    else:
        files_to_load = sorted(p.name for p in EXAMPLES_DIR.glob("*.ttl"))
    if not files_to_load:
        raise HTTPException(status_code=404, detail="No example files available")
    results = []
    for filename in files_to_load:
        file_path = (EXAMPLES_DIR / filename).resolve()
        # Containment check now runs BEFORE touching the filesystem, so a
        # traversal attempt cannot probe whether files outside the
        # directory exist.
        if EXAMPLES_DIR not in file_path.parents and file_path != EXAMPLES_DIR:
            raise HTTPException(status_code=400, detail="Invalid example file path")
        if not file_path.exists():
            # Fix: the original detail was an f-string with no placeholder,
            # so every error read "Missing example file: (unknown)".
            raise HTTPException(status_code=400, detail=f"Missing example file: {filename}")
        ttl_text = file_path.read_text(encoding="utf-8")
        update_query = ttl_to_sparql_insert(ttl_text, graph)
        run_sparql_update(update_query)
        results.append({"file": filename, "graph": graph})
    return {"loaded": results}
def tool_insert_triple(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a single triple, with the object rendered per ``object_type``.

    input_data keys: ``subject``, ``predicate``, ``object`` (required);
    ``object_type`` one of uri|literal|typed_literal|lang_literal
    (default uri) with ``datatype``/``lang`` where applicable; optional
    ``graph``.  On failure the generated SPARQL is appended to the error
    detail to aid debugging.
    """
    subject = input_data.get("subject")
    predicate = input_data.get("predicate")
    obj = input_data.get("object")
    obj_type = input_data.get("object_type", "uri")
    graph = input_data.get("graph")
    # 'is None' (not falsiness) so empty-string literal objects are allowed.
    if not subject or not predicate or obj is None:
        raise ValueError("Missing 'subject', 'predicate', or 'object' field")
    if obj_type == "uri":
        obj_value = f"<{obj}>"
    elif obj_type == "literal":
        obj_value = f"\"{escape_sparql_string(obj)}\""
    elif obj_type == "typed_literal":
        datatype = input_data.get("datatype")
        if not datatype:
            raise ValueError("Missing 'datatype' for typed_literal")
        obj_value = f"\"{escape_sparql_string(obj)}\"^^<{datatype}>"
    elif obj_type == "lang_literal":
        lang = input_data.get("lang")
        if not lang:
            raise ValueError("Missing 'lang' for lang_literal")
        obj_value = f"\"{escape_sparql_string(obj)}\"@{lang}"
    else:
        raise ValueError("Unknown object_type")
    ttl = f"<{subject}> <{predicate}> {obj_value} ."
    update_query = ttl_to_sparql_insert(ttl, graph)
    try:
        result = run_sparql_update(update_query)
    except HTTPException as exc:
        # Surface the generated SPARQL alongside Virtuoso's error text.
        detail = f"{exc.detail}\n\nSPARQL:\n{update_query}"
        raise HTTPException(status_code=exc.status_code, detail=detail)
    return {**result, "query": update_query}
- # --- TOOL REGISTRY ---
# Maps tool names (as exposed over MCP) to their implementation callables.
# Domain layers may add entries at import time via load_domain_layers().
TOOLS = {
    "sparql_query": tool_sparql_query,
    "sparql_update": tool_sparql_update,
    "list_graphs": tool_list_graphs,
    "search_label": tool_search_label,
    "get_entities_by_type": tool_get_entities_by_type,
    "get_predicates_for_subject": tool_get_predicates_for_subject,
    "get_labels_for_subject": tool_get_labels_for_subject,
    "traverse_property": tool_traverse_property,
    "list_classes": tool_list_classes,
    "list_properties": tool_list_properties,
    "describe_class": tool_describe_class,
    "describe_property": tool_describe_property,
    "describe_subject": tool_describe_subject,
    "path_traverse": tool_path_traverse,
    "property_usage_statistics": tool_property_usage_statistics,
    "batch_insert": tool_batch_insert,
    "insert_triple": tool_insert_triple,
    "load_examples": tool_load_examples,
}
# Tool input schemas registered by domain layers (e.g., garden_layer).
TOOL_SCHEMAS: Dict[str, Any] = {}
def load_domain_layers(
    tools: Dict[str, Callable[[Dict[str, Any]], Any]],
    tool_schemas: Dict[str, Any],
) -> None:
    """Import and register optional domain-layer plugins.

    The comma-separated ``DOMAIN_LAYERS`` environment variable lists module
    paths (default: ``garden_layer.plugin``).  Each module must expose a
    ``register_layer`` callable that mutates ``tools`` (and optionally
    ``tool_schemas``) in place.  Import and registration failures are logged
    and skipped so one broken layer cannot take the whole server down.

    Args:
        tools: Live tool registry the plugins extend.
        tool_schemas: Live schema registry for plugins that publish input schemas.
    """
    raw = os.getenv("DOMAIN_LAYERS", "garden_layer.plugin")
    modules = [item.strip() for item in raw.split(",") if item.strip()]
    for module_name in modules:
        module = _import_domain_module(module_name)
        if module is None:
            continue
        register = getattr(module, "register_layer", None)
        if not callable(register):
            logger.warning("Domain layer '%s' does not expose register_layer", module_name)
            continue
        try:
            # Domain layer may optionally register input schemas via a second
            # parameter; probe the signature instead of calling blind.
            if _accepts_schema_registry(register):
                register(tools, tool_schemas)
            else:
                register(tools)
            logger.info("Loaded domain layer '%s'", module_name)
        except Exception as exc:
            logger.exception("Domain layer '%s' failed to register: %s", module_name, exc)


def _import_domain_module(module_name: str):
    """Import *module_name*, falling back to its base package; None on failure."""
    try:
        return import_module(module_name)
    except ImportError as exc:
        base = module_name.split(".", 1)[0]
        if base == module_name:
            logger.warning("Domain layer '%s' could not be imported: %s", module_name, exc)
            return None
        try:
            module = import_module(base)
        except ImportError:
            logger.warning(
                "Domain layer '%s' could not be imported and base module '%s' is missing: %s",
                module_name,
                base,
                exc,
            )
            return None
        logger.info("Falling back to base module '%s' for domain layer '%s'", base, module_name)
        return module


def _accepts_schema_registry(register: Callable[..., Any]) -> bool:
    """Return True when *register* can accept the (tools, tool_schemas) pair.

    The previous heuristic read ``register.__code__.co_argcount``, which
    raises AttributeError for builtins/partials, and its blind
    ``except Exception: register(tools)`` fallback retried registration with
    one argument even after a *genuine* two-argument registration error —
    masking bugs and risking double registration.  ``inspect.signature``
    handles wrapped callables; if introspection itself fails we conservatively
    use the single-argument form, and real registration errors now propagate
    to the caller's logging handler.
    """
    import inspect  # local import: keeps the file's visible import surface unchanged

    try:
        parameters = inspect.signature(register).parameters.values()
    except (TypeError, ValueError):
        return False
    positional = 0
    for param in parameters:
        if param.kind is inspect.Parameter.VAR_POSITIONAL:
            return True  # *args swallows both arguments
        if param.kind in (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        ):
            positional += 1
    return positional >= 2
# Populate the registries with any configured domain-layer plugins at import time.
load_domain_layers(TOOLS, TOOL_SCHEMAS)
# One-line human-readable description per core tool; surfaced in legacy /mcp
# responses ("description" field) and MCP tool definitions.
TOOL_DOCS: Dict[str, str] = {
    "sparql_query": "Execute a bounded SELECT query and return the JSON result.",
    "sparql_update": "Execute a guarded SPARQL UPDATE query limited to INSERT/DELETE operations.",
    "list_graphs": "List up to 50 active graph URIs.",
    "search_label": "Search rdfs:label values that contain a term (case-insensitive).",
    "get_entities_by_type": "List subjects of a given rdf:type.",
    "get_predicates_for_subject": "List distinct predicates used by a subject.",
    "get_labels_for_subject": "Fetch rdfs:label values for a subject.",
    "traverse_property": "Traverse a property (incoming or outgoing) for a subject and return labels/descriptions.",
    "list_classes": "List ontology classes with optional label/comment term filtering.",
    "list_properties": "List ontology properties with optional term/domain/range filters.",
    "describe_class": "Describe a class and list properties that use it as rdfs:domain.",
    "describe_property": "Describe a property (label/comment/domain/range/type) and sample usage.",
    "describe_subject": "Return subject predicates/objects (with labels) to inspect an individual node.",
    "path_traverse": "Follow a property path (list of predicates) from a subject, returning each step's nodes.",
    "property_usage_statistics": "Count how often a property is used and sample subjects/objects.",
    "batch_insert": "Insert multiple triples or TTL at once with a single guarded update.",
    "insert_triple": "Insert a single triple (useful for debugging updates).",
    "load_examples": "Load Turtle fixtures from the `examples/` directory when MCP_ALLOW_EXAMPLE_LOAD=true.",
}
# --- MCP ENDPOINT ---
@app.post("/mcp")
async def handle_mcp(http_request: Request):
    """Minimal MCP-ish JSON-RPC 2.0 endpoint on POST /mcp.

    Backward compatible legacy mode:
        {"tool": "search_label", "input": {...}}

    Minimal JSON-RPC mode (first step towards MCP compliance):
        {"jsonrpc":"2.0","id":1,"method":"initialize","params":{...}}
        {"jsonrpc":"2.0","id":2,"method":"tools/list","params":{...}}
        {"jsonrpc":"2.0","id":3,"method":"tools/call","params":{ "tool": "...", "params": {...} }}
    """
    try:
        body = await http_request.json()
    except Exception:
        # Unparseable body falls through to the JSON-RPC invalid-request branch.
        body = None

    # ---- Legacy mode ----
    if isinstance(body, dict) and "tool" in body:
        legacy = ToolRequest(**body)
        tool_name = legacy.tool
        input_data = legacy.input or {}
        # Audit log: who called which tool, when, with what (truncated) input.
        client_host = http_request.client.host if http_request.client else "unknown"
        trimmed_input = json.dumps(input_data, ensure_ascii=False, default=str)
        if len(trimmed_input) > 1024:
            trimmed_input = f"{trimmed_input[:1024]}…"
        timestamp = datetime.now(timezone.utc).isoformat()
        tool_logger.info(
            "tool=%s client=%s time=%s input=%s",
            tool_name,
            client_host,
            timestamp,
            trimmed_input,
        )
        if tool_name not in TOOLS:
            raise HTTPException(status_code=400, detail=f"Unknown tool: {tool_name}")
        try:
            result = TOOLS[tool_name](input_data)
            return {
                "status": "ok",
                "tool": tool_name,
                "description": TOOL_DOCS.get(tool_name, ""),
                "result": result,
            }
        except HTTPException:
            # BUGFIX: tools raise HTTPException with meaningful status codes
            # (the JSON-RPC branch below already preserves them); previously
            # the generic handler rewrapped them as 500s.  Re-raise unchanged.
            raise
        except Exception as exc:
            logger.error("Tool %s failed: %s", tool_name, exc)
            raise HTTPException(status_code=500, detail=str(exc)) from exc

    # ---- JSON-RPC 2.0 mode ----
    if not isinstance(body, dict):
        return mcp_error(None, "Invalid JSON-RPC request", code=-32600)
    try:
        rpc_req = JsonRpcRequest(**body)
    except Exception as exc:
        # If body is malformed, still surface the id if present.
        rpc_id = body.get("id")
        logger.warning("Invalid JSON-RPC request: %s", exc)
        return mcp_error(rpc_id, "Invalid JSON-RPC request", code=-32600)
    method = rpc_req.method
    rpc_id = rpc_req.id
    params = rpc_req.params or {}
    if method == "initialize":
        tools = [_mcp_tool_definition(name) for name in sorted(TOOLS.keys())]
        return mcp_result(
            rpc_id,
            {
                "protocolVersion": "0.1",
                "capabilities": {
                    "tools": True,
                    "list": True,
                    "call": True,
                },
                "tools": tools,
            },
        )
    if method in {"tools/list", "tools/listTools"}:
        tools = [_mcp_tool_definition(name) for name in sorted(TOOLS.keys())]
        return mcp_result(rpc_id, {"tools": tools})
    if method in {"tools/call", "tools/callTool"}:
        # Different clients sometimes wrap the call slightly differently.
        tool_name = (
            params.get("tool")
            or params.get("name")
            or params.get("toolName")
        )
        input_data = (
            params.get("params")
            or params.get("input")
            or params.get("arguments")
            or {}
        )
        if not tool_name:
            return mcp_error(rpc_id, "Missing tool name", code=-32602)
        if tool_name not in TOOLS:
            return mcp_error(rpc_id, f"Unknown tool: {tool_name}", code=-32601)
        try:
            result = TOOLS[tool_name](input_data)
            return mcp_result(rpc_id, {"result": result})
        except HTTPException as exc:
            # Map tool-level HTTP errors onto JSON-RPC errors, keeping the status.
            return mcp_error(rpc_id, str(exc.detail), code=exc.status_code)
        except Exception as exc:
            logger.error("Tool %s failed: %s", tool_name, exc)
            return mcp_error(rpc_id, str(exc), code=-32000)
    return mcp_error(rpc_id, f"Method not found: {method}", code=-32601)
# --- HEALTH CHECK ---
@app.get("/")
def root():
    """Status overview: server state, registered tool names, endpoint, guardrails."""
    guardrails = {
        "default_limit": SPARQL_DEFAULT_LIMIT,
        "max_limit": SPARQL_MAX_LIMIT,
        "allow_example_load": ALLOW_EXAMPLE_LOAD,
        "turtle_examples": True,
    }
    payload = {
        "status": "MCP server running",
        "tools": list(TOOLS.keys()),
        "virtuoso": VIRTUOSO_ENDPOINT,
        "guardrails": guardrails,
    }
    return payload
- @app.get("/health")
- def health():
- return root()
|