from __future__ import annotations

from datetime import datetime, timezone

from src.trader_mcp.strategy_sdk import Strategy
from src.trader_mcp.logging_utils import log_event


class Strategy(Strategy):
    """Defensive strategy that nudges the base/counter value split toward a target.

    On each tick the strategy refreshes balances and regime data, reads the
    current price, and -- subject to a tick cooldown, a wall-clock cooldown and
    a minimum price move since the last order -- places one limit order that
    moves the value ratio ``base_value / (base_value + counter_value)`` a step
    closer to ``rebalance_target_ratio``.
    """

    LABEL = "Exposure Protector"

    STRATEGY_PROFILE = {
        "expects": {
            "trend": "mixed",
            "volatility": "moderate",
            "event_risk": "low",
            "liquidity": "normal",
        },
        "avoids": {
            "volatility": "chaotic",
            "event_risk": "high",
            "liquidity": "thin",
        },
        "risk_profile": "defensive",
        "capabilities": ["inventory_rebalancing", "exposure_trim", "companion_defense"],
        "role": "defensive",
        "inventory_behavior": "rebalancing",
        "requires_rebalance_before_start": False,
        "requires_rebalance_before_stop": False,
        "safe_when_unbalanced": True,
        "can_run_with": ["grid_trader", "trend_follower"],
    }

    TICK_MINUTES = 0.2

    CONFIG_SCHEMA = {
        "trail_distance_pct": {"type": "float", "default": 0.03, "min": 0.0, "max": 1.0},
        "rebalance_target_ratio": {"type": "float", "default": 0.5, "min": 0.0, "max": 1.0},
        "rebalance_step_ratio": {"type": "float", "default": 0.15, "min": 0.0, "max": 1.0},
        "min_order_size": {"type": "float", "default": 0.0, "min": 0.0},
        "max_order_size": {"type": "float", "default": 0.0, "min": 0.0},
        "order_spacing_ticks": {"type": "int", "default": 1, "min": 0, "max": 1000},
        "cooldown_ticks": {"type": "int", "default": 2, "min": 0, "max": 1000},
        "min_rebalance_seconds": {"type": "int", "default": 180, "min": 0, "max": 86400},
        "min_price_move_pct": {"type": "float", "default": 0.005, "min": 0.0, "max": 1.0},
        "balance_tolerance": {"type": "float", "default": 0.05, "min": 0.0, "max": 1.0},
        "fee_rate": {"type": "float", "default": 0.0025, "min": 0.0, "max": 0.05},
        "debug_orders": {"type": "bool", "default": True},
    }

    STATE_SCHEMA = {
        "last_price": {"type": "float", "default": 0.0},
        "last_action": {"type": "string", "default": "idle"},
        "last_error": {"type": "string", "default": ""},
        "debug_log": {"type": "list", "default": []},
        "regimes": {"type": "dict", "default": {}},
        "regimes_updated_at": {"type": "string", "default": ""},
        "base_available": {"type": "float", "default": 0.0},
        "counter_available": {"type": "float", "default": 0.0},
        "trailing_anchor": {"type": "float", "default": 0.0},
        "cooldown_remaining": {"type": "int", "default": 0},
        "last_order_at": {"type": "float", "default": 0.0},
        "last_order_price": {"type": "float", "default": 0.0},
    }

    def init(self):
        """Return the initial state dictionary for a fresh strategy instance."""
        return {
            "last_price": 0.0,
            "last_action": "idle",
            "last_error": "",
            "debug_log": ["init exposure protector"],
            "regimes": {},
            "regimes_updated_at": "",
            "base_available": 0.0,
            "counter_available": 0.0,
            "trailing_anchor": 0.0,
            "cooldown_remaining": 0,
            "last_order_at": 0.0,
            "last_order_price": 0.0,
        }

    def _log(self, message: str) -> None:
        """Append ``message`` to the rolling debug log (last 12 kept) and emit it."""
        state = getattr(self, "state", {}) or {}
        log = list(state.get("debug_log") or [])
        log.append(message)
        state["debug_log"] = log[-12:]
        self.state = state
        log_event("stoploss", message)

    def _config_number(self, key: str, default, cast=float):
        """Read a numeric config value, treating only a missing/None entry as unset.

        Unlike ``cast(self.config.get(key, default) or default)``, an explicit
        ``0`` / ``0.0`` is returned as-is instead of being replaced by the
        default. A value that cannot be converted falls back to the default.
        """
        raw = self.config.get(key)
        if raw is None:
            return cast(default)
        try:
            return cast(raw)
        except (TypeError, ValueError):
            return cast(default)

    def _base_symbol(self) -> str:
        """Return the upper-cased base asset code (text before ``/``), default ``XRP``."""
        source = self.context.base_currency or self.context.market_symbol or "XRP"
        return source.split("/")[0].upper()

    def _market_symbol(self) -> str:
        """Return the context's market symbol, or ``<base>usd`` when it is unset."""
        return self.context.market_symbol or f"{self._base_symbol().lower()}usd"

    def apply_policy(self):
        """Apply the parent policy, then derive pacing parameters from it.

        ``priority`` takes precedence: low/background forces the cautious
        profile and high/urgent forces the assertive one; otherwise the
        profile is chosen by ``risk_posture``. The derived values overwrite the
        matching config keys and are mirrored into ``state["policy_derived"]``.

        Returns:
            The policy mapping returned by the parent class.
        """
        policy = super().apply_policy()
        risk = str(policy.get("risk_posture") or "normal").lower()
        priority = str(policy.get("priority") or "normal").lower()

        trail_map = {"cautious": 0.02, "normal": 0.03, "assertive": 0.04}
        step_map = {"cautious": 0.08, "normal": 0.15, "assertive": 0.25}
        wait_map = {"cautious": 420, "normal": 180, "assertive": 90}
        move_map = {"cautious": 0.01, "normal": 0.005, "assertive": 0.003}

        if priority in {"low", "background"}:
            trail = trail_map["cautious"]
            step = step_map["cautious"]
            wait = wait_map["cautious"]
            move = move_map["cautious"]
        elif priority in {"high", "urgent"}:
            trail = trail_map["assertive"]
            step = step_map["assertive"]
            wait = wait_map["assertive"]
            move = move_map["assertive"]
        else:
            # An unrecognised risk posture uses these fallbacks, which differ
            # from the "normal" row for wait/move (300 / 0.01).
            trail = trail_map.get(risk, 0.03)
            step = step_map.get(risk, 0.15)
            wait = wait_map.get(risk, 300)
            move = move_map.get(risk, 0.01)

        derived = {
            "trail_distance_pct": trail,
            "rebalance_step_ratio": step,
            "min_rebalance_seconds": wait,
            "min_price_move_pct": move,
        }
        for key, value in derived.items():
            self.config[key] = value
        self.state["policy_derived"] = derived
        return policy

    def _live_fee_rate(self) -> float:
        """Return the maker fee reported by the context, else the configured fee.

        The configured ``fee_rate`` is used both when the lookup raises and
        when the payload carries no ``maker`` entry; a reported fee of ``0`` is
        honoured as a real zero fee.
        """
        try:
            payload = self.context.get_fee_rates(self._market_symbol())
            maker = payload.get("maker")
            if maker is not None:
                return float(maker)
        except Exception as exc:
            self._log(f"fee lookup failed: {exc}")
        return float(self.config.get("fee_rate", 0.0025) or 0.0)

    def _price(self) -> float:
        """Return the current price for the base symbol (``0.0`` if absent)."""
        payload = self.context.get_price(self._base_symbol())
        return float(payload.get("price") or 0.0)

    def _refresh_regimes(self) -> None:
        """Store the snapshot's ``fit`` section in state and stamp the refresh time."""
        try:
            if hasattr(self.context, "get_strategy_snapshot"):
                self.state["regimes"] = self.context.get_strategy_snapshot().get("fit", {})
            else:
                self.state["regimes"] = {}
        except Exception:
            # Best-effort: regime data is informational, so failures reset it.
            self.state["regimes"] = {}
        self.state["regimes_updated_at"] = datetime.now(timezone.utc).isoformat()

    def _refresh_balance_snapshot(self) -> None:
        """Update ``base_available`` / ``counter_available`` from account info.

        Uses each balance's ``available`` figure, falling back to ``total``
        when ``available`` is ``None``. Lookup failures are logged and leave the
        previous snapshot untouched.
        """
        try:
            info = self.context.get_account_info()
        except Exception as exc:
            self._log(f"balance refresh failed: {exc}")
            return

        balances = info.get("balances") if isinstance(info, dict) else []
        if not isinstance(balances, list):
            return

        base = self._base_symbol()
        quote = str(self.context.counter_currency or "USD").upper()
        for balance in balances:
            if not isinstance(balance, dict):
                continue
            asset = str(balance.get("asset_code") or "").upper()
            raw = balance.get("available")
            if raw is None:
                raw = balance.get("total") or 0.0
            try:
                available = float(raw)
            except Exception:
                continue
            if asset == base:
                self.state["base_available"] = available
            if asset == quote:
                self.state["counter_available"] = available

    def _account_value_ratio(self, price: float) -> float:
        """Return base value as a fraction of total value (``0.5`` when empty)."""
        base_value = float(self.state.get("base_available") or 0.0) * price
        counter_value = float(self.state.get("counter_available") or 0.0)
        total = base_value + counter_value
        if total <= 0:
            return 0.5
        return base_value / total

    def _supervision(self) -> dict:
        """Summarise health and inventory drift for the ``report`` payload."""
        price = float(self.state.get("last_price") or 0.0)
        ratio = self._account_value_ratio(price if price > 0 else 1.0)
        target = self._config_number("rebalance_target_ratio", 0.5)
        tolerance = self._config_number("balance_tolerance", 0.05)
        drift = abs(ratio - target)
        last_error = str(self.state.get("last_error") or "")

        if drift >= 0.35:
            pressure = "critical"
        elif drift > tolerance:
            pressure = "elevated"
        else:
            pressure = "contained"

        return {
            "health": "degraded" if last_error else "healthy",
            "degraded": bool(last_error),
            "inventory_pressure": pressure,
            "capacity_available": drift > tolerance,
            "switch_readiness": "handoff_complete" if drift <= tolerance else "stay_attached",
            "last_reason": last_error or f"base_ratio={ratio:.3f}, target={target:.3f}, drift={drift:.3f}",
            "desired_companion": None,
        }

    def _desired_side(self, price: float) -> str:
        """Return ``"sell"`` when base exceeds the target ratio, else ``"buy"``."""
        # If base dominates, sell some into strength, otherwise buy some back.
        ratio = self._account_value_ratio(price)
        target = self._config_number("rebalance_target_ratio", 0.5)
        return "sell" if ratio > target else "buy"

    def _suggest_amount(self, side: str, price: float) -> float:
        """Return the base-asset quantity to trade this tick, or ``0.0`` to skip.

        The notional is ``total_value * min(drift, rebalance_step_ratio)``,
        reduced for fees and capped by what the account can actually fund.
        ``min_order_size`` is only enforced when the account can afford it;
        otherwise no order is suggested rather than an unfundable one.

        Args:
            side: ``"sell"`` to reduce base; any other value is treated as buy.
            price: Current price of the base asset in counter units.
        """
        fee_rate = self._live_fee_rate()
        step_ratio = float(self.config.get("rebalance_step_ratio", 0.15) or 0.0)
        target = self._config_number("rebalance_target_ratio", 0.5)
        min_order = float(self.config.get("min_order_size", 0.0) or 0.0)
        max_order = float(self.config.get("max_order_size", 0.0) or 0.0)
        balance_tolerance = float(self.config.get("balance_tolerance", 0.05) or 0.0)

        base_available = float(self.state.get("base_available") or 0.0)
        counter_available = float(self.state.get("counter_available") or 0.0)
        base_value = base_available * price
        total = base_value + counter_available
        if total <= 0 or price <= 0:
            return 0.0

        drift = abs(base_value / total - target)
        if drift <= balance_tolerance:
            return 0.0

        notional = total * min(drift, step_ratio)
        amount = notional / (price * (1 + fee_rate))
        if side == "sell":
            fundable = base_available
            amount = min(amount, fundable)
        else:
            amount = min(amount, counter_available / price)
            # A forced minimum-size buy must also cover the fee.
            fundable = counter_available / (price * (1 + fee_rate))

        if min_order > 0 and amount < min_order:
            if fundable < min_order:
                # Bumping to the minimum would exceed the balance; skip instead.
                return 0.0
            amount = min_order
        if max_order > 0:
            amount = min(amount, max_order)
        return max(amount, 0.0)

    def on_tick(self, tick):
        """Run one strategy cycle and return a dict describing the action taken.

        Order of gates: tick cooldown, wall-clock ``min_rebalance_seconds``,
        minimum price move since the last order, then sizing. Failures while
        placing the order are recorded in state and reported as
        ``{"action": "error", ...}`` rather than raised.
        """
        self.state["last_error"] = ""
        self._log(f"tick alive price={self.state.get('last_price') or 0.0}")
        self._refresh_balance_snapshot()
        self._refresh_regimes()
        price = self._price()
        self.state["last_price"] = price

        cooldown = int(self.state.get("cooldown_remaining") or 0)
        if cooldown > 0:
            self.state["cooldown_remaining"] = cooldown - 1
            self.state["last_action"] = "cooldown"
            return {"action": "cooldown", "price": price}

        now = datetime.now(timezone.utc).timestamp()
        last_order_at = float(self.state.get("last_order_at") or 0.0)
        min_rebalance_seconds = int(self.config.get("min_rebalance_seconds", 300) or 0)
        if last_order_at and min_rebalance_seconds > 0 and (now - last_order_at) < min_rebalance_seconds:
            self.state["last_action"] = "hold"
            return {"action": "hold", "price": price, "reason": "rebalance cooldown"}

        # NOTE(review): last_order_price holds the offset limit price, not the
        # market price at order time, so with trail_distance_pct larger than
        # min_price_move_pct this gate passes immediately -- confirm intent.
        last_order_price = float(self.state.get("last_order_price") or 0.0)
        min_price_move_pct = float(self.config.get("min_price_move_pct", 0.01) or 0.0)
        if last_order_price > 0 and min_price_move_pct > 0:
            move_pct = abs(price - last_order_price) / last_order_price
            if move_pct < min_price_move_pct:
                self.state["last_action"] = "hold"
                return {
                    "action": "hold",
                    "price": price,
                    "reason": "insufficient price move",
                    "move_pct": move_pct,
                }

        side = self._desired_side(price)
        amount = self._suggest_amount(side, price)
        trail_distance = self._config_number("trail_distance_pct", 0.03)
        if amount <= 0:
            self.state["last_action"] = "hold"
            return {"action": "hold", "price": price}

        try:
            market = self._market_symbol()
            anchor = self.state.get("trailing_anchor")
            if side == "sell":
                self.state["trailing_anchor"] = max(float(anchor or 0.0), price)
                order_price = round(price * (1 - trail_distance), 8)
            else:
                self.state["trailing_anchor"] = min(float(anchor), price) if anchor else price
                order_price = round(price * (1 + trail_distance), 8)

            if self.config.get("debug_orders", True):
                self._log(f"{side} rebalance amount={amount:.6g} price={order_price} ratio={self._account_value_ratio(price):.4f}")

            result = self.context.place_order(
                side=side,
                order_type="limit",
                amount=amount,
                price=order_price,
                market=market,
            )
            # An explicit cooldown_ticks of 0 disables the tick cooldown.
            self.state["cooldown_remaining"] = self._config_number("cooldown_ticks", 2, int)
            self.state["last_order_at"] = now
            self.state["last_order_price"] = order_price
            self.state["last_action"] = f"{side}_rebalance"
            return {"action": side, "price": order_price, "amount": amount, "result": result}
        except Exception as exc:
            self.state["last_error"] = str(exc)
            self._log(f"rebalance failed: {exc}")
            self.state["last_action"] = "error"
            return {"action": "error", "price": price, "error": str(exc)}

    def report(self):
        """Build the structured status report from the context snapshot and state."""
        snapshot = {}
        if hasattr(self.context, "get_strategy_snapshot"):
            # Guard against a None snapshot so the lookups below cannot fail.
            snapshot = self.context.get_strategy_snapshot() or {}
        return {
            "identity": snapshot.get("identity", {}),
            "control": snapshot.get("control", {}),
            "fit": dict(getattr(self, "STRATEGY_PROFILE", {}) or {}),
            "position": {
                "balances": {
                    "base_available": self.state.get("base_available", 0.0),
                    "counter_available": self.state.get("counter_available", 0.0),
                },
                "open_orders": snapshot.get("orders", {}).get("open_orders", []),
                "exposure": "managed",
            },
            "state": {
                "last_price": self.state.get("last_price", 0.0),
                "last_action": self.state.get("last_action", "idle"),
                "trailing_anchor": self.state.get("trailing_anchor", 0.0),
                "cooldown_remaining": self.state.get("cooldown_remaining", 0),
                "regimes_updated_at": self.state.get("regimes_updated_at", ""),
            },
            "assessment": {
                "confidence": None,
                "uncertainty": None,
                "reason": "defensive exposure protection",
                "warnings": [],
                "policy": dict(self.config.get("policy") or {}),
            },
            "execution": snapshot.get("execution", {}),
            "supervision": self._supervision(),
        }

    def render(self):
        """Return the widget list (metrics, error text, debug log) for display."""
        last_price = float(self.state.get("last_price") or 0.0)
        return {
            "widgets": [
                {"type": "metric", "label": "market", "value": self._market_symbol()},
                {"type": "metric", "label": "price", "value": round(last_price, 6)},
                {"type": "metric", "label": "state", "value": self.state.get("last_action", "idle")},
                {"type": "metric", "label": "base avail", "value": round(float(self.state.get("base_available") or 0.0), 8)},
                {"type": "metric", "label": "counter avail", "value": round(float(self.state.get("counter_available") or 0.0), 8)},
                {"type": "metric", "label": "ratio", "value": round(self._account_value_ratio(last_price or 1.0), 4)},
                {"type": "metric", "label": "trailing anchor", "value": round(float(self.state.get("trailing_anchor") or 0.0), 6)},
                {"type": "metric", "label": "cooldown", "value": int(self.state.get("cooldown_remaining") or 0)},
                {"type": "text", "label": "error", "value": self.state.get("last_error", "") or "none"},
                {"type": "log", "label": "debug log", "lines": self.state.get("debug_log") or []},
            ]
        }