from __future__ import annotations import time from datetime import datetime, timezone from src.trader_mcp.strategy_sdk import Strategy from src.trader_mcp.logging_utils import log_event class Strategy(Strategy): LABEL = "Grid Trader" TICK_MINUTES = 1.0 CONFIG_SCHEMA = { "grid_levels": {"type": "int", "default": 6, "min": 1, "max": 20}, "grid_step_pct": {"type": "float", "default": 0.012, "min": 0.001, "max": 0.1}, "volatility_timeframe": {"type": "string", "default": "1h"}, "volatility_multiplier": {"type": "float", "default": 0.5, "min": 0.0, "max": 10.0}, "grid_step_min_pct": {"type": "float", "default": 0.005, "min": 0.0001, "max": 0.5}, "grid_step_max_pct": {"type": "float", "default": 0.03, "min": 0.0001, "max": 1.0}, "order_size": {"type": "float", "default": 0.0, "min": 0.0}, "inventory_cap_pct": {"type": "float", "default": 0.7, "min": 0.0, "max": 1.0}, "recenter_pct": {"type": "float", "default": 0.05, "min": 0.0, "max": 0.5}, "recenter_atr_multiplier": {"type": "float", "default": 0.35, "min": 0.0, "max": 10.0}, "recenter_min_pct": {"type": "float", "default": 0.0025, "min": 0.0, "max": 0.5}, "recenter_max_pct": {"type": "float", "default": 0.03, "min": 0.0, "max": 0.5}, "center_shift_factor": {"type": "float", "default": 0.3333333333, "min": 0.0, "max": 1.0}, "fee_rate": {"type": "float", "default": 0.0025, "min": 0.0, "max": 0.05}, "trade_sides": {"type": "string", "default": "both"}, "max_notional_per_order": {"type": "float", "default": 0.0, "min": 0.0}, "dust_collect": {"type": "bool", "default": False}, "order_call_delay_ms": {"type": "int", "default": 250, "min": 0, "max": 10000}, "enable_trend_guard": {"type": "bool", "default": True}, "trend_guard_reversal_max": {"type": "float", "default": 0.25, "min": 0.0, "max": 1.0}, "debug_orders": {"type": "bool", "default": True}, "use_all_available": {"type": "bool", "default": True}, } STATE_SCHEMA = { "center_price": {"type": "float", "default": 0.0}, "last_price": {"type": "float", "default": 0.0}, "seeded": {"type": "bool", "default": False}, "last_action": {"type": "string", "default": "idle"}, "last_error": {"type": "string", "default": ""}, "orders": {"type": "list", "default": []}, "order_ids": {"type": "list", "default": []}, "debug_log": {"type": "list", "default": []}, "base_available": {"type": "float", "default": 0.0}, "counter_available": {"type": "float", "default": 0.0}, "trend_guard_active": {"type": "bool", "default": False}, "regimes_updated_at": {"type": "string", "default": ""}, "account_snapshot_updated_at": {"type": "string", "default": ""}, "last_balance_log_signature": {"type": "string", "default": ""}, "last_balance_log_at": {"type": "string", "default": ""}, "grid_refresh_pending_until": {"type": "string", "default": ""}, "mismatch_ticks": {"type": "int", "default": 0}, "recovery_cooldown_until": {"type": "string", "default": ""}, } def init(self): return { "center_price": 0.0, "last_price": 0.0, "seeded": False, "last_action": "idle", "last_error": "", "orders": [], "order_ids": [], "debug_log": ["init cancel all orders"], "base_available": 0.0, "counter_available": 0.0, "trend_guard_active": False, "regimes_updated_at": "", "account_snapshot_updated_at": "", "last_balance_log_signature": "", "last_balance_log_at": "", "grid_refresh_pending_until": "", "mismatch_ticks": 0, "recovery_cooldown_until": "", } def _log(self, message: str) -> None: state = getattr(self, "state", {}) or {} log = list(state.get("debug_log") or []) log.append(message) state["debug_log"] = log[-12:] self.state = state log_event("grid", message) def _log_decision(self, action: str, **fields) -> None: parts = [action] for key, value in fields.items(): parts.append(f"{key}={value}") self._log(", ".join(parts)) def _set_grid_refresh_pause(self, seconds: float = 30.0) -> None: self.state["grid_refresh_pending_until"] = (datetime.now(timezone.utc).timestamp() + max(seconds, 0.0)) def _grid_refresh_paused(self) -> bool: try: until = float(self.state.get("grid_refresh_pending_until") or 0.0) except Exception: until = 0.0 return until > datetime.now(timezone.utc).timestamp() def _recovery_paused(self) -> bool: try: until = float(self.state.get("recovery_cooldown_until") or 0.0) except Exception: until = 0.0 return until > datetime.now(timezone.utc).timestamp() def _trip_recovery_pause(self, seconds: float = 30.0) -> None: self.state["recovery_cooldown_until"] = (datetime.now(timezone.utc).timestamp() + max(seconds, 0.0)) def _recover_grid(self, price: float) -> None: self._log(f"recovery mode: cancel all and rebuild from {price}") try: self.context.cancel_all_orders() except Exception as exc: self.state["last_error"] = str(exc) self._log(f"recovery cancel-all failed: {exc}") self.state["orders"] = [] self.state["order_ids"] = [] self.state["open_order_count"] = 0 self.state["center_price"] = price self.state["seeded"] = True self._place_grid(price) self._sync_open_orders_state() self.state["mismatch_ticks"] = 0 self._trip_recovery_pause() def _order_count_mismatch(self, tracked_ids: list[str], live_orders: list[dict]) -> bool: live_ids = [str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") for order in live_orders if isinstance(order, dict)] live_ids = [oid for oid in live_ids if oid] if len(live_ids) != len([oid for oid in tracked_ids if oid]): return True return False def _base_symbol(self) -> str: return (self.context.base_currency or self.context.market_symbol or "XRP").split("/")[0].upper() def _market_symbol(self) -> str: return self.context.market_symbol or f"{self._base_symbol().lower()}usd" def _live_fee_rates(self) -> tuple[float, float]: try: payload = self.context.get_fee_rates(self._market_symbol()) maker = float(payload.get("maker") or 0.0) taker = float(payload.get("taker") or 0.0) return maker, taker except Exception as exc: self._log(f"fee lookup failed: {exc}") fallback = float(self.config.get("fee_rate", 0.0025) or 0.0) return fallback, fallback def _live_fee_rate(self) -> float: _maker, taker = self._live_fee_rates() return taker def _mode(self) -> str: return getattr(self.context, "mode", "active") or "active" def _price(self) -> float: payload = self.context.get_price(self._base_symbol()) return float(payload.get("price") or 0.0) def _regime_snapshot(self) -> dict: timeframes = ["1d", "4h", "1h", "15m"] snapshot = {} for tf in timeframes: try: snapshot[tf] = self.context.get_regime(self._base_symbol(), tf) except Exception as exc: snapshot[tf] = {"error": str(exc)} return snapshot def _refresh_regimes(self) -> None: self.state["regimes"] = self._regime_snapshot() self.state["regimes_updated_at"] = datetime.now(timezone.utc).isoformat() def _trend_guard_status(self) -> tuple[bool, str]: if not bool(self.config.get("enable_trend_guard", True)): return False, "disabled" reversal_max = float(self.config.get("trend_guard_reversal_max", 0.25) or 0.0) regimes = self.state.get("regimes") or self._regime_snapshot() d1 = (regimes.get("1d") or {}) if isinstance(regimes, dict) else {} h4 = (regimes.get("4h") or {}) if isinstance(regimes, dict) else {} d1_trend = str((d1.get("trend") or {}).get("state") or "unknown") h4_trend = str((h4.get("trend") or {}).get("state") or "unknown") d1_rev = float((d1.get("reversal") or {}).get("score") or 0.0) h4_rev = float((h4.get("reversal") or {}).get("score") or 0.0) strong_trend = d1_trend in {"bull", "bear"} and d1_trend == h4_trend weak_reversal = max(d1_rev, h4_rev) <= reversal_max active = bool(strong_trend and weak_reversal) reason = f"1d={d1_trend} 4h={h4_trend} rev={max(d1_rev, h4_rev):.3f}" return active, reason def _recenter_threshold_pct(self) -> float: base_threshold = float(self.config.get("recenter_pct", 0.05) or 0.05) atr_multiplier = float(self.config.get("recenter_atr_multiplier", 0.35) or 0.0) min_threshold = float(self.config.get("recenter_min_pct", 0.0025) or 0.0) max_threshold = float(self.config.get("recenter_max_pct", 0.03) or 1.0) try: tf = str(self.config.get("volatility_timeframe", "1h") or "1h") regime = self.context.get_regime(self._base_symbol(), tf) short_regime = self.context.get_regime(self._base_symbol(), "15m") atr_pct = float((regime or {}).get("volatility", {}).get("atr_percent") or 0.0) short_atr_pct = float((short_regime or {}).get("volatility", {}).get("atr_percent") or 0.0) atr_pct = max(atr_pct, short_atr_pct) except Exception: atr_pct = 0.0 threshold = (atr_pct / 100.0) * atr_multiplier if atr_pct > 0 else base_threshold threshold = max(threshold, min_threshold) threshold = min(threshold, max_threshold) self.state["recenter_pct_live"] = threshold self.state["recenter_atr_percent"] = atr_pct return threshold def _grid_step_pct(self) -> float: base_step = float(self.config.get("grid_step_pct", 0.012) or 0.012) tf = str(self.config.get("volatility_timeframe", "1h") or "1h") multiplier = float(self.config.get("volatility_multiplier", 0.5) or 0.0) min_step = float(self.config.get("grid_step_min_pct", 0.005) or 0.0) max_step = float(self.config.get("grid_step_max_pct", 0.03) or 1.0) try: regime = self.context.get_regime(self._base_symbol(), tf) short_regime = self.context.get_regime(self._base_symbol(), "15m") tf_atr_pct = float((regime or {}).get("volatility", {}).get("atr_percent") or 0.0) atr_pct = float((regime or {}).get("volatility", {}).get("atr_percent") or 0.0) short_atr_pct = float((short_regime or {}).get("volatility", {}).get("atr_percent") or 0.0) atr_pct = max(atr_pct, short_atr_pct) self.state["regimes"] = self._regime_snapshot() except Exception as exc: self._log(f"regime fetch failed: {exc}") tf_atr_pct = 0.0 atr_pct = 0.0 short_atr_pct = 0.0 adaptive = (atr_pct / 100.0) * multiplier if atr_pct > 0 else base_step step = adaptive if atr_pct > 0 else base_step step = max(step, min_step) step = min(step, max_step) self.state["grid_step_pct"] = step self.state["atr_percent_tf"] = tf_atr_pct self.state["atr_percent_15m"] = short_atr_pct self.state["atr_percent"] = atr_pct return step def _config_warning(self) -> str | None: recenter_pct = float(self.state.get("recenter_pct_live") or self._recenter_threshold_pct()) grid_step_pct = float(self.state.get("grid_step_pct") or self._grid_step_pct()) if grid_step_pct <= 0: return None ratio = recenter_pct / grid_step_pct # If the recenter threshold is too close to the first step, the grid # can keep rebuilding before it has a fair chance to trade. if ratio <= 1.0: return f"warning: recenter threshold ({recenter_pct:.4f}) is <= grid step ({grid_step_pct:.4f}), it may recenter before trading" if ratio < 1.5: return f"warning: recenter threshold ({recenter_pct:.4f}) is only {ratio:.2f}x the grid step ({grid_step_pct:.4f}), consider widening it" return None def _available_balance(self, asset_code: str) -> float: try: info = self.context.get_account_info() except Exception as exc: self._log(f"account info failed: {exc}") return 0.0 balances = info.get("balances") if isinstance(info, dict) else [] if not isinstance(balances, list): return 0.0 wanted = str(asset_code or "").upper() for balance in balances: if not isinstance(balance, dict): continue if str(balance.get("asset_code") or "").upper() != wanted: continue try: return float(balance.get("available") if balance.get("available") is not None else balance.get("total") or 0.0) except Exception: return 0.0 return 0.0 def _refresh_balance_snapshot(self) -> None: try: info = self.context.get_account_info() except Exception as exc: self._log(f"balance refresh failed: {exc}") return balances = info.get("balances") if isinstance(info, dict) else [] if not isinstance(balances, list): return base = self._base_symbol() quote = self.context.counter_currency or "USD" for balance in balances: if not isinstance(balance, dict): continue asset = str(balance.get("asset_code") or "").upper() try: available = float(balance.get("available") if balance.get("available") is not None else balance.get("total") or 0.0) except Exception: continue if asset == base: self.state["base_available"] = available if asset == str(quote).upper(): self.state["counter_available"] = available self.state["account_snapshot_updated_at"] = datetime.now(timezone.utc).isoformat() signature = f"{base}:{self.state.get('base_available', 0.0):.8f}|{quote}:{self.state.get('counter_available', 0.0):.8f}" last_signature = str(self.state.get("last_balance_log_signature") or "") last_logged_at = str(self.state.get("last_balance_log_at") or "") now_iso = self.state["account_snapshot_updated_at"] should_log = signature != last_signature or not last_logged_at if not should_log: try: from datetime import datetime as _dt elapsed = (_dt.fromisoformat(now_iso) - _dt.fromisoformat(last_logged_at)).total_seconds() should_log = elapsed >= 60 except Exception: should_log = True if should_log: self.state["last_balance_log_signature"] = signature self.state["last_balance_log_at"] = now_iso self._log_decision( "balance snapshot", base=base, base_available=f"{self.state.get('base_available', 0.0):.6g}", quote=quote, quote_available=f"{self.state.get('counter_available', 0.0):.6g}", updated_at=now_iso, ) def _supported_levels(self, side: str, price: float, min_notional: float) -> int: if min_notional <= 0 or price <= 0: return 0 safety = 0.995 fee_rate = self._live_fee_rate() if side == "buy": quote = self.context.counter_currency or "USD" quote_available = self._available_balance(quote) self.state["counter_available"] = quote_available usable_notional = quote_available * safety return max(int(usable_notional / min_notional), 0) base = self._base_symbol() base_available = self._available_balance(base) self.state["base_available"] = base_available usable_notional = base_available * safety * price / (1 + fee_rate) return max(int(usable_notional / min_notional), 0) def _side_allowed(self, side: str) -> bool: selected = str(self.config.get("trade_sides", "both") or "both").strip().lower() if selected == "both": return True return selected == side def _desired_sides(self) -> set[str]: selected = str(self.config.get("trade_sides", "both") or "both").strip().lower() if selected == "both": return {"buy", "sell"} if selected in {"buy", "sell"}: return {selected} return {"buy", "sell"} def _suggest_amount(self, side: str, price: float, levels: int, min_notional: float) -> float: return self.context.suggest_order_amount( side=side, price=price, levels=levels, min_notional=min_notional, fee_rate=self._live_fee_rate(), max_notional_per_order=float(self.config.get("max_notional_per_order", 0.0) or 0.0), dust_collect=bool(self.config.get("dust_collect", False)), inventory_cap_pct=float(self.config.get("inventory_cap_pct", 0.0) or 0.0), order_size=float(self.config.get("order_size", 0.0) or 0.0), ) def _place_grid(self, center: float) -> None: center = self._maybe_refresh_center(center) mode = self._mode() levels = int(self.config.get("grid_levels", 6) or 6) step = self._grid_step_pct() min_notional = float(self.context.minimum_order_value or 0.0) market = self._market_symbol() orders = [] order_ids = [] def _capture_order_id(result): if isinstance(result, dict): return result.get("bitstamp_order_id") or result.get("order_id") or result.get("id") or result.get("client_order_id") return None buy_levels = min(levels, self._supported_levels("buy", center, min_notional)) if (mode == "active" and self._side_allowed("buy")) else (levels if self._side_allowed("buy") else 0) sell_levels = min(levels, self._supported_levels("sell", center, min_notional)) if (mode == "active" and self._side_allowed("sell")) else (levels if self._side_allowed("sell") else 0) buy_amount = self._suggest_amount("buy", center, max(buy_levels, 1), min_notional) sell_amount = self._suggest_amount("sell", center, max(sell_levels, 1), min_notional) for i in range(1, levels + 1): buy_price = round(center * (1 - (step * i)), 8) sell_price = round(center * (1 + (step * i)), 8) if mode != "active": orders.append({"side": "buy", "price": buy_price, "amount": buy_amount, "result": {"simulated": True}}) orders.append({"side": "sell", "price": sell_price, "amount": sell_amount, "result": {"simulated": True}}) self._log(f"plan level {i}: buy {buy_price} amount {buy_amount:.6g} / sell {sell_price} amount {sell_amount:.6g}") continue if i > buy_levels and i > sell_levels: self._log(f"skip level {i}: no capacity on either side") continue min_size_buy = (min_notional / buy_price) if buy_price > 0 else 0.0 min_size_sell = (min_notional / sell_price) if sell_price > 0 else 0.0 try: if i <= buy_levels and buy_amount >= min_size_buy: buy = self.context.place_order(side="buy", order_type="limit", amount=buy_amount, price=buy_price, market=market) orders.append({"side": "buy", "price": buy_price, "amount": buy_amount, "result": buy}) buy_id = _capture_order_id(buy) if buy_id is not None: order_ids.append(str(buy_id)) if i <= sell_levels and sell_amount >= min_size_sell: sell = self.context.place_order(side="sell", order_type="limit", amount=sell_amount, price=sell_price, market=market) orders.append({"side": "sell", "price": sell_price, "amount": sell_amount, "result": sell}) sell_id = _capture_order_id(sell) if sell_id is not None: order_ids.append(str(sell_id)) self._log(f"seed level {i}: buy {buy_price} amount {buy_amount:.6g} / sell {sell_price} amount {sell_amount:.6g}") except Exception as exc: # best effort for first draft self.state["last_error"] = str(exc) self._log(f"seed level {i} failed: {exc}") continue delay = max(int(self.config.get("order_call_delay_ms", 250) or 0), 0) / 1000.0 if delay > 0: time.sleep(delay) self.state["orders"] = orders self.state["order_ids"] = order_ids self.state["last_action"] = "seeded grid" self._set_grid_refresh_pause() def _place_side_grid(self, side: str, center: float, *, start_level: int = 1) -> None: center = self._maybe_refresh_center(center) levels = int(self.config.get("grid_levels", 6) or 6) step = self._grid_step_pct() min_notional = float(self.context.minimum_order_value or 0.0) fee_rate = self._live_fee_rate() safety = 0.995 market = self._market_symbol() orders = list(self.state.get("orders") or []) order_ids = list(self.state.get("order_ids") or []) placement_levels = max(levels - max(start_level, 1) + 1, 0) side_levels = min(placement_levels, self._supported_levels(side, center, min_notional)) amount = self._suggest_amount(side, center, max(side_levels, 1), min_notional) if side == "buy": quote = self.context.counter_currency or "USD" quote_available = self._available_balance(quote) max_affordable_amount = (quote_available * safety) / (center * (1 + fee_rate)) if center > 0 else 0.0 min_amount = (min_notional / center) if center > 0 and min_notional > 0 else 0.0 if max_affordable_amount < min_amount: self._log_decision( f"skip side {side}", reason="insufficient_counter_balance", quote=f"{quote_available:.6g}", max_affordable_amount=f"{max_affordable_amount:.6g}", min_amount=f"{min_amount:.6g}", fee_rate=f"{fee_rate:.6g}", ) return amount = min(amount, max_affordable_amount) if side_levels <= 0 and min_notional > 0 and center > 0: min_amount = min_notional / center if amount >= min_amount: side_levels = 1 self._log(f"side {side} restored to 1 level because amount clears minimum: amount={amount:.6g} min_amount={min_amount:.6g}") self._log( f"prepare side {side}, market={market}, center={center}, levels={side_levels}, start_level={start_level}, amount={amount:.6g}, min_notional={min_notional}, existing_ids={order_ids}" ) for i in range(start_level, levels + 1): price = round(center * (1 - (step * i)) if side == "buy" else center * (1 + (step * i)), 8) min_size = (min_notional / price) if price > 0 else 0.0 relative_level = i - start_level + 1 if relative_level > side_levels or amount < min_size: self._log_decision( f"skip side {side} level {i}", reason="below_min_size", amount=f"{amount:.6g}", min_size=f"{min_size:.6g}", min_notional=min_notional, price=price, ) continue try: self._log_decision(f"place side {side} level {i}", price=price, amount=f"{amount:.6g}") result = self.context.place_order(side=side, order_type="limit", amount=amount, price=price, market=market) status = None order_id = None if isinstance(result, dict): status = result.get("status") order_id = result.get("bitstamp_order_id") or result.get("order_id") or result.get("id") or result.get("client_order_id") self._log_decision(f"place side {side} level {i} result", status=status, order_id=order_id) orders.append({"side": side, "price": price, "amount": amount, "result": result}) if order_id is not None: order_ids.append(str(order_id)) self._log_decision(f"seed side {side} level {i}", price=price, amount=f"{amount:.6g}") except Exception as exc: self.state["last_error"] = str(exc) self._log_decision(f"seed side {side} level {i} failed", error=str(exc)) continue delay = max(int(self.config.get("order_call_delay_ms", 250) or 0), 0) / 1000.0 if delay > 0: time.sleep(delay) self.state["orders"] = orders self.state["order_ids"] = order_ids self._log_decision(f"side {side} placement complete", tracked_ids=order_ids) self._set_grid_refresh_pause() def _top_up_missing_levels(self, center: float, live_orders: list[dict]) -> None: center = self._maybe_refresh_center(center) target_levels = int(self.config.get("grid_levels", 6) or 6) if target_levels <= 0: return for side in ("buy", "sell"): count = 0 for order in live_orders: if not isinstance(order, dict): continue if str(order.get("side") or "").lower() == side: count += 1 if 0 < count < target_levels: self._log(f"top up side {side}: have {count}, want {target_levels}") self._place_side_grid(side, center, start_level=count + 1) def _cancel_obsolete_side_orders(self, open_orders: list[dict], desired_sides: set[str]) -> list[str]: removed: list[str] = [] for order in open_orders: if not isinstance(order, dict): continue side = str(order.get("side") or "").lower() order_id = str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") if not order_id or side in desired_sides: continue try: self.context.cancel_order(order_id) removed.append(order_id) self._log(f"cancelled obsolete {side} order {order_id}") except Exception as exc: self.state["last_error"] = str(exc) self._log(f"cancel obsolete {side} order {order_id} failed: {exc}") return removed def _cancel_surplus_side_orders(self, open_orders: list[dict], target_levels: int) -> list[str]: removed: list[str] = [] if target_levels <= 0: return removed for side in ("buy", "sell"): side_orders = [order for order in open_orders if isinstance(order, dict) and str(order.get("side") or "").lower() == side] if len(side_orders) <= target_levels: continue surplus = side_orders[target_levels:] for order in surplus: order_id = str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") if not order_id: continue try: self.context.cancel_order(order_id) removed.append(order_id) self._log(f"cancelled surplus {side} order {order_id}") except Exception as exc: self.state["last_error"] = str(exc) self._log(f"cancel surplus {side} order {order_id} failed: {exc}") return removed def _cancel_duplicate_level_orders(self, open_orders: list[dict]) -> list[str]: removed: list[str] = [] seen: set[tuple[str, str]] = set() for order in open_orders: if not isinstance(order, dict): continue side = str(order.get("side") or "").lower() try: price_key = f"{float(order.get('price') or 0.0):.8f}" except Exception: price_key = str(order.get("price") or "") key = (side, price_key) order_id = str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") if not order_id: continue if key in seen: try: self.context.cancel_order(order_id) removed.append(order_id) self._log(f"cancelled duplicate {side} level order {order_id} price={price_key}") except Exception as exc: self.state["last_error"] = str(exc) self._log(f"cancel duplicate {side} order {order_id} failed: {exc}") continue seen.add(key) return removed def _place_replacement_orders(self, vanished_orders: list[dict], price_hint: float) -> list[str]: placed: list[str] = [] if not vanished_orders: return placed market = self._market_symbol() for order in vanished_orders: if not isinstance(order, dict): continue side = str(order.get("side") or "").lower() opposite = "sell" if side == "buy" else "buy" if side == "sell" else "" if not opposite: continue try: amount = float(order.get("amount") or 0.0) price = float(order.get("price") or price_hint or 0.0) except Exception: continue if amount <= 0 or price <= 0: continue try: self._log(f"replace filled {side} order with {opposite}: price={price} amount={amount:.6g}") result = self.context.place_order(side=opposite, order_type="limit", amount=amount, price=price, market=market) order_id = None if isinstance(result, dict): order_id = result.get("bitstamp_order_id") or result.get("order_id") or result.get("id") or result.get("client_order_id") if order_id is not None: placed.append(str(order_id)) except Exception as exc: self.state["last_error"] = str(exc) self._log(f"replacement order failed for {side}→{opposite} at {price}: {exc}") return placed def _recenter_and_rebuild_from_fill(self, fill_price: float) -> None: fill_price = self._maybe_refresh_center(fill_price) """Treat a fill as the new market anchor and rebuild the full grid from there.""" if fill_price <= 0: return self._recenter_and_rebuild_from_price(fill_price, "fill rebuild") def _shifted_center_price(self, current_center: float, price: float) -> float: if current_center <= 0: return price if price <= 0: return current_center factor = float(self.config.get("center_shift_factor", 1.0 / 3.0) or 0.0) factor = max(0.0, min(1.0, factor)) return price + (current_center - price) * factor def _recenter_and_rebuild_from_price(self, price: float, reason: str) -> None: current = float(self.state.get("center_price") or 0.0) if price <= 0: return new_center = self._shifted_center_price(current, price) self._log(f"{reason}: shift center from {current} to {new_center} using price={price}") try: self.context.cancel_all_orders() except Exception as exc: self.state["last_error"] = str(exc) self._log(f"{reason} cancel-all failed: {exc}") self.state["center_price"] = new_center self.state["seeded"] = True self._place_grid(new_center) self._set_grid_refresh_pause() def _maybe_refresh_center(self, price: float) -> float: if price <= 0: return price current = float(self.state.get("center_price") or 0.0) if current <= 0: self.state["center_price"] = price return price deviation = abs(price - current) / current if current else 0.0 threshold = self._recenter_threshold_pct() if deviation >= threshold: self._log(f"recenter anchor from {current} to {price} dev={deviation:.4f} threshold={threshold:.4f}") self.state["center_price"] = price return price return current def _sync_open_orders_state(self) -> list[dict]: try: open_orders = self.context.get_open_orders() except Exception as exc: self.state["last_error"] = str(exc) self._log(f"open orders sync failed: {exc}") return [] if not isinstance(open_orders, list): open_orders = [] live_orders = [order for order in open_orders if isinstance(order, dict)] live_ids = [str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") for order in live_orders] live_ids = [oid for oid in live_ids if oid] live_sides = [str(order.get("side") or "").lower() for order in live_orders] self.state["orders"] = live_orders self.state["order_ids"] = live_ids self.state["open_order_count"] = len(live_ids) self._log(f"sync live orders: count={len(live_ids)} sides={live_sides} ids={live_ids}") return live_orders def _cancel_orders(self, order_ids) -> None: for order_id in order_ids or []: self._log(f"dropping stale order {order_id} from state") def _reconcile_after_sync(self, previous_orders: list[dict], live_orders: list[dict], desired_sides: set[str], price: float) -> tuple[list[dict], list[str], int]: live_ids = list(self.state.get("order_ids") or []) open_order_count = len(live_ids) if self._mode() != "active": return live_orders, live_ids, open_order_count previous_ids = { str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") for order in previous_orders if isinstance(order, dict) } current_ids = { str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") for order in live_orders if isinstance(order, dict) } vanished_orders = [ order for order in previous_orders if isinstance(order, dict) and str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") in (previous_ids - current_ids) ] if vanished_orders and not self._grid_refresh_paused(): for order in vanished_orders: order_id = str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") if not order_id: continue try: payload = self.context.query_order(order_id) except Exception as exc: self._log(f"order status query failed for {order_id}: {exc}") continue raw = payload.get("raw") if isinstance(payload, dict) else {} if not isinstance(raw, dict): raw = {} status = str(payload.get("status") or raw.get("status") or order.get("status") or "").strip().lower() if status in {"finished", "filled", "closed"}: fill_price = 0.0 for candidate in (raw.get("price"), order.get("price"), price): try: fill_price = float(candidate or 0.0) except Exception: fill_price = 0.0 if fill_price > 0: break if fill_price > 0: self._log(f"filled order {order_id} detected via exec status={status}, recentering at {fill_price}") self._recenter_and_rebuild_from_fill(fill_price) live_orders = self._sync_open_orders_state() live_ids = list(self.state.get("order_ids") or []) open_order_count = len(live_ids) return live_orders, live_ids, open_order_count if status in {"cancelled", "expired", "missing"}: self._log(f"vanished order {order_id} resolved as {status}") continue surplus_cancelled = self._cancel_surplus_side_orders(live_orders, int(self.config.get("grid_levels", 6) or 6)) duplicate_cancelled = self._cancel_duplicate_level_orders(live_orders) if surplus_cancelled or duplicate_cancelled: live_orders = self._sync_open_orders_state() live_ids = list(self.state.get("order_ids") or []) open_order_count = len(live_ids) if desired_sides != {"buy", "sell"}: live_orders = self._sync_open_orders_state() live_ids = list(self.state.get("order_ids") or []) open_order_count = len(live_ids) return live_orders, live_ids, open_order_count def on_tick(self, tick): previous_orders = list(self.state.get("orders") or []) tracked_ids_before_sync = list(self.state.get("order_ids") or []) self._refresh_balance_snapshot() price = self._price() self.state["last_price"] = price self.state["last_error"] = "" self._refresh_regimes() try: live_orders = self._sync_open_orders_state() live_ids = list(self.state.get("order_ids") or []) open_order_count = len(live_ids) expected_ids = [str(oid) for oid in (self.state.get("order_ids") or []) if oid] stale_ids = [] missing_ids = [] except Exception as exc: open_order_count = -1 live_orders = [] live_ids = [] expected_ids = [] stale_ids = [] missing_ids = [] self.state["last_error"] = str(exc) self._log(f"open orders check failed: {exc}") self.state["open_order_count"] = open_order_count desired_sides = self._desired_sides() mode = self._mode() guard_active, guard_reason = self._trend_guard_status() self.state["trend_guard_active"] = guard_active if mode == "active" and guard_active: self._log(f"trend guard active: {guard_reason}") try: self.context.cancel_all_orders() except Exception as exc: self.state["last_error"] = str(exc) self._log(f"trend guard cancel failed: {exc}") self.state["last_action"] = "trend_guard" return {"action": "guard", "price": price, "reason": guard_reason} if mode != "active": if not self.state.get("seeded") or not self.state.get("center_price"): self.state["center_price"] = price self._place_grid(price) self.state["seeded"] = True self._log(f"planned grid at {price}") return {"action": "plan", "price": price} center = float(self.state.get("center_price") or price) recenter_pct = float(self.config.get("recenter_pct", 0.05) or 0.05) deviation = abs(price - center) / center if center else 0.0 if deviation >= recenter_pct: self.state["center_price"] = price self._place_grid(price) self._log(f"planned recenter to {price}") return {"action": "plan", "price": price, "deviation": deviation} self.state["last_action"] = "observe monitor" self._log(f"observe at {price} dev {deviation:.4f}") return {"action": "observe", "price": price, "deviation": deviation} if stale_ids: self._log(f"stale live orders: {stale_ids}") self._cancel_orders(stale_ids) live_ids = [oid for oid in live_ids if oid not in stale_ids] if missing_ids: self._log(f"missing tracked orders: {missing_ids}") self.state["order_ids"] = live_ids if self._order_count_mismatch(tracked_ids_before_sync, live_orders): self.state["mismatch_ticks"] = int(self.state.get("mismatch_ticks") or 0) + 1 self._log(f"order count mismatch detected: tracked={len(tracked_ids_before_sync)} live={len(live_orders)} ticks={self.state['mismatch_ticks']}") if self.state["mismatch_ticks"] >= 2 and not self._recovery_paused() and self._mode() == "active": self._recover_grid(price) return {"action": "recovery", "price": price} else: self.state["mismatch_ticks"] = 0 center = self._maybe_refresh_center(float(self.state.get("center_price") or price)) recenter_pct = self._recenter_threshold_pct() deviation = abs(price - center) / center if center else 0.0 if mode == "active" and deviation >= recenter_pct and not self._grid_refresh_paused(): self._log(f"recenter needed at price={price} center={center} dev={deviation:.4f} threshold={recenter_pct:.4f}") self._recenter_and_rebuild_from_price(price, "recenter") live_orders = self._sync_open_orders_state() live_ids = list(self.state.get("order_ids") or []) open_order_count = len(live_ids) self.state["last_action"] = "recentered" return {"action": "recenter", "price": price, "deviation": deviation} live_orders, live_ids, open_order_count = self._reconcile_after_sync(previous_orders, live_orders, desired_sides, price) if desired_sides != {"buy", "sell"}: current_sides = {str(order.get("side") or "").lower() for order in live_orders if isinstance(order, dict)} missing_side = next((side for side in desired_sides if side not in current_sides), None) if missing_side and self.state.get("center_price"): self._log(f"adding missing {missing_side} side after trade_sides change, live_sides={sorted(current_sides)} live_ids={live_ids}") self._place_side_grid(missing_side, float(self.state.get("center_price") or price)) live_orders = self._sync_open_orders_state() self._log(f"post-add sync: open_order_count={self.state.get('open_order_count', 0)} live_ids={self.state.get('order_ids') or []}") self.state["last_action"] = f"added {missing_side} side" return {"action": "add_side", "price": price, "side": missing_side} if desired_sides == {"buy", "sell"}: current_sides = {str(order.get("side") or "").lower() for order in live_orders if isinstance(order, dict)} tracked_sides = {str(order.get("side") or "").lower() for order in previous_orders if isinstance(order, dict)} missing_sides = [side for side in ("buy", "sell") if side not in current_sides] reconciled_sides: list[str] = [] has_live_grid = bool(live_orders) or bool(live_ids) or bool(tracked_sides) # If the grid is empty because both sides were skipped, do not keep # trying to "restore" a missing side every tick. Let the normal # reseed path decide when to try again. if missing_sides and has_live_grid and self.state.get("center_price") and not self._grid_refresh_paused(): for side in missing_sides: if current_sides or tracked_sides: self._log(f"adding missing {side} side, live_sides={sorted(current_sides)} tracked_sides={sorted(tracked_sides)} live_ids={live_ids}") self._place_side_grid(side, float(self.state.get("center_price") or price)) reconciled_sides.append(side) live_orders = self._sync_open_orders_state() self._log(f"post-add sync: open_order_count={self.state.get('open_order_count', 0)} live_ids={self.state.get('order_ids') or []}") if live_orders and self.state.get("center_price") and not self._grid_refresh_paused(): self._top_up_missing_levels(float(self.state.get("center_price") or price), live_orders) live_orders = self._sync_open_orders_state() if reconciled_sides: self.state["last_action"] = f"reconciled {','.join(reconciled_sides)}" return {"action": "reconcile", "price": price, "side": ",".join(reconciled_sides)} if (not self.state.get("seeded") or not self.state.get("center_price")) and not self._grid_refresh_paused(): self.state["center_price"] = price self._place_grid(price) live_orders = self._sync_open_orders_state() self.state["seeded"] = True mode = self._mode() self._log(f"{'seeded' if mode == 'active' else 'planned'} grid at {price}") return {"action": "seed" if mode == "active" else "plan", "price": price} if (open_order_count == 0 or (expected_ids and not set(expected_ids).intersection(set(live_ids)))) and not self._grid_refresh_paused(): self._log("no open orders, reseeding grid") self.state["center_price"] = price self._place_grid(price) live_orders = self._sync_open_orders_state() mode = self._mode() self.state["last_action"] = "reseeded" if mode == "active" else f"{mode} monitor" return {"action": "reseed" if mode == "active" else "plan", "price": price} mode = self._mode() self.state["last_action"] = "hold" if mode == "active" else f"{mode} monitor" self._log(f"hold at {price} dev {deviation:.4f}") return {"action": "hold" if mode == "active" else "plan", "price": price, "deviation": deviation} def render(self): # Refresh the market-derived display values on render so the dashboard # reflects the same inputs the strategy would use on the next tick. live_step_pct = float(self.state.get("grid_step_pct") or 0.0) live_atr_pct = float(self.state.get("atr_percent") or 0.0) try: self._refresh_balance_snapshot() live_step_pct = self._grid_step_pct() live_atr_pct = float(self.state.get("atr_percent") or live_atr_pct) except Exception as exc: self._log(f"render refresh failed: {exc}") return { "widgets": [ {"type": "metric", "label": "market", "value": self._market_symbol()}, {"type": "metric", "label": "center", "value": round(float(self.state.get("center_price") or 0.0), 6)}, {"type": "metric", "label": "last price", "value": round(float(self.state.get("last_price") or 0.0), 6)}, {"type": "metric", "label": "state", "value": self.state.get("last_action", "idle")}, {"type": "metric", "label": "orders", "value": len(self.state.get("orders") or [])}, {"type": "metric", "label": "open orders", "value": self.state.get("open_order_count", 0)}, {"type": "metric", "label": f"ATR({self.config.get('volatility_timeframe', '1h')}) %", "value": round(live_atr_pct, 4)}, {"type": "metric", "label": "grid step %", "value": round(live_step_pct * 100.0, 4)}, {"type": "metric", "label": "1d", "value": ((self.state.get('regimes') or {}).get('1d') or {}).get('trend', {}).get('state', 'n/a')}, {"type": "metric", "label": "4h", "value": ((self.state.get('regimes') or {}).get('4h') or {}).get('trend', {}).get('state', 'n/a')}, {"type": "metric", "label": "1h", "value": ((self.state.get('regimes') or {}).get('1h') or {}).get('trend', {}).get('state', 'n/a')}, {"type": "metric", "label": "15m", "value": ((self.state.get('regimes') or {}).get('15m') or {}).get('trend', {}).get('state', 'n/a')}, {"type": "metric", "label": f"{self._base_symbol()} avail", "value": round(float(self.state.get("base_available") or 0.0), 8)}, {"type": "metric", "label": f"{self.context.counter_currency or 'USD'} avail", "value": round(float(self.state.get("counter_available") or 0.0), 8)}, *([ {"type": "metric", "label": "trend guard active", "value": "on"}, {"type": "text", "label": "trend guard reason", "value": "higher-timeframe trend conflict"}, ] if self.state.get("trend_guard_active") else []), *([ {"type": "text", "label": "config warning", "value": warning}, ] if (warning := self._config_warning()) else []), {"type": "text", "label": "error", "value": self.state.get("last_error", "") or "none"}, {"type": "log", "label": "debug log", "lines": self.state.get("debug_log") or []}, ] }