| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
from __future__ import annotations

import math
from datetime import datetime, timezone

from src.trader_mcp.logging_utils import log_event
from src.trader_mcp.strategy_sdk import Strategy
class Strategy(Strategy):
    """Defensive strategy that nudges the base/counter value split toward a target.

    On every tick the strategy refreshes its balance snapshot and the market
    price and then, unless one of three throttles applies (a tick-count
    cooldown, a minimum number of seconds since the last order, or a minimum
    price move since the last order), places a single limit order that trims
    whichever side of the inventory is overweight relative to
    ``rebalance_target_ratio``.

    NOTE(review): the class intentionally reuses the name of the imported SDK
    base class; presumably the strategy loader resolves strategies by the
    class name ``Strategy`` -- confirm before renaming.
    """

    LABEL = "Exposure Protector"
    STRATEGY_PROFILE = {
        "expects": {
            "trend": "mixed",
            "volatility": "moderate",
            "event_risk": "low",
            "liquidity": "normal",
        },
        "avoids": {
            "volatility": "chaotic",
            "event_risk": "high",
            "liquidity": "thin",
        },
        "risk_profile": "defensive",
        "capabilities": ["inventory_rebalancing", "exposure_trim", "companion_defense"],
        "role": "defensive",
        "inventory_behavior": "rebalancing",
        "requires_rebalance_before_start": False,
        "requires_rebalance_before_stop": False,
        "safe_when_unbalanced": True,
        "can_run_with": ["grid_trader", "trend_follower"],
    }
    TICK_MINUTES = 0.2
    CONFIG_SCHEMA = {
        "trail_distance_pct": {"type": "float", "default": 0.03, "min": 0.0, "max": 1.0},
        "rebalance_target_ratio": {"type": "float", "default": 0.5, "min": 0.0, "max": 1.0},
        "rebalance_step_ratio": {"type": "float", "default": 0.15, "min": 0.0, "max": 1.0},
        "min_order_size": {"type": "float", "default": 0.0, "min": 0.0},
        "max_order_size": {"type": "float", "default": 0.0, "min": 0.0},
        "order_spacing_ticks": {"type": "int", "default": 1, "min": 0, "max": 1000},
        "cooldown_ticks": {"type": "int", "default": 2, "min": 0, "max": 1000},
        "min_rebalance_seconds": {"type": "int", "default": 180, "min": 0, "max": 86400},
        "min_price_move_pct": {"type": "float", "default": 0.005, "min": 0.0, "max": 1.0},
        "balance_tolerance": {"type": "float", "default": 0.05, "min": 0.0, "max": 1.0},
        "fee_rate": {"type": "float", "default": 0.0025, "min": 0.0, "max": 0.05},
        "debug_orders": {"type": "bool", "default": True},
    }
    STATE_SCHEMA = {
        "last_price": {"type": "float", "default": 0.0},
        "last_action": {"type": "string", "default": "idle"},
        "last_error": {"type": "string", "default": ""},
        "debug_log": {"type": "list", "default": []},
        "regimes": {"type": "dict", "default": {}},
        "regimes_updated_at": {"type": "string", "default": ""},
        "base_available": {"type": "float", "default": 0.0},
        "counter_available": {"type": "float", "default": 0.0},
        "trailing_anchor": {"type": "float", "default": 0.0},
        "cooldown_remaining": {"type": "int", "default": 0},
        "last_order_at": {"type": "float", "default": 0.0},
        "last_order_price": {"type": "float", "default": 0.0},
        # Market price observed when the last order was submitted; used by the
        # price-move throttle (``last_order_price`` holds the limit price).
        "last_order_market_price": {"type": "float", "default": 0.0},
    }

    # Parameter sets written into the config by ``apply_policy``. The "normal"
    # row matches the CONFIG_SCHEMA defaults and is the fallback for an
    # unrecognised risk posture.
    _POLICY_PROFILES = {
        "cautious": {
            "trail_distance_pct": 0.02,
            "rebalance_step_ratio": 0.08,
            "min_rebalance_seconds": 420,
            "min_price_move_pct": 0.01,
        },
        "normal": {
            "trail_distance_pct": 0.03,
            "rebalance_step_ratio": 0.15,
            "min_rebalance_seconds": 180,
            "min_price_move_pct": 0.005,
        },
        "assertive": {
            "trail_distance_pct": 0.04,
            "rebalance_step_ratio": 0.25,
            "min_rebalance_seconds": 90,
            "min_price_move_pct": 0.003,
        },
    }

    def init(self):
        """Return the initial state dictionary for a fresh strategy instance."""
        return {
            "last_price": 0.0,
            "last_action": "idle",
            "last_error": "",
            "debug_log": ["init exposure protector"],
            "regimes": {},
            "regimes_updated_at": "",
            "base_available": 0.0,
            "counter_available": 0.0,
            "trailing_anchor": 0.0,
            "cooldown_remaining": 0,
            "last_order_at": 0.0,
            "last_order_price": 0.0,
            "last_order_market_price": 0.0,
        }

    # ------------------------------------------------------------------
    # Small accessors
    # ------------------------------------------------------------------

    def _config_float(self, key: str, default: float) -> float:
        """Read a float config value, using ``default`` only when it is unset.

        Unlike ``float(value or default)`` this keeps an explicit ``0``,
        which several schema entries allow as a meaningful setting.
        """
        value = self.config.get(key)
        return default if value is None else float(value)

    def _config_int(self, key: str, default: int) -> int:
        """Read an int config value, using ``default`` only when it is unset."""
        value = self.config.get(key)
        return default if value is None else int(value)

    def _state_float(self, key: str) -> float:
        """Read a float from state, treating a missing/empty value as 0.0."""
        return float(self.state.get(key) or 0.0)

    def _log(self, message: str) -> None:
        """Append ``message`` to the rolling debug log (last 12 kept) and emit it."""
        state = getattr(self, "state", {}) or {}
        log = list(state.get("debug_log") or [])
        log.append(message)
        state["debug_log"] = log[-12:]
        self.state = state
        log_event("stoploss", message)

    def _base_symbol(self) -> str:
        """Return the upper-cased base asset code, defaulting to ``"XRP"``."""
        raw = self.context.base_currency or self.context.market_symbol or "XRP"
        return raw.split("/")[0].upper()

    def _market_symbol(self) -> str:
        """Return the context's market symbol, or ``<base>usd`` when it is unset."""
        return self.context.market_symbol or f"{self._base_symbol().lower()}usd"

    def _hold(self, price: float, **details) -> dict:
        """Record a ``hold`` action in state and build the matching tick result."""
        self.state["last_action"] = "hold"
        return {"action": "hold", "price": price, **details}

    # ------------------------------------------------------------------
    # Policy
    # ------------------------------------------------------------------

    def apply_policy(self):
        """Translate the supervisor policy into concrete rebalance parameters.

        A low/background priority always selects the cautious profile and a
        high/urgent priority the assertive one; otherwise the risk posture
        picks the profile. An unrecognised posture uses the "normal" profile.
        The chosen values are written to the config and mirrored under
        ``state["policy_derived"]``.

        Returns:
            Whatever the base class ``apply_policy`` returned, unchanged.
        """
        policy = super().apply_policy()
        settings = policy or {}
        risk = str(settings.get("risk_posture") or "normal").lower()
        priority = str(settings.get("priority") or "normal").lower()

        if priority in {"low", "background"}:
            profile_name = "cautious"
        elif priority in {"high", "urgent"}:
            profile_name = "assertive"
        else:
            profile_name = risk

        profile = self._POLICY_PROFILES.get(profile_name, self._POLICY_PROFILES["normal"])
        for key, value in profile.items():
            self.config[key] = value
        self.state["policy_derived"] = dict(profile)
        return policy

    # ------------------------------------------------------------------
    # Data refresh
    # ------------------------------------------------------------------

    def _live_fee_rate(self) -> float:
        """Return the maker fee reported by the context, else the configured rate.

        The configured ``fee_rate`` is used when the lookup raises, when the
        payload carries no ``maker`` value, or when that value is not numeric.
        """
        try:
            payload = self.context.get_fee_rates(self._market_symbol())
            maker = payload.get("maker")
            if maker is not None:
                return float(maker)
        except Exception as exc:
            self._log(f"fee lookup failed: {exc}")
        return self._config_float("fee_rate", 0.0025)

    def _price(self) -> float:
        """Fetch the current price for the base asset (0.0 when none is reported)."""
        payload = self.context.get_price(self._base_symbol())
        return float(payload.get("price") or 0.0)

    def _snapshot(self) -> dict:
        """Return the context's strategy snapshot, or ``{}`` if unavailable.

        Failures are logged rather than raised so that monitoring paths
        (``report``, regime refresh) degrade instead of crashing.
        """
        getter = getattr(self.context, "get_strategy_snapshot", None)
        if getter is None:
            return {}
        try:
            snapshot = getter()
        except Exception as exc:
            self._log(f"strategy snapshot failed: {exc}")
            return {}
        return snapshot if isinstance(snapshot, dict) else {}

    def _refresh_regimes(self) -> None:
        """Store the snapshot's ``fit`` section in state and stamp the refresh time."""
        self.state["regimes"] = self._snapshot().get("fit", {})
        self.state["regimes_updated_at"] = datetime.now(timezone.utc).isoformat()

    def _refresh_balance_snapshot(self) -> None:
        """Update ``base_available`` / ``counter_available`` from the account info.

        Each balance entry contributes its ``available`` amount, falling back
        to ``total`` when ``available`` is absent. Entries that are malformed
        are skipped; an asset missing from the list keeps its previous value.
        """
        try:
            info = self.context.get_account_info()
        except Exception as exc:
            self._log(f"balance refresh failed: {exc}")
            return

        balances = info.get("balances") if isinstance(info, dict) else []
        if not isinstance(balances, list):
            return

        base = self._base_symbol()
        quote = str(self.context.counter_currency or "USD").upper()
        for entry in balances:
            if not isinstance(entry, dict):
                continue
            asset = str(entry.get("asset_code") or "").upper()
            raw = entry.get("available")
            if raw is None:
                raw = entry.get("total") or 0.0
            try:
                available = float(raw)
            except (TypeError, ValueError, OverflowError):
                continue
            if asset == base:
                self.state["base_available"] = available
            if asset == quote:
                self.state["counter_available"] = available

    # ------------------------------------------------------------------
    # Sizing
    # ------------------------------------------------------------------

    def _account_value_ratio(self, price: float) -> float:
        """Return the share of account value held in the base asset at ``price``.

        Returns 0.5 when the combined value is not positive.
        """
        base_value = self._state_float("base_available") * price
        total = base_value + self._state_float("counter_available")
        if total <= 0:
            return 0.5
        return base_value / total

    def _supervision(self) -> dict:
        """Summarise health and inventory drift for the supervisor section of ``report``."""
        price = self._state_float("last_price")
        ratio = self._account_value_ratio(price if price > 0 else 1.0)
        target = self._config_float("rebalance_target_ratio", 0.5)
        tolerance = self._config_float("balance_tolerance", 0.05)
        drift = abs(ratio - target)
        last_error = str(self.state.get("last_error") or "")

        if drift >= 0.35:
            pressure = "critical"
        elif drift > tolerance:
            pressure = "elevated"
        else:
            pressure = "contained"

        within_tolerance = drift <= tolerance
        return {
            "health": "degraded" if last_error else "healthy",
            "degraded": bool(last_error),
            "inventory_pressure": pressure,
            "capacity_available": not within_tolerance,
            "switch_readiness": "handoff_complete" if within_tolerance else "stay_attached",
            "last_reason": last_error or f"base_ratio={ratio:.3f}, target={target:.3f}, drift={drift:.3f}",
            "desired_companion": None,
        }

    def _desired_side(self, price: float) -> str:
        """Return ``"sell"`` when the base share exceeds the target, else ``"buy"``."""
        target = self._config_float("rebalance_target_ratio", 0.5)
        return "sell" if self._account_value_ratio(price) > target else "buy"

    def _suggest_amount(self, side: str, price: float) -> float:
        """Return the base-asset amount to trade on ``side``, or 0.0 for no order.

        The order's notional is the account value times the smaller of the
        current drift and ``rebalance_step_ratio``, converted to base units at
        ``price`` grossed up by the fee rate. The result is clamped to
        ``min_order_size`` / ``max_order_size`` when those are positive.

        Returns 0.0 when the price or account value is not positive, when the
        drift is within ``balance_tolerance``, when the computed size is zero,
        or when honouring ``min_order_size`` would need more than the
        available balance.
        """
        if price <= 0:
            return 0.0

        base_available = self._state_float("base_available")
        counter_available = self._state_float("counter_available")
        base_value = base_available * price
        total = base_value + counter_available
        if total <= 0:
            return 0.0

        target = self._config_float("rebalance_target_ratio", 0.5)
        tolerance = self._config_float("balance_tolerance", 0.05)
        drift = abs(base_value / total - target)
        if drift <= tolerance:
            return 0.0

        # Only query fees once an order is actually going to be sized.
        unit_cost = price * (1 + self._live_fee_rate())
        if unit_cost <= 0:
            return 0.0

        step_ratio = self._config_float("rebalance_step_ratio", 0.15)
        wanted = total * min(drift, step_ratio) / unit_cost

        # NOTE(review): buys are submitted at a limit above ``price``; if the
        # venue reserves funds at the limit price this cap may be slightly
        # optimistic -- confirm against the exchange behaviour.
        if side == "sell":
            affordable = base_available
        else:
            affordable = counter_available / unit_cost

        amount = min(wanted, affordable)
        if amount <= 0:
            return 0.0

        min_order = self._config_float("min_order_size", 0.0)
        max_order = self._config_float("max_order_size", 0.0)
        if min_order > 0:
            amount = max(amount, min_order)
        if max_order > 0:
            amount = min(amount, max_order)

        # Bumping up to the minimum size must never exceed what is held.
        if amount > affordable:
            return 0.0
        return amount

    # ------------------------------------------------------------------
    # Tick handling
    # ------------------------------------------------------------------

    def on_tick(self, tick):
        """Run one rebalance cycle and return a dict describing the action taken.

        The result always has ``action`` and ``price`` keys. ``action`` is one
        of ``"cooldown"``, ``"hold"``, ``"buy"``, ``"sell"`` or ``"error"``;
        order results also carry ``amount`` and ``result``, errors carry
        ``error``, and some holds carry ``reason``.
        """
        self.state["last_error"] = ""
        self._log(f"tick alive price={self.state.get('last_price') or 0.0}")
        self._refresh_balance_snapshot()
        self._refresh_regimes()

        try:
            price = self._price()
        except Exception as exc:
            # Report price failures the same way order failures are reported.
            self.state["last_error"] = str(exc)
            self._log(f"price lookup failed: {exc}")
            self.state["last_action"] = "error"
            return {"action": "error", "price": self._state_float("last_price"), "error": str(exc)}
        self.state["last_price"] = price

        remaining = int(self.state.get("cooldown_remaining") or 0)
        if remaining > 0:
            self.state["cooldown_remaining"] = remaining - 1
            self.state["last_action"] = "cooldown"
            return {"action": "cooldown", "price": price}

        if not math.isfinite(price) or price <= 0:
            return self._hold(price, reason="price unavailable")

        now = datetime.now(timezone.utc).timestamp()
        last_order_at = self._state_float("last_order_at")
        min_gap = self._config_int("min_rebalance_seconds", 180)
        if last_order_at and min_gap > 0 and (now - last_order_at) < min_gap:
            return self._hold(price, reason="rebalance cooldown")

        # Compare against the market price at the last order. The stored limit
        # price is already offset by the trail distance, so using it would let
        # this gate pass immediately. Fall back to it for state written before
        # the market reference existed.
        reference = self._state_float("last_order_market_price") or self._state_float("last_order_price")
        min_move = self._config_float("min_price_move_pct", 0.005)
        if reference > 0 and min_move > 0:
            move_pct = abs(price - reference) / reference
            if move_pct < min_move:
                return self._hold(price, reason="insufficient price move", move_pct=move_pct)

        side = self._desired_side(price)
        amount = self._suggest_amount(side, price)
        if amount <= 0:
            return self._hold(price)

        trail_distance = self._config_float("trail_distance_pct", 0.03)
        try:
            market = self._market_symbol()
            previous_anchor = self.state.get("trailing_anchor")
            if side == "sell":
                anchor = max(float(previous_anchor or 0.0), price)
                order_price = round(price * (1 - trail_distance), 8)
            else:
                anchor = min(float(previous_anchor), price) if previous_anchor else price
                order_price = round(price * (1 + trail_distance), 8)
            self.state["trailing_anchor"] = anchor

            if self.config.get("debug_orders", True):
                self._log(f"{side} rebalance amount={amount:.6g} price={order_price} ratio={self._account_value_ratio(price):.4f}")

            result = self.context.place_order(
                side=side,
                order_type="limit",
                amount=amount,
                price=order_price,
                market=market,
            )
            self.state["cooldown_remaining"] = self._config_int("cooldown_ticks", 2)
            self.state["last_order_at"] = now
            self.state["last_order_price"] = order_price
            self.state["last_order_market_price"] = price
            self.state["last_action"] = f"{side}_rebalance"
            return {"action": side, "price": order_price, "amount": amount, "result": result}
        except Exception as exc:
            self.state["last_error"] = str(exc)
            self._log(f"rebalance failed: {exc}")
            self.state["last_action"] = "error"
            return {"action": "error", "price": price, "error": str(exc)}

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def report(self):
        """Build the structured status report from state and the context snapshot."""
        snapshot = self._snapshot()
        orders = snapshot.get("orders") or {}
        open_orders = orders.get("open_orders", []) if isinstance(orders, dict) else []
        return {
            "identity": snapshot.get("identity", {}),
            "control": snapshot.get("control", {}),
            "fit": dict(getattr(self, "STRATEGY_PROFILE", {}) or {}),
            "position": {
                "balances": {
                    "base_available": self.state.get("base_available", 0.0),
                    "counter_available": self.state.get("counter_available", 0.0),
                },
                "open_orders": open_orders,
                "exposure": "managed",
            },
            "state": {
                "last_price": self.state.get("last_price", 0.0),
                "last_action": self.state.get("last_action", "idle"),
                "trailing_anchor": self.state.get("trailing_anchor", 0.0),
                "cooldown_remaining": self.state.get("cooldown_remaining", 0),
                "regimes_updated_at": self.state.get("regimes_updated_at", ""),
            },
            "assessment": {
                "confidence": None,
                "uncertainty": None,
                "reason": "defensive exposure protection",
                "warnings": [],
                "policy": dict(self.config.get("policy") or {}),
            },
            "execution": snapshot.get("execution", {}),
            "supervision": self._supervision(),
        }

    def render(self):
        """Return the widget descriptions shown for this strategy."""
        last_price = self._state_float("last_price")
        return {
            "widgets": [
                {"type": "metric", "label": "market", "value": self._market_symbol()},
                {"type": "metric", "label": "price", "value": round(last_price, 6)},
                {"type": "metric", "label": "state", "value": self.state.get("last_action", "idle")},
                {"type": "metric", "label": "base avail", "value": round(self._state_float("base_available"), 8)},
                {"type": "metric", "label": "counter avail", "value": round(self._state_float("counter_available"), 8)},
                {"type": "metric", "label": "ratio", "value": round(self._account_value_ratio(last_price or 1.0), 4)},
                {"type": "metric", "label": "trailing anchor", "value": round(self._state_float("trailing_anchor"), 6)},
                {"type": "metric", "label": "cooldown", "value": int(self.state.get("cooldown_remaining") or 0)},
                {"type": "text", "label": "error", "value": self.state.get("last_error", "") or "none"},
                {"type": "log", "label": "debug log", "lines": self.state.get("debug_log") or []},
            ]
        }
|