from typing import Any, Dict, List, Optional

import requests

from .config import GRAPH, MCP_URL


class GardenLayer:
    """Convenience wrappers around the MCP tools used by the garden project.

    Each public method assembles the input payload for one MCP tool and
    forwards it to the configured endpoint through :meth:`_call`.
    """

    def __init__(self, mcp_url: str = MCP_URL):
        # Endpoint that every tool invocation is POSTed to.
        self.mcp_url = mcp_url

    def _call(self, tool: str, payload: Dict[str, Any]) -> Any:
        """POST one tool invocation and return the ``result`` of the reply.

        The request body is ``{"tool": tool, "input": payload}`` and the
        request uses a timeout of 10.

        Raises:
            requests.HTTPError: propagated from ``raise_for_status`` when the
                HTTP response carries an error status.
            RuntimeError: when the reply's ``status`` field is not ``"ok"``;
                the message is the reply's ``detail`` field if present.
        """
        envelope = {"tool": tool, "input": payload}
        http_reply = requests.post(self.mcp_url, json=envelope, timeout=10)
        http_reply.raise_for_status()

        data = http_reply.json()
        if data.get("status") == "ok":
            return data["result"]
        raise RuntimeError(data.get("detail", "MCP tool failed"))

    def traverse(
        self,
        subject_uri: str,
        property_uri: str,
        direction: str = "outgoing",
        limit: int = 20,
    ) -> Any:
        """Invoke the ``traverse_property`` tool and return its result."""
        tool_input = {
            "subject_uri": subject_uri,
            "property_uri": property_uri,
            "direction": direction,
            "limit": limit,
        }
        return self._call("traverse_property", tool_input)

    def describe_subject(self, subject_uri: str, limit: int = 10) -> Any:
        """Invoke the ``describe_subject`` tool and return its result."""
        tool_input = {
            "subject_uri": subject_uri,
            "limit": limit,
        }
        return self._call("describe_subject", tool_input)

    def path_traverse(
        self,
        subject_uri: str,
        property_path: List[str],
        direction: str = "outgoing",
        limit: int = 10,
    ) -> Any:
        """Invoke the ``path_traverse`` tool and return its result."""
        tool_input = {
            "subject_uri": subject_uri,
            "property_path": property_path,
            "direction": direction,
            "limit": limit,
        }
        return self._call("path_traverse", tool_input)

    def property_usage_statistics(self, property_uri: str, examples_limit: int = 5) -> Any:
        """Invoke the ``property_usage_statistics`` tool and return its result."""
        tool_input = {
            "property_uri": property_uri,
            "examples_limit": examples_limit,
        }
        return self._call("property_usage_statistics", tool_input)

    def batch_insert(self, ttl: str, graph: Optional[str] = None) -> Any:
        """Invoke the ``batch_insert`` tool with ``ttl`` and return its result.

        The ``graph`` key is only included in the payload when ``graph`` is
        truthy, so ``None`` and the empty string both omit it.
        """
        if not graph:
            return self._call("batch_insert", {"ttl": ttl})
        return self._call("batch_insert", {"ttl": ttl, "graph": graph})

    def add_seedling(
        self,
        plant_uri: str,
        seed_product_uri: str,
        cycle_uri: str,
        label: str,
        strain_uri: str = "http://world.eu.org/example1#Strain_lucky_experimental_line",
        graph: str = GRAPH,
    ) -> None:
        """Insert the five triples that describe one plant.

        With ``plant_uri`` as the subject, the triples are, in order: its
        rdf:type, its rdfs:label (a literal), its cycle, the seed product it
        was grown from, and the strain it belongs to. Each triple is sent as
        a separate ``insert_triple`` call, so an exception raised by one call
        stops the loop and leaves the earlier triples already submitted.
        """
        # (predicate, object value, object kind) in submission order.
        statements = (
            (
                "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "http://world.eu.org/cannabis-breeding#IndividualPlant",
                "uri",
            ),
            (
                "http://www.w3.org/2000/01/rdf-schema#label",
                label,
                "literal",
            ),
            (
                "http://world.eu.org/cannabis-breeding#inCycle",
                cycle_uri,
                "uri",
            ),
            (
                "http://world.eu.org/cannabis-breeding#grownFromSeedProduct",
                seed_product_uri,
                "uri",
            ),
            (
                "http://world.eu.org/cannabis-breeding#belongsToStrain",
                strain_uri,
                "uri",
            ),
        )

        for predicate_uri, value, value_kind in statements:
            triple_input = {
                "subject": plant_uri,
                "predicate": predicate_uri,
                "object": value,
                "object_type": value_kind,
                "graph": graph,
            }
            self._call("insert_triple", triple_input)