from __future__ import annotations

import time
from datetime import datetime, timezone

from src.trader_mcp.strategy_sdk import Strategy

# Keys under which an order payload may carry its identifier, in lookup order.
_ORDER_ID_KEYS = ("bitstamp_order_id", "order_id", "id", "client_order_id")
# Fraction of a balance treated as spendable, leaving a small cushion.
_BALANCE_SAFETY = 0.995
# Headroom applied on top of the minimum order amount when sizing orders.
_MIN_AMOUNT_HEADROOM = 1.05
# Number of most recent debug log lines kept in state.
_DEBUG_LOG_LINES = 12
_BOTH_SIDES = frozenset({"buy", "sell"})


# NOTE(review): the class deliberately rebinds the imported base-class name;
# presumably the strategy loader looks for a class called ``Strategy`` -
# confirm before renaming either side.
class Strategy(Strategy):
    """Grid trading strategy.

    Places limit buy orders below and limit sell orders above a center price,
    spaced by a step derived from ATR (or a configured fallback).  Each tick it
    synchronises tracked orders with the exchange, replaces vanished orders
    with an opposite-side order, tops up missing levels and recenters the grid
    once price drifts ``recenter_pct`` away from the center.  When the trend
    guard is active in ``active`` mode, all orders are cancelled instead.

    In any mode other than ``"active"`` no orders are sent; the grid is only
    planned and logged.
    """

    LABEL = "Grid Trader"
    TICK_MINUTES = 0.2
    CONFIG_SCHEMA = {
        "grid_levels": {"type": "int", "default": 6, "min": 1, "max": 20},
        "grid_step_pct": {"type": "float", "default": 0.012, "min": 0.001, "max": 0.1},
        "volatility_timeframe": {"type": "string", "default": "1h"},
        "volatility_multiplier": {"type": "float", "default": 0.5, "min": 0.0, "max": 10.0},
        "grid_step_min_pct": {"type": "float", "default": 0.005, "min": 0.0001, "max": 0.5},
        "grid_step_max_pct": {"type": "float", "default": 0.03, "min": 0.0001, "max": 1.0},
        "order_size": {"type": "float", "default": 0.0, "min": 0.0},
        "inventory_cap_pct": {"type": "float", "default": 0.7, "min": 0.0, "max": 1.0},
        "recenter_pct": {"type": "float", "default": 0.05, "min": 0.0, "max": 0.5},
        "fee_rate": {"type": "float", "default": 0.0025, "min": 0.0, "max": 0.05},
        "trade_sides": {"type": "string", "default": "both"},
        "max_notional_per_order": {"type": "float", "default": 0.0, "min": 0.0},
        "order_call_delay_ms": {"type": "int", "default": 250, "min": 0, "max": 10000},
        "enable_trend_guard": {"type": "bool", "default": True},
        "trend_guard_reversal_max": {"type": "float", "default": 0.25, "min": 0.0, "max": 1.0},
        "debug_orders": {"type": "bool", "default": True},
        "use_all_available": {"type": "bool", "default": True},
    }
    STATE_SCHEMA = {
        "center_price": {"type": "float", "default": 0.0},
        "last_price": {"type": "float", "default": 0.0},
        "seeded": {"type": "bool", "default": False},
        "last_action": {"type": "string", "default": "idle"},
        "last_error": {"type": "string", "default": ""},
        "orders": {"type": "list", "default": []},
        "order_ids": {"type": "list", "default": []},
        "debug_log": {"type": "list", "default": []},
        "base_available": {"type": "float", "default": 0.0},
        "counter_available": {"type": "float", "default": 0.0},
        "trend_guard_active": {"type": "bool", "default": False},
        "regimes_updated_at": {"type": "string", "default": ""},
        "account_snapshot_updated_at": {"type": "string", "default": ""},
        "grid_refresh_pending_until": {"type": "string", "default": ""},
    }

    def init(self):
        """Return the initial state dictionary for a fresh strategy instance."""
        return {
            "center_price": 0.0,
            "last_price": 0.0,
            "seeded": False,
            "last_action": "idle",
            "last_error": "",
            "orders": [],
            "order_ids": [],
            "debug_log": ["init cancel all orders"],
            "base_available": 0.0,
            "counter_available": 0.0,
            "trend_guard_active": False,
            "regimes_updated_at": "",
            "account_snapshot_updated_at": "",
            "grid_refresh_pending_until": "",
        }

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    def _log(self, message: str) -> None:
        """Append ``message`` to the debug log, keeping only the newest lines."""
        state = getattr(self, "state", {}) or {}
        log = list(state.get("debug_log") or [])
        log.append(message)
        state["debug_log"] = log[-_DEBUG_LOG_LINES:]
        self.state = state

    @staticmethod
    def _order_id(payload) -> str:
        """Return the first non-empty id found in ``payload`` as a string.

        Returns ``""`` when ``payload`` is not a dict or carries no id under
        any of the known keys.
        """
        if not isinstance(payload, dict):
            return ""
        for key in _ORDER_ID_KEYS:
            value = payload.get(key)
            if value:
                return str(value)
        return ""

    @staticmethod
    def _balance_value(balance: dict) -> float:
        """Parse a balance entry, preferring ``available`` over ``total``.

        Raises ``TypeError``/``ValueError`` when the value is not numeric.
        """
        available = balance.get("available")
        if available is not None:
            return float(available)
        return float(balance.get("total") or 0.0)

    @staticmethod
    def _live_sides(orders: list[dict]) -> set[str]:
        """Return the lower-cased sides present among ``orders``."""
        return {str(order.get("side") or "").lower() for order in orders if isinstance(order, dict)}

    def _order_delay_seconds(self) -> float:
        """Return the configured pause between order calls, in seconds."""
        return max(int(self.config.get("order_call_delay_ms", 250) or 0), 0) / 1000.0

    def _set_grid_refresh_pause(self, seconds: float = 30.0) -> None:
        """Suppress grid refresh actions for ``seconds`` from now.

        The deadline is stored as a UTC epoch timestamp.
        """
        self.state["grid_refresh_pending_until"] = (
            datetime.now(timezone.utc).timestamp() + max(seconds, 0.0)
        )

    def _grid_refresh_paused(self) -> bool:
        """Return True while the stored refresh deadline lies in the future."""
        try:
            until = float(self.state.get("grid_refresh_pending_until") or 0.0)
        except (TypeError, ValueError):
            until = 0.0
        return until > datetime.now(timezone.utc).timestamp()

    def _base_symbol(self) -> str:
        """Return the upper-cased base asset code (falls back to ``XRP``)."""
        return (self.context.base_currency or self.context.market_symbol or "XRP").split("/")[0].upper()

    def _market_symbol(self) -> str:
        """Return the market symbol, defaulting to ``<base>usd``."""
        return self.context.market_symbol or f"{self._base_symbol().lower()}usd"

    def _mode(self) -> str:
        """Return the context mode, defaulting to ``"active"``."""
        return getattr(self.context, "mode", "active") or "active"

    def _price(self) -> float:
        """Return the current price of the base asset (0.0 when missing)."""
        payload = self.context.get_price(self._base_symbol())
        return float(payload.get("price") or 0.0)

    # ------------------------------------------------------------------
    # Market regime / volatility
    # ------------------------------------------------------------------

    def _regime_snapshot(self) -> dict:
        """Fetch regimes for 1d/4h/1h/15m; failures are stored as ``{"error": ...}``."""
        snapshot = {}
        for tf in ("1d", "4h", "1h", "15m"):
            try:
                snapshot[tf] = self.context.get_regime(self._base_symbol(), tf)
            except Exception as exc:
                snapshot[tf] = {"error": str(exc)}
        return snapshot

    def _refresh_regimes(self) -> None:
        """Store a fresh regime snapshot and its timestamp in state."""
        self.state["regimes"] = self._regime_snapshot()
        self.state["regimes_updated_at"] = datetime.now(timezone.utc).isoformat()

    def _trend_guard_status(self) -> tuple[bool, str]:
        """Return ``(active, reason)`` for the trend guard.

        The guard is active when the 1d and 4h trend states are the same
        ``bull``/``bear`` value and the larger of their reversal scores does
        not exceed ``trend_guard_reversal_max``.
        """
        if not bool(self.config.get("enable_trend_guard", True)):
            return False, "disabled"
        reversal_max = float(self.config.get("trend_guard_reversal_max", 0.25) or 0.0)
        regimes = self.state.get("regimes") or self._regime_snapshot()
        if not isinstance(regimes, dict):
            regimes = {}
        d1 = regimes.get("1d") or {}
        h4 = regimes.get("4h") or {}
        d1_trend = str((d1.get("trend") or {}).get("state") or "unknown")
        h4_trend = str((h4.get("trend") or {}).get("state") or "unknown")
        d1_rev = float((d1.get("reversal") or {}).get("score") or 0.0)
        h4_rev = float((h4.get("reversal") or {}).get("score") or 0.0)
        strongest_reversal = max(d1_rev, h4_rev)
        strong_trend = d1_trend in {"bull", "bear"} and d1_trend == h4_trend
        active = bool(strong_trend and strongest_reversal <= reversal_max)
        reason = f"1d={d1_trend} 4h={h4_trend} rev={strongest_reversal:.3f}"
        return active, reason

    def _grid_step_pct(self) -> float:
        """Compute the grid step as a fraction of price and record it in state.

        Uses ``max(ATR% of the configured timeframe, ATR% of 15m) / 100 *
        volatility_multiplier`` when an ATR is available, otherwise the
        configured ``grid_step_pct``; the result is clamped to the configured
        min/max step.  Also refreshes the regime snapshot in state.
        """
        base_step = float(self.config.get("grid_step_pct", 0.012) or 0.012)
        tf = str(self.config.get("volatility_timeframe", "1h") or "1h")
        multiplier = float(self.config.get("volatility_multiplier", 0.5) or 0.0)
        min_step = float(self.config.get("grid_step_min_pct", 0.005) or 0.0)
        max_step = float(self.config.get("grid_step_max_pct", 0.03) or 1.0)
        try:
            regime = self.context.get_regime(self._base_symbol(), tf)
            short_regime = self.context.get_regime(self._base_symbol(), "15m")
            tf_atr_pct = float((regime or {}).get("volatility", {}).get("atr_percent") or 0.0)
            short_atr_pct = float((short_regime or {}).get("volatility", {}).get("atr_percent") or 0.0)
            self.state["regimes"] = self._regime_snapshot()
        except Exception as exc:
            # Any fetch/parse problem falls back to the static step.
            self._log(f"regime fetch failed: {exc}")
            tf_atr_pct = 0.0
            short_atr_pct = 0.0
        atr_pct = max(tf_atr_pct, short_atr_pct)
        step = (atr_pct / 100.0) * multiplier if atr_pct > 0 else base_step
        step = min(max(step, min_step), max_step)
        self.state["grid_step_pct"] = step
        self.state["atr_percent_tf"] = tf_atr_pct
        self.state["atr_percent_15m"] = short_atr_pct
        self.state["atr_percent"] = atr_pct
        return step

    # ------------------------------------------------------------------
    # Balances and sizing
    # ------------------------------------------------------------------

    def _available_balance(self, asset_code: str) -> float:
        """Return the available balance for ``asset_code`` (0.0 on any problem)."""
        try:
            info = self.context.get_account_info()
        except Exception as exc:
            self._log(f"account info failed: {exc}")
            return 0.0
        balances = info.get("balances") if isinstance(info, dict) else []
        if not isinstance(balances, list):
            return 0.0
        wanted = str(asset_code or "").upper()
        for balance in balances:
            if not isinstance(balance, dict):
                continue
            if str(balance.get("asset_code") or "").upper() != wanted:
                continue
            try:
                return self._balance_value(balance)
            except (TypeError, ValueError):
                return 0.0
        return 0.0

    def _refresh_balance_snapshot(self) -> None:
        """Update ``base_available``/``counter_available`` in state from the account."""
        try:
            info = self.context.get_account_info()
        except Exception as exc:
            self._log(f"balance refresh failed: {exc}")
            return
        balances = info.get("balances") if isinstance(info, dict) else []
        if not isinstance(balances, list):
            return
        base = self._base_symbol()
        quote = str(self.context.counter_currency or "USD").upper()
        for balance in balances:
            if not isinstance(balance, dict):
                continue
            asset = str(balance.get("asset_code") or "").upper()
            try:
                available = self._balance_value(balance)
            except (TypeError, ValueError):
                continue
            if asset == base:
                self.state["base_available"] = available
            if asset == quote:
                self.state["counter_available"] = available
        self.state["account_snapshot_updated_at"] = datetime.now(timezone.utc).isoformat()

    def _supported_levels(self, side: str, price: float, min_notional: float) -> int:
        """Return how many minimum-notional orders the wallet can fund on ``side``.

        Returns 0 when ``min_notional`` or ``price`` is not positive.
        """
        if min_notional <= 0 or price <= 0:
            return 0
        fee_rate = float(self.config.get("fee_rate", 0.0025) or 0.0)
        if side == "buy":
            quote = self.context.counter_currency or "USD"
            quote_available = self._available_balance(quote)
            self.state["counter_available"] = quote_available
            usable_notional = quote_available * _BALANCE_SAFETY
        else:
            base_available = self._available_balance(self._base_symbol())
            self.state["base_available"] = base_available
            usable_notional = base_available * _BALANCE_SAFETY * price / (1 + fee_rate)
        return max(int(usable_notional / min_notional), 0)

    def _side_allowed(self, side: str) -> bool:
        """Return True when ``side`` is enabled by ``trade_sides``.

        Delegates to ``_desired_sides`` so both helpers agree, including for
        unrecognised ``trade_sides`` values (treated as "both").
        """
        return side in self._desired_sides()

    def _desired_sides(self) -> set[str]:
        """Return the set of sides to trade; unknown values mean both sides."""
        selected = str(self.config.get("trade_sides", "both") or "both").strip().lower()
        if selected in _BOTH_SIDES:
            return {selected}
        return {"buy", "sell"}

    def _suggest_amount(self, side: str, price: float, levels: int, min_notional: float) -> float:
        """Derive a per-order amount from the currently available balance.

        This helper is used when the grid seeds, tops up, or replaces an order.
        It folds in the live available balance, fee cushion, per-order caps, and
        the exchange minimum notional. If the wallet cannot support a valid order,
        it returns 0.0 instead of forcing an impossible minimum size.
        """
        if levels <= 0 or price <= 0:
            return 0.0
        fee_rate = float(self.config.get("fee_rate", 0.0025) or 0.0)
        max_notional = float(self.config.get("max_notional_per_order", 0.0) or 0.0)
        manual = float(self.config.get("order_size", 0.0) or 0.0)
        min_amount = (min_notional / price) if min_notional > 0 else 0.0
        if side == "buy":
            quote = self.context.counter_currency or "USD"
            quote_available = self._available_balance(quote)
            self.state["counter_available"] = quote_available
            spendable_quote = quote_available * _BALANCE_SAFETY
            max_affordable = spendable_quote / (price * (1 + fee_rate))
        else:
            base_available = self._available_balance(self._base_symbol())
            self.state["base_available"] = base_available
            max_affordable = (base_available * _BALANCE_SAFETY) / (1 + fee_rate)
        if max_affordable < min_amount:
            return 0.0
        # Even share per level, lifted to the minimum (plus headroom) but never
        # beyond what the wallet can actually pay for.
        amount = max_affordable / max(levels, 1)
        amount = max(amount, min_amount * _MIN_AMOUNT_HEADROOM)
        amount = min(amount, max_affordable)
        if max_notional > 0:
            amount = min(amount, max_notional / (price * (1 + fee_rate)))
        if manual > 0:
            if manual >= min_amount:
                amount = min(amount, manual)
            else:
                self._log(
                    f"manual order_size below minimum: order_size={manual:.6g} min_amount={min_amount:.6g} price={price} min_notional={min_notional}"
                )
        return max(amount, 0.0)

    # ------------------------------------------------------------------
    # Order placement
    # ------------------------------------------------------------------

    def _place_grid(self, center: float) -> None:
        """Place (or, outside active mode, plan) the full grid around ``center``.

        Replaces ``orders``/``order_ids`` in state with what was placed and
        starts the grid refresh pause.
        """
        mode = self._mode()
        levels = int(self.config.get("grid_levels", 6) or 6)
        step = self._grid_step_pct()
        min_notional = float(self.context.minimum_order_value or 0.0)
        market = self._market_symbol()
        delay = self._order_delay_seconds()
        orders = []
        order_ids = []

        def _levels_for(side: str) -> int:
            if not self._side_allowed(side):
                return 0
            if mode != "active":
                return levels
            return min(levels, self._supported_levels(side, center, min_notional))

        buy_levels = _levels_for("buy")
        sell_levels = _levels_for("sell")
        buy_amount = self._suggest_amount("buy", center, max(buy_levels, 1), min_notional)
        sell_amount = self._suggest_amount("sell", center, max(sell_levels, 1), min_notional)

        for i in range(1, levels + 1):
            buy_price = round(center * (1 - (step * i)), 8)
            sell_price = round(center * (1 + (step * i)), 8)
            if mode != "active":
                orders.append({"side": "buy", "price": buy_price, "amount": buy_amount, "result": {"simulated": True}})
                orders.append({"side": "sell", "price": sell_price, "amount": sell_amount, "result": {"simulated": True}})
                self._log(f"plan level {i}: buy {buy_price} amount {buy_amount:.6g} / sell {sell_price} amount {sell_amount:.6g}")
                continue
            if i > buy_levels and i > sell_levels:
                self._log(f"skip level {i}: no capacity on either side")
                continue
            min_size_buy = (min_notional / buy_price) if buy_price > 0 else 0.0
            min_size_sell = (min_notional / sell_price) if sell_price > 0 else 0.0
            try:
                if i <= buy_levels and buy_amount >= min_size_buy:
                    buy = self.context.place_order(side="buy", order_type="limit", amount=buy_amount, price=buy_price, market=market)
                    orders.append({"side": "buy", "price": buy_price, "amount": buy_amount, "result": buy})
                    buy_id = self._order_id(buy)
                    if buy_id:
                        order_ids.append(buy_id)
                if i <= sell_levels and sell_amount >= min_size_sell:
                    sell = self.context.place_order(side="sell", order_type="limit", amount=sell_amount, price=sell_price, market=market)
                    orders.append({"side": "sell", "price": sell_price, "amount": sell_amount, "result": sell})
                    sell_id = self._order_id(sell)
                    if sell_id:
                        order_ids.append(sell_id)
                self._log(f"seed level {i}: buy {buy_price} amount {buy_amount:.6g} / sell {sell_price} amount {sell_amount:.6g}")
            except Exception as exc:
                # Best effort: a failed level must not abort the rest of the grid.
                self.state["last_error"] = str(exc)
                self._log(f"seed level {i} failed: {exc}")
                continue
            if delay > 0:
                time.sleep(delay)

        self.state["orders"] = orders
        self.state["order_ids"] = order_ids
        self.state["last_action"] = "seeded grid"
        self._set_grid_refresh_pause()

    def _place_side_grid(self, side: str, center: float, *, start_level: int = 1) -> None:
        """Place limit orders for one ``side`` from ``start_level`` up to ``grid_levels``.

        New orders are appended to the tracked orders in state and the grid
        refresh pause is started.  Returns early, without placing anything,
        when a buy cannot be funded at the minimum size.
        """
        levels = int(self.config.get("grid_levels", 6) or 6)
        step = self._grid_step_pct()
        min_notional = float(self.context.minimum_order_value or 0.0)
        fee_rate = float(self.config.get("fee_rate", 0.0025) or 0.0)
        market = self._market_symbol()
        delay = self._order_delay_seconds()
        orders = list(self.state.get("orders") or [])
        order_ids = list(self.state.get("order_ids") or [])
        side_levels = min(levels, self._supported_levels(side, center, min_notional))
        amount = self._suggest_amount(side, center, max(side_levels, 1), min_notional)

        if side == "buy":
            quote = self.context.counter_currency or "USD"
            quote_available = self._available_balance(quote)
            max_affordable_amount = (quote_available * _BALANCE_SAFETY) / (center * (1 + fee_rate)) if center > 0 else 0.0
            min_amount = (min_notional / center) if center > 0 and min_notional > 0 else 0.0
            if max_affordable_amount < min_amount:
                self._log(
                    f"skip side buy: insufficient counter balance quote={quote_available:.6g} max_affordable_amount={max_affordable_amount:.6g} min_amount={min_amount:.6g} fee_rate={fee_rate}"
                )
                return
            amount = min(amount, max_affordable_amount)

        if side_levels <= 0 and min_notional > 0 and center > 0:
            min_amount = min_notional / center
            if amount >= min_amount:
                side_levels = 1
                self._log(f"side {side} restored to 1 level because amount clears minimum: amount={amount:.6g} min_amount={min_amount:.6g}")

        self._log(
            f"prepare side {side}: market={market} center={center} levels={side_levels} amount={amount:.6g} min_notional={min_notional} existing_ids={order_ids}"
        )
        for i in range(start_level, levels + 1):
            offset = step * i
            price = round(center * (1 - offset) if side == "buy" else center * (1 + offset), 8)
            min_size = (min_notional / price) if price > 0 else 0.0
            if i > side_levels or amount < min_size:
                self._log(
                    f"skip side {side} level {i}: amount={amount:.6g} below min_size={min_size:.6g} min_notional={min_notional} price={price}"
                )
                continue
            try:
                self._log(f"place side {side} level {i}: price={price} amount={amount:.6g}")
                result = self.context.place_order(side=side, order_type="limit", amount=amount, price=price, market=market)
                status = result.get("status") if isinstance(result, dict) else None
                order_id = self._order_id(result) or None
                self._log(f"place side {side} level {i} result status={status} order_id={order_id} raw={result}")
                orders.append({"side": side, "price": price, "amount": amount, "result": result})
                if order_id is not None:
                    order_ids.append(order_id)
                self._log(f"seed side {side} level {i}: {price} amount {amount:.6g}")
            except Exception as exc:
                self.state["last_error"] = str(exc)
                self._log(f"seed side {side} level {i} failed: {exc}")
                continue
            if delay > 0:
                time.sleep(delay)

        self.state["orders"] = orders
        self.state["order_ids"] = order_ids
        self._log(f"side {side} placement complete: tracked_ids={order_ids}")
        self._set_grid_refresh_pause()

    def _top_up_missing_levels(self, center: float, live_orders: list[dict]) -> None:
        """Add levels on any side that has some, but fewer than ``grid_levels``, orders."""
        target_levels = int(self.config.get("grid_levels", 6) or 6)
        if target_levels <= 0:
            return
        for side in ("buy", "sell"):
            count = sum(
                1
                for order in live_orders
                if isinstance(order, dict) and str(order.get("side") or "").lower() == side
            )
            if 0 < count < target_levels:
                self._log(f"top up side {side}: have {count}, want {target_levels}")
                self._place_side_grid(side, center, start_level=count + 1)

    def _place_replacement_orders(self, vanished_orders: list[dict], price_hint: float) -> list[str]:
        """Place an opposite-side order for each vanished order; return the new ids.

        Each replacement reuses the vanished order's amount and price
        (``price_hint`` when the order has no price).
        """
        placed: list[str] = []
        if not vanished_orders:
            return placed
        market = self._market_symbol()
        opposite_of = {"buy": "sell", "sell": "buy"}
        for order in vanished_orders:
            if not isinstance(order, dict):
                continue
            side = str(order.get("side") or "").lower()
            opposite = opposite_of.get(side)
            if not opposite:
                continue
            try:
                amount = float(order.get("amount") or 0.0)
                price = float(order.get("price") or price_hint or 0.0)
            except (TypeError, ValueError):
                continue
            if amount <= 0 or price <= 0:
                continue
            try:
                self._log(f"replace filled {side} order with {opposite}: price={price} amount={amount:.6g}")
                result = self.context.place_order(side=opposite, order_type="limit", amount=amount, price=price, market=market)
                order_id = self._order_id(result)
                if order_id:
                    placed.append(order_id)
            except Exception as exc:
                self.state["last_error"] = str(exc)
                self._log(f"replacement order failed for {side}→{opposite} at {price}: {exc}")
        return placed

    # ------------------------------------------------------------------
    # Order cancellation
    # ------------------------------------------------------------------

    def _cancel_obsolete_side_orders(self, open_orders: list[dict], desired_sides: set[str]) -> list[str]:
        """Cancel open orders whose side is not in ``desired_sides``; return cancelled ids."""
        removed: list[str] = []
        for order in open_orders:
            if not isinstance(order, dict):
                continue
            side = str(order.get("side") or "").lower()
            order_id = self._order_id(order)
            if not order_id or side in desired_sides:
                continue
            try:
                self.context.cancel_order(order_id)
                removed.append(order_id)
                self._log(f"cancelled obsolete {side} order {order_id}")
            except Exception as exc:
                self.state["last_error"] = str(exc)
                self._log(f"cancel obsolete {side} order {order_id} failed: {exc}")
        return removed

    def _cancel_surplus_side_orders(self, open_orders: list[dict], target_levels: int) -> list[str]:
        """Cancel orders beyond the first ``target_levels`` per side; return cancelled ids."""
        removed: list[str] = []
        if target_levels <= 0:
            return removed
        for side in ("buy", "sell"):
            side_orders = [
                order
                for order in open_orders
                if isinstance(order, dict) and str(order.get("side") or "").lower() == side
            ]
            for order in side_orders[target_levels:]:
                order_id = self._order_id(order)
                if not order_id:
                    continue
                try:
                    self.context.cancel_order(order_id)
                    removed.append(order_id)
                    self._log(f"cancelled surplus {side} order {order_id}")
                except Exception as exc:
                    self.state["last_error"] = str(exc)
                    self._log(f"cancel surplus {side} order {order_id} failed: {exc}")
        return removed

    def _cancel_duplicate_level_orders(self, open_orders: list[dict]) -> list[str]:
        """Cancel later orders that repeat an earlier (side, price); return cancelled ids."""
        removed: list[str] = []
        seen: set[tuple[str, str]] = set()
        for order in open_orders:
            if not isinstance(order, dict):
                continue
            side = str(order.get("side") or "").lower()
            try:
                price_key = f"{float(order.get('price') or 0.0):.8f}"
            except (TypeError, ValueError):
                price_key = str(order.get("price") or "")
            key = (side, price_key)
            order_id = self._order_id(order)
            if not order_id:
                continue
            if key not in seen:
                seen.add(key)
                continue
            try:
                self.context.cancel_order(order_id)
                removed.append(order_id)
                self._log(f"cancelled duplicate {side} level order {order_id} price={price_key}")
            except Exception as exc:
                self.state["last_error"] = str(exc)
                self._log(f"cancel duplicate {side} order {order_id} failed: {exc}")
        return removed

    def _forget_orders(self, order_ids) -> None:
        """Remove the given ids from the tracked ``orders``/``order_ids`` in state."""
        dropped = {str(oid) for oid in (order_ids or []) if oid}
        if not dropped:
            return
        self.state["orders"] = [
            order for order in (self.state.get("orders") or []) if self._order_id(order) not in dropped
        ]
        self.state["order_ids"] = [
            oid for oid in (self.state.get("order_ids") or []) if str(oid) not in dropped
        ]

    def _cancel_orders(self, order_ids) -> None:
        """Drop ``order_ids`` from tracked state; nothing is cancelled on the exchange."""
        for order_id in order_ids or []:
            self._log(f"dropping stale order {order_id} from state")
        self._forget_orders(order_ids)

    # ------------------------------------------------------------------
    # Synchronisation with the exchange
    # ------------------------------------------------------------------

    def _try_sync_open_orders(self) -> list[dict] | None:
        """Fetch open orders and mirror them into state.

        Returns the live order dicts, or ``None`` when the fetch failed, in
        which case state is left untouched apart from ``last_error`` and the
        debug log.
        """
        try:
            open_orders = self.context.get_open_orders()
        except Exception as exc:
            self.state["last_error"] = str(exc)
            self._log(f"open orders sync failed: {exc}")
            return None
        if not isinstance(open_orders, list):
            open_orders = []
        live_orders = [order for order in open_orders if isinstance(order, dict)]
        live_ids = [oid for oid in map(self._order_id, live_orders) if oid]
        live_sides = [str(order.get("side") or "").lower() for order in live_orders]
        self.state["orders"] = live_orders
        self.state["order_ids"] = live_ids
        self.state["open_order_count"] = len(live_ids)
        self._log(f"sync live orders: count={len(live_ids)} sides={live_sides} ids={live_ids}")
        return live_orders

    def _sync_open_orders_state(self) -> list[dict]:
        """Sync open orders into state; returns ``[]`` when the fetch failed."""
        live_orders = self._try_sync_open_orders()
        return [] if live_orders is None else live_orders

    def _resynced(self, fallback: list[dict]) -> list[dict]:
        """Re-sync open orders, keeping ``fallback`` when the fetch fails."""
        refreshed = self._try_sync_open_orders()
        return fallback if refreshed is None else refreshed

    def _reconcile_after_sync(self, previous_orders: list[dict], live_orders: list[dict], desired_sides: set[str], price: float) -> tuple[list[dict], list[str], int]:
        """Replace vanished orders and prune surplus/duplicate ones.

        Returns ``(live_orders, live_ids, open_order_count)`` after any
        re-sync.  Outside active mode nothing is placed or cancelled.
        """
        live_ids = list(self.state.get("order_ids") or [])
        if self._mode() != "active":
            return live_orders, live_ids, len(live_ids)

        current_ids = {self._order_id(order) for order in live_orders if isinstance(order, dict)}
        # Orders without an id (e.g. simulated plan-mode entries) can never be
        # matched against the exchange, so they must not count as vanished.
        vanished_orders = [
            order
            for order in previous_orders
            if self._order_id(order) and self._order_id(order) not in current_ids
        ]
        if vanished_orders and not self._grid_refresh_paused():
            if self._place_replacement_orders(vanished_orders, price):
                live_orders = self._resynced(live_orders)

        target_levels = int(self.config.get("grid_levels", 6) or 6)
        surplus_cancelled = self._cancel_surplus_side_orders(live_orders, target_levels)
        if surplus_cancelled:
            surplus = set(surplus_cancelled)
            live_orders = [order for order in live_orders if self._order_id(order) not in surplus]
        duplicate_cancelled = self._cancel_duplicate_level_orders(live_orders)
        if surplus_cancelled or duplicate_cancelled:
            cancelled = set(surplus_cancelled) | set(duplicate_cancelled)
            # Forget cancelled orders even if the re-sync below fails, so they
            # are not mistaken for fills on the next tick.
            self._forget_orders(cancelled)
            remaining = [order for order in live_orders if self._order_id(order) not in cancelled]
            live_orders = self._resynced(remaining)
        if desired_sides != _BOTH_SIDES:
            live_orders = self._resynced(live_orders)

        live_ids = list(self.state.get("order_ids") or [])
        return live_orders, live_ids, len(live_ids)

    # ------------------------------------------------------------------
    # Tick handling
    # ------------------------------------------------------------------

    def _recenter_deviation(self, price: float) -> tuple[float, float]:
        """Return ``(deviation, recenter_pct)`` of ``price`` from the stored center."""
        center = float(self.state.get("center_price") or price)
        recenter_pct = float(self.config.get("recenter_pct", 0.05) or 0.05)
        deviation = abs(price - center) / center if center else 0.0
        return deviation, recenter_pct

    def _tick_plan_mode(self, price: float) -> dict:
        """Handle a tick outside active mode: plan, re-plan or observe."""
        if not self.state.get("seeded") or not self.state.get("center_price"):
            self.state["center_price"] = price
            self._place_grid(price)
            self.state["seeded"] = True
            self._log(f"planned grid at {price}")
            return {"action": "plan", "price": price}
        deviation, recenter_pct = self._recenter_deviation(price)
        if deviation >= recenter_pct:
            self.state["center_price"] = price
            self._place_grid(price)
            self._log(f"planned recenter to {price}")
            return {"action": "plan", "price": price, "deviation": deviation}
        self.state["last_action"] = "observe monitor"
        self._log(f"observe at {price} dev {deviation:.4f}")
        return {"action": "observe", "price": price, "deviation": deviation}

    def on_tick(self, tick):
        """Run one strategy cycle and return a dict describing the action taken.

        Order of checks in active mode: trend guard, open-order sync health,
        reconcile (replacements / surplus / duplicates), missing sides and
        levels, initial seed, reseed on an empty book, recenter, hold.
        """
        previous_orders = list(self.state.get("orders") or [])
        self._refresh_balance_snapshot()
        price = self._price()
        self.state["last_error"] = ""
        mode = self._mode()
        if price <= 0:
            # A missing price would otherwise look like a 100% deviation and
            # trigger cancel-all plus a recenter on zero.
            self.state["last_error"] = f"invalid price: {price}"
            self._log(f"invalid price {price}, skipping tick")
            return {"action": "hold" if mode == "active" else "plan", "price": price, "reason": "invalid price"}
        self.state["last_price"] = price
        self._refresh_regimes()

        synced_orders = self._try_sync_open_orders()
        sync_failed = synced_orders is None
        live_orders = synced_orders or []
        live_ids = list(self.state.get("order_ids") or [])
        open_order_count = -1 if sync_failed else len(live_ids)
        expected_ids = [str(oid) for oid in live_ids if oid]
        self.state["open_order_count"] = open_order_count

        desired_sides = self._desired_sides()
        guard_active, guard_reason = self._trend_guard_status()
        self.state["trend_guard_active"] = guard_active
        if mode == "active" and guard_active:
            self._log(f"trend guard active: {guard_reason}")
            try:
                self.context.cancel_all_orders()
            except Exception as exc:
                self.state["last_error"] = str(exc)
                self._log(f"trend guard cancel failed: {exc}")
            else:
                # Cancelled orders must not be read as fills once the guard lifts.
                self.state["orders"] = []
                self.state["order_ids"] = []
            self.state["last_action"] = "trend_guard"
            return {"action": "guard", "price": price, "reason": guard_reason}

        if mode != "active":
            return self._tick_plan_mode(price)

        if sync_failed:
            # Without a trustworthy view of the book, any reconcile step could
            # place or cancel orders based on a false "empty" reading.
            self.state["last_action"] = "sync failed"
            return {"action": "hold", "price": price, "reason": "open orders sync failed"}

        live_orders, live_ids, open_order_count = self._reconcile_after_sync(previous_orders, live_orders, desired_sides, price)

        if desired_sides != _BOTH_SIDES:
            current_sides = self._live_sides(live_orders)
            missing_side = next((side for side in desired_sides if side not in current_sides), None)
            if missing_side and self.state.get("center_price") and not self._grid_refresh_paused():
                self._log(f"adding missing {missing_side} side after trade_sides change, live_sides={sorted(current_sides)} live_ids={live_ids}")
                self._place_side_grid(missing_side, float(self.state.get("center_price") or price))
                live_orders = self._sync_open_orders_state()
                self._log(f"post-add sync: open_order_count={self.state.get('open_order_count', 0)} live_ids={self.state.get('order_ids') or []}")
                self.state["last_action"] = f"added {missing_side} side"
                return {"action": "add_side", "price": price, "side": missing_side}
        else:
            current_sides = self._live_sides(live_orders)
            missing_sides = [side for side in ("buy", "sell") if side not in current_sides]
            reconciled_sides: list[str] = []
            if missing_sides and self.state.get("center_price") and not self._grid_refresh_paused():
                for side in missing_sides:
                    self._log(f"adding missing {side} side after trade_sides change, live_sides={sorted(current_sides)} live_ids={live_ids}")
                    self._place_side_grid(side, float(self.state.get("center_price") or price))
                    reconciled_sides.append(side)
                live_orders = self._sync_open_orders_state()
                self._log(f"post-add sync: open_order_count={self.state.get('open_order_count', 0)} live_ids={self.state.get('order_ids') or []}")
            if live_orders and self.state.get("center_price") and not self._grid_refresh_paused():
                self._top_up_missing_levels(float(self.state.get("center_price") or price), live_orders)
                live_orders = self._sync_open_orders_state()
            if reconciled_sides:
                self.state["last_action"] = f"reconciled {','.join(reconciled_sides)}"
                return {"action": "reconcile", "price": price, "side": ",".join(reconciled_sides)}

        if (not self.state.get("seeded") or not self.state.get("center_price")) and not self._grid_refresh_paused():
            self.state["center_price"] = price
            self._place_grid(price)
            live_orders = self._sync_open_orders_state()
            self.state["seeded"] = True
            mode = self._mode()
            self._log(f"{'seeded' if mode == 'active' else 'planned'} grid at {price}")
            return {"action": "seed" if mode == "active" else "plan", "price": price}

        # ``expected_ids`` holds the ids seen at the start of the tick; the
        # book counts as lost when none of them survived reconciliation.
        book_lost = bool(expected_ids) and set(expected_ids).isdisjoint(live_ids)
        if (open_order_count == 0 or book_lost) and not self._grid_refresh_paused():
            self._log("no open orders, reseeding grid")
            self.state["center_price"] = price
            self._place_grid(price)
            live_orders = self._sync_open_orders_state()
            mode = self._mode()
            self.state["last_action"] = "reseeded" if mode == "active" else f"{mode} monitor"
            return {"action": "reseed" if mode == "active" else "plan", "price": price}

        deviation, recenter_pct = self._recenter_deviation(price)
        if deviation >= recenter_pct and not self._grid_refresh_paused():
            try:
                self.context.cancel_all_orders()
            except Exception as exc:
                # Do not stack a new grid on top of orders that may still be live.
                self.state["last_error"] = str(exc)
                self._log(f"recenter cancel failed: {exc}")
                self.state["last_action"] = "hold"
                return {"action": "hold", "price": price, "deviation": deviation, "reason": "recenter cancel failed"}
            self.state["center_price"] = price
            self._place_grid(price)
            live_orders = self._sync_open_orders_state()
            mode = self._mode()
            self.state["last_action"] = "recentered" if mode == "active" else f"{mode} monitor"
            self._log(f"recentered grid to {price}")
            return {"action": "recenter" if mode == "active" else "plan", "price": price, "deviation": deviation}

        mode = self._mode()
        self.state["last_action"] = "hold" if mode == "active" else f"{mode} monitor"
        self._log(f"hold at {price} dev {deviation:.4f}")
        return {"action": "hold" if mode == "active" else "plan", "price": price, "deviation": deviation}

    # ------------------------------------------------------------------
    # Dashboard
    # ------------------------------------------------------------------

    def render(self):
        """Return the dashboard widget description built from current state."""
        # Refresh the market-derived display values on render so the dashboard
        # reflects the same inputs the strategy would use on the next tick.
        live_step_pct = float(self.state.get("grid_step_pct") or 0.0)
        live_atr_pct = float(self.state.get("atr_percent") or 0.0)
        try:
            self._refresh_balance_snapshot()
            live_step_pct = self._grid_step_pct()
            live_atr_pct = float(self.state.get("atr_percent") or live_atr_pct)
        except Exception as exc:
            self._log(f"render refresh failed: {exc}")

        regimes = self.state.get("regimes") or {}

        def _trend_metric(tf: str) -> dict:
            trend = (regimes.get(tf) or {}).get("trend") or {}
            return {"type": "metric", "label": tf, "value": trend.get("state", "n/a")}

        guard_widgets = []
        if self.state.get("trend_guard_active"):
            guard_widgets = [
                {"type": "metric", "label": "trend guard active", "value": "on"},
                # The guard fires when the 1d and 4h trends agree, not conflict.
                {"type": "text", "label": "trend guard reason", "value": "aligned 1d/4h trend with weak reversal"},
            ]

        return {
            "widgets": [
                {"type": "metric", "label": "market", "value": self._market_symbol()},
                {"type": "metric", "label": "center", "value": round(float(self.state.get("center_price") or 0.0), 6)},
                {"type": "metric", "label": "last price", "value": round(float(self.state.get("last_price") or 0.0), 6)},
                {"type": "metric", "label": "state", "value": self.state.get("last_action", "idle")},
                {"type": "metric", "label": "orders", "value": len(self.state.get("orders") or [])},
                {"type": "metric", "label": "open orders", "value": self.state.get("open_order_count", 0)},
                {"type": "metric", "label": f"ATR({self.config.get('volatility_timeframe', '1h')}) %", "value": round(live_atr_pct, 4)},
                {"type": "metric", "label": "grid step %", "value": round(live_step_pct * 100.0, 4)},
                _trend_metric("1d"),
                _trend_metric("4h"),
                _trend_metric("1h"),
                _trend_metric("15m"),
                {"type": "metric", "label": f"{self._base_symbol()} avail", "value": round(float(self.state.get("base_available") or 0.0), 8)},
                {"type": "metric", "label": f"{self.context.counter_currency or 'USD'} avail", "value": round(float(self.state.get("counter_available") or 0.0), 8)},
                *guard_widgets,
                {"type": "text", "label": "error", "value": self.state.get("last_error", "") or "none"},
                {"type": "log", "label": "debug log", "lines": self.state.get("debug_log") or []},
            ]
        }