| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782 |
- import json
- import logging
- import os
- import re
- from datetime import datetime, timezone
- from importlib import import_module
- from pathlib import Path
- from typing import Any, Callable, Dict, List, Optional
- import requests
- from requests.auth import HTTPDigestAuth
- from fastapi import FastAPI, HTTPException, Request
- from pydantic import BaseModel
# Logging verbosity is controlled via MCP_LOG_LEVEL (defaults to INFO);
# unknown level names silently fall back to INFO via getattr's default.
LOG_LEVEL = os.getenv("MCP_LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO))
logger = logging.getLogger("virtuoso_mcp")

app = FastAPI(title="MCP Server")

# --- CONFIG ---
# SPARQL endpoint resolution order: VIRTUOSO_ENDPOINT, then VIRTUOSO_SPARQL,
# then a localhost default.
VIRTUOSO_ENDPOINT = os.getenv("VIRTUOSO_ENDPOINT") or os.getenv(
    "VIRTUOSO_SPARQL", "http://localhost:8891/sparql"
)
# Optional digest-auth credentials; both must be set for auth to be applied
# (see _build_auth below).
VIRTUOSO_USER = os.getenv("VIRTUOSO_USER")
VIRTUOSO_PASS = os.getenv("VIRTUOSO_PASS")
# Request timeouts (seconds) and result-size guardrails.
SPARQL_TIMEOUT = float(os.getenv("SPARQL_TIMEOUT", 10.0))
SPARQL_UPDATE_TIMEOUT = float(os.getenv("SPARQL_UPDATE_TIMEOUT", 15.0))
SPARQL_DEFAULT_LIMIT = int(os.getenv("SPARQL_DEFAULT_LIMIT", 100))
SPARQL_MAX_LIMIT = int(os.getenv("SPARQL_MAX_LIMIT", 500))
# Default target graph and example-fixture configuration.
GRAPH_URI = os.getenv("GRAPH_URI", "http://example.org/catalog#")
EXAMPLES_DIR = Path(__file__).resolve().parent / "examples"
EXAMPLE_GRAPH = os.getenv(
    "EXAMPLE_GRAPH", "http://example.org/catalog#test"
)
# Loading example fixtures is disabled unless explicitly opted in via env.
ALLOW_EXAMPLE_LOAD = os.getenv("MCP_ALLOW_EXAMPLE_LOAD", "false").lower() == "true"
# Shared HTTP session so connections to Virtuoso are reused across requests.
SESSION = requests.Session()

# Dedicated tool-usage audit log, written to logs/ next to this file.
LOGS_DIR = Path(__file__).resolve().parent / "logs"
LOGS_DIR.mkdir(parents=True, exist_ok=True)
tool_logger = logging.getLogger("virtuoso_mcp.tools")
tool_handler = logging.FileHandler(LOGS_DIR / "tool_usage.log")
tool_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
tool_logger.addHandler(tool_handler)
tool_logger.setLevel(logging.INFO)
# Keep audit records out of the root logger's handlers.
tool_logger.propagate = False

# Shared PREFIX header prepended to queries that lack their own (see
# _with_prefixes).
PREFIXES = f"""
PREFIX : <{GRAPH_URI}>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX dc: <http://purl.org/dc/elements/1.1/>
""".strip()
- # --- MODELS ---
class SparqlQueryRequest(BaseModel):
    """Request body carrying a raw SPARQL query string."""

    query: str
class ToolRequest(BaseModel):
    """MCP tool invocation: a tool name plus a free-form input payload."""

    tool: str
    # NOTE(review): pydantic deep-copies mutable defaults per instance, so the
    # shared {} default is safe here (unlike a plain-function default arg).
    input: Dict[str, Any] = {}
- # --- CORE SPARQL FUNCTION ---
def _build_auth() -> Optional[HTTPDigestAuth]:
    """Return digest credentials for Virtuoso, or None when auth is not configured.

    Auth is applied only when BOTH user and password env vars are set.
    """
    if not (VIRTUOSO_USER and VIRTUOSO_PASS):
        return None
    return HTTPDigestAuth(VIRTUOSO_USER, VIRTUOSO_PASS)
- def _with_prefixes(query: str) -> str:
- if re.search(r"^\s*prefix\b", query, re.IGNORECASE):
- return query
- return f"{PREFIXES}\n{query}"
def run_sparql(query: str) -> Dict[str, Any]:
    """Execute a SPARQL query against Virtuoso and return the JSON payload.

    The shared PREFIX header is prepended when the query lacks its own.
    Any failure (HTTP error, timeout, bad JSON) is re-raised as an
    HTTPException 500 so FastAPI returns a clean error to the caller.
    """
    logger.debug("Sending SPARQL query: %s", query)
    try:
        response = SESSION.post(
            VIRTUOSO_ENDPOINT,
            data=_with_prefixes(query).encode("utf-8"),
            headers={
                "Accept": "application/sparql-results+json",
                "Content-Type": "application/sparql-query",
            },
            timeout=SPARQL_TIMEOUT,
            auth=_build_auth(),
        )
        if not response.ok:
            logger.warning("SPARQL request failed: %s", response.status_code)
        # raise_for_status() routes HTTP errors into the except branch below.
        response.raise_for_status()
        return response.json()
    except Exception as exc:  # pragma: no cover - propagate for FastAPI
        logger.warning("SPARQL request failed: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc))
def run_sparql_update(query: str) -> Dict[str, Any]:
    """Execute a SPARQL UPDATE (INSERT/DELETE) against Virtuoso.

    Returns {"status": "ok"} on success. HTTPExceptions are re-raised
    untouched; any other failure becomes an HTTPException 500 carrying
    the backend's error text when available.
    """
    logger.debug("Sending SPARQL update: %s", query)
    try:
        response = SESSION.post(
            VIRTUOSO_ENDPOINT,
            data=_with_prefixes(query).encode("utf-8"),
            headers={"Content-Type": "application/sparql-update"},
            timeout=SPARQL_UPDATE_TIMEOUT,
            auth=_build_auth(),
        )
        if not response.ok:
            # Prefer Virtuoso's own error message when it provides one.
            detail = (response.text or "").strip()
            logger.warning("SPARQL update failed: %s", response.status_code)
            raise HTTPException(
                status_code=500,
                detail=detail or f"SPARQL update failed with {response.status_code}",
            )
        return {"status": "ok"}
    except HTTPException:
        raise
    except Exception as exc:  # pragma: no cover - propagate for FastAPI
        logger.warning("SPARQL update failed: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc))
- # --- TOOL HELPERS ---
def escape_sparql_string(value: str) -> str:
    """Escape a value for safe embedding in a double-quoted SPARQL literal.

    Backslashes, double quotes, newlines, and carriage returns are escaped
    per the SPARQL string grammar. None maps to the empty string; non-str
    values are stringified first.
    """
    if value is None:
        return ""
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', "\\\"")
        .replace("\n", "\\n")
        # Bug fix: carriage returns were previously dropped outright, silently
        # losing data; the SPARQL grammar requires them escaped as \r.
        .replace("\r", "\\r")
    )
def sanitize_term(term: str) -> str:
    """Escape quotes inside label searches so we can safely interpolate strings.

    Thin alias of escape_sparql_string kept for readability at call sites.
    """
    return escape_sparql_string(term)
- def _extract_limit(query: str) -> Optional[int]:
- match = re.search(r"\blimit\s+(\d+)\b", query, re.IGNORECASE)
- if not match:
- return None
- try:
- return int(match.group(1))
- except ValueError:
- return None
def _apply_limit(query: str, default_limit: int, max_limit: int) -> str:
    """Ensure the query carries a LIMIT no larger than max_limit.

    A query without any LIMIT gets default_limit appended; an oversized
    LIMIT is rewritten down to max_limit; otherwise the query is returned
    unchanged.
    """
    current = _extract_limit(query)
    if current is None:
        return f"{query.strip()}\nLIMIT {default_limit}"
    if current <= max_limit:
        return query
    return re.sub(
        r"\blimit\s+\d+\b",
        f"LIMIT {max_limit}",
        query,
        flags=re.IGNORECASE,
    )
def guard_select_query(query: str) -> str:
    """Enforce that raw queries are read-only and bounded by LIMIT.

    Rejects anything containing an update keyword, requires the word
    "select" somewhere, then clamps/appends a LIMIT via _apply_limit.
    """
    normalized = query.lower()
    update_keywords = r"\b(insert|delete|load|clear|drop|create|move|copy|add)\b"
    if re.search(update_keywords, normalized):
        raise HTTPException(status_code=400, detail="SPARQL update operations are not allowed")
    if "select" not in normalized:
        raise HTTPException(status_code=400, detail="Only SELECT queries are allowed")
    return _apply_limit(query, SPARQL_DEFAULT_LIMIT, SPARQL_MAX_LIMIT)
def ttl_to_sparql_insert(ttl_text: str, graph: Optional[str]) -> str:
    """Convert a small Turtle document into an equivalent SPARQL INSERT DATA.

    @prefix declarations become PREFIX headers, @base lines are skipped,
    and the remaining lines are treated as triple statements. When a graph
    URI is given, the triples are wrapped in a GRAPH block. Raises
    HTTPException 400 when no triple lines remain.
    """
    headers: List[str] = []
    triple_lines: List[str] = []
    for raw in ttl_text.splitlines():
        stripped = raw.strip()
        if not stripped or stripped.startswith("@base"):
            # Blank lines add nothing; @base entries are rare in our exports.
            continue
        declared = re.match(r"@prefix\s+([\w-]+):\s*<([^>]+)>\s*\.", stripped)
        if declared:
            name, iri = declared.groups()
            headers.append(f"PREFIX {name}: <{iri}>")
        else:
            # Keep the raw (unstripped) line so Turtle continuation layout survives.
            triple_lines.append(raw)
    if not triple_lines:
        raise HTTPException(status_code=400, detail="No RDF triples found in input")
    body = "\n".join(triple_lines)
    if graph:
        body = f"GRAPH <{graph}> {{\n{body}\n}}"
    header_block = "\n".join(headers)
    return f"{header_block}\nINSERT DATA {{\n{body}\n}}"
- # --- MCP TOOL IMPLEMENTATIONS ---
def tool_sparql_query(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run a caller-supplied SELECT query after read-only guardrails apply."""
    raw_query = input_data.get("query")
    if not raw_query:
        raise ValueError("Missing 'query' field")
    return run_sparql(guard_select_query(raw_query))
def tool_list_graphs(_input: Dict[str, Any]) -> Dict[str, Any]:
    """List distinct named graphs (hard-capped at 50 rows); input is ignored."""
    query = """
    SELECT DISTINCT ?g WHERE {
      GRAPH ?g { ?s ?p ?o }
    }
    LIMIT 50
    """
    return run_sparql(query)
def tool_search_label(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Case-insensitive substring search over rdfs:label values.

    input: 'term' (escaped before interpolation into the literal) and
    'limit' (default 20, clamped to [1, SPARQL_MAX_LIMIT]).
    """
    term = input_data.get("term", "")
    sanitized = sanitize_term(term)
    limit = int(input_data.get("limit", 20))
    # Clamp the caller-supplied limit into the guardrail range.
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    query = f"""
    SELECT ?s ?label WHERE {{
      ?s rdfs:label ?label .
      FILTER(CONTAINS(LCASE(?label), LCASE(\"{sanitized}\")))
    }}
    LIMIT {limit}
    """
    return run_sparql(query)
def tool_get_entities_by_type(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """List subjects carrying the given rdf:type URI.

    Raises ValueError when 'type_uri' is missing; 'limit' defaults to 50.
    """
    type_uri = input_data.get("type_uri")
    if not type_uri:
        raise ValueError("Missing 'type_uri' field")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    # NOTE(review): type_uri is interpolated into <...> unescaped; callers are
    # trusted not to pass URIs containing '>' -- confirm this trust boundary.
    query = f"""
    SELECT ?s WHERE {{
      ?s rdf:type <{type_uri}> .
    }}
    LIMIT {limit}
    """
    return run_sparql(query)
def tool_get_predicates_for_subject(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """List the distinct predicates attached to a subject URI.

    Raises ValueError when 'subject_uri' is missing; 'limit' defaults to 50.
    """
    subject_uri = input_data.get("subject_uri")
    if not subject_uri:
        raise ValueError("Missing 'subject_uri' field")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    query = f"""
    SELECT DISTINCT ?p WHERE {{
      <{subject_uri}> ?p ?o .
    }}
    LIMIT {limit}
    """
    return run_sparql(query)
def tool_get_labels_for_subject(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch rdfs:label values for a subject (capped at 20 rows)."""
    subject_uri = input_data.get("subject_uri")
    if not subject_uri:
        raise ValueError("Missing 'subject_uri' field")
    query = f"""
    SELECT ?label WHERE {{
      <{subject_uri}> rdfs:label ?label .
    }}
    LIMIT 20
    """
    return run_sparql(query)
def tool_traverse_property(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Follow one predicate from a subject and return neighbors with labels.

    direction: 'outgoing' (subject -> neighbor, default) or 'incoming'
    (neighbor -> subject); any other value raises ValueError. Labels and
    dc:description are attached optionally.
    """
    subject_uri = input_data.get("subject_uri")
    property_uri = input_data.get("property_uri")
    if not subject_uri or not property_uri:
        raise ValueError("Missing 'subject_uri' or 'property_uri'")
    direction = input_data.get("direction", "outgoing")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    if direction not in {"outgoing", "incoming"}:
        raise ValueError("direction must be 'outgoing' or 'incoming'")
    # Build the triple pattern with the subject on the matching side.
    if direction == "outgoing":
        triple = f"<{subject_uri}> <{property_uri}> ?neighbor ."
    else:
        triple = f"?neighbor <{property_uri}> <{subject_uri}> ."
    query = f"""
    SELECT ?neighbor ?label ?description WHERE {{
      {triple}
      OPTIONAL {{ ?neighbor rdfs:label ?label }}
      OPTIONAL {{ ?neighbor dc:description ?description }}
    }}
    LIMIT {limit}
    """
    return run_sparql(query)
def tool_list_classes(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """List ontology classes (rdfs:Class and owl:Class) with optional term filtering.

    The term matches the label (falling back to the class URI) or the
    comment, case-insensitively.
    """
    term = sanitize_term(input_data.get("term", ""))
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    term_filter = ""
    if term:
        # COALESCE lets unlabeled classes still match against their URI.
        term_filter = f"""
        FILTER(
            CONTAINS(LCASE(COALESCE(STR(?label), STR(?class))), LCASE(\"{term}\")) ||
            CONTAINS(LCASE(COALESCE(STR(?comment), \"\")), LCASE(\"{term}\"))
        )
        """
    query = f"""
    SELECT DISTINCT ?class ?label ?comment WHERE {{
      {{ ?class rdf:type rdfs:Class . }}
      UNION
      {{ ?class rdf:type <http://www.w3.org/2002/07/owl#Class> . }}
      OPTIONAL {{ ?class rdfs:label ?label }}
      OPTIONAL {{ ?class rdfs:comment ?comment }}
      {term_filter}
    }}
    LIMIT {limit}
    """
    return run_sparql(query)
def tool_list_properties(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """List ontology properties with optional term/domain/range filtering.

    Covers rdf:Property plus owl Object/Datatype properties; 'term' matches
    label-or-URI or comment; 'domain_uri'/'range_uri' filter exact matches.
    """
    term = sanitize_term(input_data.get("term", ""))
    domain_uri = input_data.get("domain_uri")
    range_uri = input_data.get("range_uri")
    limit = int(input_data.get("limit", 100))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    filters = []
    if term:
        filters.append(
            f"""
            FILTER(
                CONTAINS(LCASE(COALESCE(STR(?label), STR(?property))), LCASE(\"{term}\")) ||
                CONTAINS(LCASE(COALESCE(STR(?comment), \"\")), LCASE(\"{term}\"))
            )
            """
        )
    if domain_uri:
        filters.append(f"FILTER(?domain = <{domain_uri}>)")
    if range_uri:
        filters.append(f"FILTER(?range = <{range_uri}>)")
    query = f"""
    SELECT DISTINCT ?property ?label ?comment ?domain ?range WHERE {{
      {{ ?property rdf:type rdf:Property . }}
      UNION
      {{ ?property rdf:type <http://www.w3.org/2002/07/owl#ObjectProperty> . }}
      UNION
      {{ ?property rdf:type <http://www.w3.org/2002/07/owl#DatatypeProperty> . }}
      OPTIONAL {{ ?property rdfs:label ?label }}
      OPTIONAL {{ ?property rdfs:comment ?comment }}
      OPTIONAL {{ ?property rdfs:domain ?domain }}
      OPTIONAL {{ ?property rdfs:range ?range }}
      {' '.join(filters)}
    }}
    LIMIT {limit}
    """
    return run_sparql(query)
def tool_describe_class(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Describe a class and include properties that declare it as rdfs:domain."""
    class_uri = input_data.get("class_uri")
    if not class_uri:
        raise ValueError("Missing 'class_uri' field")
    # Everything is OPTIONAL so a class with no metadata still yields a row.
    query = f"""
    SELECT ?label ?comment ?property ?propertyLabel ?propertyComment ?range WHERE {{
      OPTIONAL {{ <{class_uri}> rdfs:label ?label }}
      OPTIONAL {{ <{class_uri}> rdfs:comment ?comment }}
      OPTIONAL {{
        ?property rdfs:domain <{class_uri}> .
        OPTIONAL {{ ?property rdfs:label ?propertyLabel }}
        OPTIONAL {{ ?property rdfs:comment ?propertyComment }}
        OPTIONAL {{ ?property rdfs:range ?range }}
      }}
    }}
    LIMIT {SPARQL_MAX_LIMIT}
    """
    return run_sparql(query)
def tool_describe_property(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Describe a property and include usage examples from the graph.

    Returns {"metadata": ..., "usage": ...}; example count is bounded by
    'usage_limit' (default 10, clamped to the guardrail range).
    """
    property_uri = input_data.get("property_uri")
    if not property_uri:
        raise ValueError("Missing 'property_uri' field")
    usage_limit = int(input_data.get("usage_limit", 10))
    usage_limit = min(max(usage_limit, 1), SPARQL_MAX_LIMIT)
    metadata_query = f"""
    SELECT ?label ?comment ?domain ?range ?type WHERE {{
      OPTIONAL {{ <{property_uri}> rdfs:label ?label }}
      OPTIONAL {{ <{property_uri}> rdfs:comment ?comment }}
      OPTIONAL {{ <{property_uri}> rdfs:domain ?domain }}
      OPTIONAL {{ <{property_uri}> rdfs:range ?range }}
      OPTIONAL {{ <{property_uri}> rdf:type ?type }}
    }}
    LIMIT {SPARQL_MAX_LIMIT}
    """
    usage_query = f"""
    SELECT ?subject ?subjectLabel ?object ?objectLabel WHERE {{
      ?subject <{property_uri}> ?object .
      OPTIONAL {{ ?subject rdfs:label ?subjectLabel }}
      OPTIONAL {{ ?object rdfs:label ?objectLabel }}
    }}
    LIMIT {usage_limit}
    """
    return {
        "metadata": run_sparql(metadata_query),
        "usage": run_sparql(usage_query),
    }
def tool_describe_subject(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Dump a subject's predicate/object pairs, with object labels when present."""
    subject_uri = input_data.get("subject_uri")
    if not subject_uri:
        raise ValueError("Missing 'subject_uri' field")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    query = f"""
    SELECT ?predicate ?object ?objectLabel WHERE {{
      <{subject_uri}> ?predicate ?object .
      OPTIONAL {{ ?object rdfs:label ?objectLabel }}
    }}
    LIMIT {limit}
    """
    return run_sparql(query)
def tool_path_traverse(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Follow a chain of predicates from a subject, one hop per path entry.

    input: 'subject_uri'; 'property_path' (alias 'properties') as a list or
    comma-separated string of predicate URIs; 'direction' ('outgoing' or
    'incoming', applied to every hop); 'limit' (default 50, clamped).
    Returns each step node ?nK plus its optional label/description.
    """
    subject_uri = input_data.get("subject_uri")
    property_path = input_data.get("property_path") or input_data.get("properties")
    if not subject_uri or not property_path:
        raise ValueError("Missing 'subject_uri' or 'property_path'")
    if isinstance(property_path, str):
        property_path = [p.strip() for p in property_path.split(",") if p.strip()]
    if not isinstance(property_path, list) or not property_path:
        raise ValueError("'property_path' must be a non-empty list of property URIs")
    direction = input_data.get("direction", "outgoing")
    # Bug fix: direction was never validated here (any unknown value was
    # silently treated as "incoming"), unlike tool_traverse_property.
    if direction not in {"outgoing", "incoming"}:
        raise ValueError("direction must be 'outgoing' or 'incoming'")
    limit = int(input_data.get("limit", 50))
    limit = min(max(limit, 1), SPARQL_MAX_LIMIT)
    statements = []
    optional_lines = []
    select_terms = []
    prev_subject = f"<{subject_uri}>"
    for idx, prop_uri in enumerate(property_path, start=1):
        step_var = f"?n{idx}"
        if direction == "outgoing":
            statements.append(f"{prev_subject} <{prop_uri}> {step_var} .")
        else:
            statements.append(f"{step_var} <{prop_uri}> {prev_subject} .")
        # Bug fix: project the label/description variables too -- previously the
        # OPTIONAL clauses were computed but never appeared in the SELECT list.
        select_terms.extend([step_var, f"{step_var}Label", f"{step_var}Description"])
        optional_lines.append(f"OPTIONAL {{ {step_var} rdfs:label {step_var}Label }}")
        optional_lines.append(f"OPTIONAL {{ {step_var} dc:description {step_var}Description }}")
        prev_subject = step_var
    select_clause = " ".join(select_terms)
    # Join outside the f-string: a backslash inside an f-string expression is a
    # SyntaxError on Python < 3.12.
    pattern_block = "\n    ".join(statements)
    optional_block = "\n    ".join(optional_lines)
    query = f"""
    SELECT {select_clause} WHERE {{
    {pattern_block}
    {optional_block}
    }}
    LIMIT {limit}
    """
    return {
        "property_path": property_path,
        "direction": direction,
        "result": run_sparql(query),
    }
def tool_property_usage_statistics(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Count distinct subjects that use a property and sample example usages.

    Returns {"count": ..., "examples": ...}; example size is bounded by
    'examples_limit' (default 5, clamped to the guardrail range).
    """
    property_uri = input_data.get("property_uri")
    if not property_uri:
        raise ValueError("Missing 'property_uri' field")
    examples_limit = int(input_data.get("examples_limit", 5))
    examples_limit = min(max(examples_limit, 1), SPARQL_MAX_LIMIT)
    count_query = f"""
    SELECT (COUNT(DISTINCT ?subject) AS ?usageCount) WHERE {{
      ?subject <{property_uri}> ?object .
    }}
    LIMIT {SPARQL_MAX_LIMIT}
    """
    usage_query = f"""
    SELECT ?subject ?subjectLabel ?object ?objectLabel WHERE {{
      ?subject <{property_uri}> ?object .
      OPTIONAL {{ ?subject rdfs:label ?subjectLabel }}
      OPTIONAL {{ ?object rdfs:label ?objectLabel }}
    }}
    LIMIT {examples_limit}
    """
    return {
        "count": run_sparql(count_query),
        "examples": run_sparql(usage_query),
    }
def tool_batch_insert(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert many triples in one update, from Turtle text or a triples list.

    input: either 'ttl' (Turtle text, takes precedence) or 'triples' (list
    of dicts with subject/predicate/object plus optional object_type,
    datatype, lang); 'graph' defaults to GRAPH_URI. Returns the update
    status together with the generated SPARQL for debugging.
    """
    ttl_text = input_data.get("ttl")
    triples = input_data.get("triples")
    graph = input_data.get("graph") or GRAPH_URI
    if not ttl_text and not triples:
        raise ValueError("Provide either 'ttl' text or 'triples' list")

    def _format_object(obj_value: Any, obj_type: str, datatype: Optional[str], lang: Optional[str]) -> str:
        # Render the object term according to its declared kind.
        if obj_type == "uri":
            return f"<{obj_value}>"
        if obj_type == "literal":
            return f'"{escape_sparql_string(obj_value)}"'
        if obj_type == "typed_literal":
            if not datatype:
                raise ValueError("Missing datatype for typed_literal")
            return f'"{escape_sparql_string(obj_value)}"^^<{datatype}>'
        if obj_type == "lang_literal":
            if not lang:
                raise ValueError("Missing lang for lang_literal")
            return f'"{escape_sparql_string(obj_value)}"@{lang}'
        raise ValueError(f"Unknown object_type: {obj_type}")

    if ttl_text:
        # 'ttl' wins when both inputs are supplied.
        query = ttl_to_sparql_insert(ttl_text, graph)
    else:
        lines = []
        for triple in triples:
            subj = triple.get("subject")
            pred = triple.get("predicate")
            obj_value = triple.get("object")
            if not subj or not pred or obj_value is None:
                raise ValueError("Each triple must provide subject, predicate, and object")
            obj_type = triple.get("object_type", "uri")
            datatype = triple.get("datatype")
            lang = triple.get("lang")
            obj_text = _format_object(obj_value, obj_type, datatype, lang)
            lines.append(f"<{subj}> <{pred}> {obj_text} .")
        ttl_bulk = "\n".join(lines)
        query = ttl_to_sparql_insert(ttl_bulk, graph)
    result = run_sparql_update(query)
    return {**result, "query": query}
def tool_load_examples(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Load Turtle fixtures from the examples/ directory into a graph.

    Gated by MCP_ALLOW_EXAMPLE_LOAD. input: optional 'files' (name or list
    of names; defaults to every *.ttl in examples/) and 'graph' (defaults
    to EXAMPLE_GRAPH). Returns the list of loaded file/graph pairs.
    """
    if not ALLOW_EXAMPLE_LOAD:
        raise HTTPException(status_code=403, detail="Example loading is disabled")
    requested = input_data.get("files") or []
    if isinstance(requested, str):
        requested = [requested]
    graph = input_data.get("graph") or EXAMPLE_GRAPH
    if requested:
        files_to_load = requested
    else:
        files_to_load = sorted(p.name for p in EXAMPLES_DIR.glob("*.ttl"))
    if not files_to_load:
        raise HTTPException(status_code=404, detail="No example files available")
    results = []
    for filename in files_to_load:
        file_path = (EXAMPLES_DIR / filename).resolve()
        # Reject path-traversal attempts before touching the filesystem at all.
        if EXAMPLES_DIR not in file_path.parents and file_path != EXAMPLES_DIR:
            raise HTTPException(status_code=400, detail="Invalid example file path")
        if not file_path.exists():
            # Bug fix: the message previously read "Missing example file: (unknown)"
            # and never identified which file was absent.
            raise HTTPException(status_code=400, detail=f"Missing example file: {filename}")
        ttl_text = file_path.read_text(encoding="utf-8")
        update_query = ttl_to_sparql_insert(ttl_text, graph)
        run_sparql_update(update_query)
        results.append({"file": filename, "graph": graph})
    return {"loaded": results}
def tool_insert_triple(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a single triple; object rendering depends on 'object_type'.

    object_type: 'uri' (default), 'literal', 'typed_literal' (requires
    'datatype'), or 'lang_literal' (requires 'lang'). On backend failure the
    generated SPARQL is appended to the error detail to aid debugging.
    """
    subject = input_data.get("subject")
    predicate = input_data.get("predicate")
    obj = input_data.get("object")
    obj_type = input_data.get("object_type", "uri")
    graph = input_data.get("graph")
    if not subject or not predicate or obj is None:
        raise ValueError("Missing 'subject', 'predicate', or 'object' field")
    if obj_type == "uri":
        obj_value = f"<{obj}>"
    elif obj_type == "literal":
        obj_value = f"\"{escape_sparql_string(obj)}\""
    elif obj_type == "typed_literal":
        datatype = input_data.get("datatype")
        if not datatype:
            raise ValueError("Missing 'datatype' for typed_literal")
        obj_value = f"\"{escape_sparql_string(obj)}\"^^<{datatype}>"
    elif obj_type == "lang_literal":
        lang = input_data.get("lang")
        if not lang:
            raise ValueError("Missing 'lang' for lang_literal")
        obj_value = f"\"{escape_sparql_string(obj)}\"@{lang}"
    else:
        raise ValueError("Unknown object_type")
    ttl = f"<{subject}> <{predicate}> {obj_value} ."
    update_query = ttl_to_sparql_insert(ttl, graph)
    try:
        result = run_sparql_update(update_query)
    except HTTPException as exc:
        # Surface the generated SPARQL alongside the backend error message.
        detail = f"{exc.detail}\n\nSPARQL:\n{update_query}"
        raise HTTPException(status_code=exc.status_code, detail=detail)
    return {**result, "query": update_query}
- # --- TOOL REGISTRY ---
# Registry mapping MCP tool names to their implementations. Domain layers
# loaded via load_domain_layers() below may add further entries.
TOOLS = {
    "sparql_query": tool_sparql_query,
    "list_graphs": tool_list_graphs,
    "search_label": tool_search_label,
    "get_entities_by_type": tool_get_entities_by_type,
    "get_predicates_for_subject": tool_get_predicates_for_subject,
    "get_labels_for_subject": tool_get_labels_for_subject,
    "traverse_property": tool_traverse_property,
    "list_classes": tool_list_classes,
    "list_properties": tool_list_properties,
    "describe_class": tool_describe_class,
    "describe_property": tool_describe_property,
    "describe_subject": tool_describe_subject,
    "path_traverse": tool_path_traverse,
    "property_usage_statistics": tool_property_usage_statistics,
    "batch_insert": tool_batch_insert,
    "insert_triple": tool_insert_triple,
    "load_examples": tool_load_examples,
}
def load_domain_layers(tools: Dict[str, Callable[[Dict[str, Any]], Any]]) -> None:
    """Import optional domain-layer modules and let them register extra tools.

    DOMAIN_LAYERS is a comma-separated module list (default
    "garden_layer.plugin"). Each module must expose register_layer(tools).
    Import and registration failures are logged but never fatal, so a broken
    plugin cannot prevent server startup.
    """
    raw = os.getenv("DOMAIN_LAYERS", "garden_layer.plugin")
    modules = [item.strip() for item in raw.split(",") if item.strip()]
    if not modules:
        return
    for module_name in modules:
        module = None
        try:
            module = import_module(module_name)
        except ImportError as exc:
            # Fall back to the top-level package when a dotted path fails.
            base = module_name.split(".", 1)[0]
            if base != module_name:
                try:
                    module = import_module(base)
                    logger.info("Falling back to base module '%s' for domain layer '%s'", base, module_name)
                except ImportError:
                    logger.warning(
                        "Domain layer '%s' could not be imported and base module '%s' is missing: %s",
                        module_name,
                        base,
                        exc,
                    )
            else:
                logger.warning("Domain layer '%s' could not be imported: %s", module_name, exc)
        if module is None:
            continue
        register = getattr(module, "register_layer", None)
        if not callable(register):
            logger.warning("Domain layer '%s' does not expose register_layer", module_name)
            continue
        try:
            register(tools)
            logger.info("Loaded domain layer '%s'", module_name)
        except Exception as exc:
            # Log with traceback but keep going: plugins must not break startup.
            logger.exception("Domain layer '%s' failed to register: %s", module_name, exc)
# Register optional domain layers at import time so TOOLS is complete before
# the first request arrives.
load_domain_layers(TOOLS)

# Human-readable one-line descriptions surfaced in /mcp responses.
TOOL_DOCS = {
    "sparql_query": "Execute a bounded SELECT query and return the JSON result.",
    "list_graphs": "List up to 50 active graph URIs.",
    "search_label": "Search rdfs:label values that contain a term (case-insensitive).",
    "get_entities_by_type": "List subjects of a given rdf:type.",
    "get_predicates_for_subject": "List distinct predicates used by a subject.",
    "get_labels_for_subject": "Fetch rdfs:label values for a subject.",
    "traverse_property": "Traverse a property (incoming or outgoing) for a subject and return labels/descriptions.",
    "list_classes": "List ontology classes with optional label/comment term filtering.",
    "list_properties": "List ontology properties with optional term/domain/range filters.",
    "describe_class": "Describe a class and list properties that use it as rdfs:domain.",
    "describe_property": "Describe a property (label/comment/domain/range/type) and sample usage.",
    "describe_subject": "Return subject predicates/objects (with labels) to inspect an individual node.",
    "path_traverse": "Follow a property path (list of predicates) from a subject, returning each step's nodes.",
    "property_usage_statistics": "Count how often a property is used and sample subjects/objects.",
    "batch_insert": "Insert multiple triples or TTL at once with a single guarded update.",
    "insert_triple": "Insert a single triple (useful for debugging updates).",
    "load_examples": "Load Turtle fixtures from the `examples/` directory when MCP_ALLOW_EXAMPLE_LOAD=true.",
}
- # --- MCP ENDPOINT ---
@app.post("/mcp")
def handle_mcp(tool_request: ToolRequest, http_request: Request):
    """Dispatch an MCP tool call, audit-logging the request and normalizing errors.

    The input payload is truncated to 1024 characters for the audit log.
    Unknown tools yield 400; tool-raised HTTPExceptions pass through with
    their original status; anything else becomes a 500.
    """
    tool_name = tool_request.tool
    input_data = tool_request.input or {}
    client_host = http_request.client.host if http_request.client else "unknown"
    trimmed_input = json.dumps(input_data, ensure_ascii=False, default=str)
    if len(trimmed_input) > 1024:
        trimmed_input = f"{trimmed_input[:1024]}…"
    timestamp = datetime.now(timezone.utc).isoformat()
    tool_logger.info(
        "tool=%s client=%s time=%s input=%s",
        tool_name,
        client_host,
        timestamp,
        trimmed_input,
    )
    if tool_name not in TOOLS:
        raise HTTPException(status_code=400, detail=f"Unknown tool: {tool_name}")
    try:
        result = TOOLS[tool_name](input_data)
        return {
            "status": "ok",
            "tool": tool_name,
            "description": TOOL_DOCS.get(tool_name, ""),
            "result": result,
        }
    except HTTPException:
        # Bug fix: tools deliberately raise HTTPException with meaningful codes
        # (400 guardrail violations, 403 disabled features); previously the
        # generic handler below swallowed them and re-raised everything as 500.
        raise
    except Exception as exc:
        logger.error("Tool %s failed: %s", tool_name, exc)
        raise HTTPException(status_code=500, detail=str(exc))
- # --- HEALTH CHECK ---
@app.get("/")
def root():
    """Liveness/info endpoint: advertises tools, endpoint, and guardrails."""
    return {
        "status": "MCP server running",
        "tools": list(TOOLS.keys()),
        "virtuoso": VIRTUOSO_ENDPOINT,
        "guardrails": {
            "default_limit": SPARQL_DEFAULT_LIMIT,
            "max_limit": SPARQL_MAX_LIMIT,
            "allow_example_load": ALLOW_EXAMPLE_LOAD,
            "turtle_examples": True,
        },
    }
@app.get("/health")
def health():
    """Alias of root() for conventional /health probes."""
    return root()
|