from __future__ import annotations import time from datetime import datetime, timezone from src.trader_mcp.strategy_sizing import suggest_quote_sized_amount from src.trader_mcp.strategy_sdk import Strategy from src.trader_mcp.logging_utils import log_event class Strategy(Strategy): LABEL = "Grid Trader" STRATEGY_PROFILE = { "expects": { "trend": "none", "volatility": "low", "event_risk": "low", "liquidity": "normal", }, "avoids": { "trend": "strong", "volatility": "expanding", "event_risk": "high", "liquidity": "thin", }, "risk_profile": "medium", "capabilities": ["mean_reversion", "range_harvesting", "two_sided_inventory"], "role": "primary", "inventory_behavior": "balanced", "requires_rebalance_before_start": False, "requires_rebalance_before_stop": False, "safe_when_unbalanced": False, "can_run_with": ["exposure_protector"], } TICK_MINUTES = 0.50 CONFIG_SCHEMA = { "grid_levels": {"type": "int", "default": 6, "min": 1, "max": 20}, "grid_step_pct": {"type": "float", "default": 0.012, "min": 0.001, "max": 0.1}, "volatility_timeframe": {"type": "string", "default": "1h"}, "volatility_multiplier": {"type": "float", "default": 0.5, "min": 0.0, "max": 10.0}, "grid_step_min_pct": {"type": "float", "default": 0.005, "min": 0.0001, "max": 0.5}, "grid_step_max_pct": {"type": "float", "default": 0.03, "min": 0.0001, "max": 1.0}, "inventory_rebalance_step_factor": {"type": "float", "default": 0.15, "min": 0.0, "max": 0.9}, "order_notional_quote": {"type": "float", "default": 0.0, "min": 0.0}, "max_order_notional_quote": {"type": "float", "default": 0.0, "min": 0.0}, "recenter_pct": {"type": "float", "default": 0.05, "min": 0.0, "max": 0.5}, "recenter_atr_multiplier": {"type": "float", "default": 0.35, "min": 0.0, "max": 10.0}, "recenter_min_pct": {"type": "float", "default": 0.0025, "min": 0.0, "max": 0.5}, "recenter_max_pct": {"type": "float", "default": 0.03, "min": 0.0, "max": 0.5}, "trade_sides": {"type": "string", "default": "both"}, "dust_collect": {"type": "bool", "default": False}, "order_call_delay_ms": {"type": "int", "default": 250, "min": 0, "max": 10000}, "debug_orders": {"type": "bool", "default": True}, } STATE_SCHEMA = { "center_price": {"type": "float", "default": 0.0}, "last_price": {"type": "float", "default": 0.0}, "seeded": {"type": "bool", "default": False}, "last_action": {"type": "string", "default": "idle"}, "last_error": {"type": "string", "default": ""}, "cleanup_status": {"type": "string", "default": ""}, "orders": {"type": "list", "default": []}, "order_ids": {"type": "list", "default": []}, "debug_log": {"type": "list", "default": []}, "base_available": {"type": "float", "default": 0.0}, "counter_available": {"type": "float", "default": 0.0}, "grid_step_pct_buy": {"type": "float", "default": 0.0}, "grid_step_pct_sell": {"type": "float", "default": 0.0}, "inventory_skew_side": {"type": "string", "default": "none"}, "inventory_skew_ratio": {"type": "float", "default": 0.5}, "inventory_skew_imbalance": {"type": "float", "default": 0.0}, "inventory_skew_reduction_pct": {"type": "float", "default": 0.0}, "regimes_updated_at": {"type": "string", "default": ""}, "account_snapshot_updated_at": {"type": "string", "default": ""}, "last_balance_log_signature": {"type": "string", "default": ""}, "last_balance_log_at": {"type": "string", "default": ""}, "grid_refresh_pending_until": {"type": "string", "default": ""}, "mismatch_ticks": {"type": "int", "default": 0}, "recovery_cooldown_until": {"type": "string", "default": ""}, } def init(self): return { "center_price": 0.0, "last_price": 0.0, "seeded": False, "last_action": "idle", "last_error": "", "cleanup_status": "", "orders": [], "order_ids": [], "debug_log": ["init cancel all orders"], "base_available": 0.0, "counter_available": 0.0, "grid_step_pct_buy": 0.0, "grid_step_pct_sell": 0.0, "inventory_skew_side": "none", "inventory_skew_ratio": 0.5, "inventory_skew_imbalance": 0.0, "inventory_skew_reduction_pct": 0.0, "regimes_updated_at": "", "account_snapshot_updated_at": "", "last_balance_log_signature": "", "last_balance_log_at": "", "grid_refresh_pending_until": "", "mismatch_ticks": 0, "recovery_cooldown_until": "", } def _log(self, message: str) -> None: state = getattr(self, "state", {}) or {} log = list(state.get("debug_log") or []) log.append(message) state["debug_log"] = log[-12:] self.state = state log_event("grid", message) def _log_decision(self, action: str, **fields) -> None: parts = [action] for key, value in fields.items(): parts.append(f"{key}={value}") self._log(", ".join(parts)) def _set_grid_refresh_pause(self, seconds: float = 30.0) -> None: self.state["grid_refresh_pending_until"] = (datetime.now(timezone.utc).timestamp() + max(seconds, 0.0)) def _grid_refresh_paused(self) -> bool: try: until = float(self.state.get("grid_refresh_pending_until") or 0.0) except Exception: until = 0.0 return until > datetime.now(timezone.utc).timestamp() def _recovery_paused(self) -> bool: try: until = float(self.state.get("recovery_cooldown_until") or 0.0) except Exception: until = 0.0 return until > datetime.now(timezone.utc).timestamp() def _trip_recovery_pause(self, seconds: float = 30.0) -> None: self.state["recovery_cooldown_until"] = (datetime.now(timezone.utc).timestamp() + max(seconds, 0.0)) def _tracked_order_id(self, order: dict | object) -> str: if not isinstance(order, dict): return "" for key in ("bitstamp_order_id", "order_id", "id", "client_order_id"): value = order.get(key) if value is not None and str(value).strip(): return str(value).strip() result = order.get("result") if isinstance(result, dict): for key in ("bitstamp_order_id", "order_id", "id", "client_order_id"): value = result.get(key) if value is not None and str(value).strip(): return str(value).strip() return "" def _drop_tracked_orders(self, order_ids: list[str] | set[str]) -> int: drop_ids = {str(order_id).strip() for order_id in (order_ids or []) if str(order_id).strip()} if not drop_ids: return 0 tracked_orders = list(self.state.get("orders") or []) tracked_ids = [str(order_id).strip() for order_id in (self.state.get("order_ids") or []) if str(order_id).strip()] kept_orders = [ order for order in tracked_orders if self._tracked_order_id(order) not in drop_ids ] kept_ids = [order_id for order_id in tracked_ids if order_id not in drop_ids] removed = len(tracked_ids) - len(kept_ids) self.state["orders"] = kept_orders self.state["order_ids"] = kept_ids self.state["open_order_count"] = len(kept_ids) return max(removed, 0) def _cancel_all_orders_conclusive(self, failure_prefix: str) -> bool: strict_cancel = getattr(self.context, "cancel_all_orders_confirmed", None) if callable(strict_cancel): result = strict_cancel() cancelled_order_ids = result.get("cancelled_order_ids") or [] removed = self._drop_tracked_orders(cancelled_order_ids) cleanup_status = str(result.get("cleanup_status") or ("cleanup_confirmed" if bool(result.get("conclusive")) else "cleanup_partial")) self.state["cleanup_status"] = cleanup_status if removed > 0: self._log(f"cleanup removed tracked orders: ids={cancelled_order_ids}") if bool(result.get("conclusive")): return True error = str(result.get("error") or "cancel-all inconclusive") else: try: self.context.cancel_all_orders() self.state["cleanup_status"] = "cleanup_confirmed" return True except Exception as exc: self.state["cleanup_status"] = "cleanup_failed" error = str(exc) self.state["last_error"] = error self._log(f"{failure_prefix}: {error}") return False def _recover_grid(self, price: float) -> None: self._log(f"recovery mode: cancel all and rebuild from {price}") if not self._cancel_all_orders_conclusive("recovery cancel-all failed"): self.state["last_action"] = "recovery cleanup pending" return self.state["orders"] = [] self.state["order_ids"] = [] self.state["open_order_count"] = 0 self.state["center_price"] = price self.state["seeded"] = True self._place_grid(price) self._sync_open_orders_state() self.state["mismatch_ticks"] = 0 self._trip_recovery_pause() def _order_count_mismatch(self, tracked_ids: list[str], live_orders: list[dict]) -> bool: live_ids = [str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") for order in live_orders if isinstance(order, dict)] live_ids = [oid for oid in live_ids if oid] if len(live_ids) != len([oid for oid in tracked_ids if oid]): return True return False def _base_symbol(self) -> str: return (self.context.base_currency or self.context.market_symbol or "XRP").split("/")[0].upper() def _market_symbol(self) -> str: return self.context.market_symbol or f"{self._base_symbol().lower()}usd" def _live_fee_rates(self) -> tuple[float, float]: try: payload = self.context.get_fee_rates(self._market_symbol()) maker = float(payload.get("maker") or 0.0) taker = float(payload.get("taker") or 0.0) return maker, taker except Exception as exc: self._log(f"fee lookup failed: {exc}") return 0.0, 0.0 def _live_fee_rate(self) -> float: _maker, taker = self._live_fee_rates() return taker def _mode(self) -> str: return getattr(self.context, "mode", "active") or "active" def apply_policy(self): policy = super().apply_policy() risk = str(policy.get("risk_posture") or "normal").lower() priority = str(policy.get("priority") or "normal").lower() step_map = {"cautious": 0.008, "normal": 0.012, "assertive": 0.018} recenter_map = {"cautious": 0.035, "normal": 0.05, "assertive": 0.07} levels_map = {"cautious": 4, "normal": 6, "assertive": 8} delay_map = {"cautious": 500, "normal": 250, "assertive": 120} if priority in {"low", "background"}: risk = "cautious" elif priority in {"high", "urgent"}: risk = "assertive" self.config["grid_step_pct"] = step_map.get(risk, 0.012) self.config["recenter_pct"] = recenter_map.get(risk, 0.05) if self.config.get("grid_levels") in {None, "", 0}: self.config["grid_levels"] = levels_map.get(risk, 6) else: try: self.config["grid_levels"] = max(int(self.config.get("grid_levels") or 0), 1) except Exception: self.config["grid_levels"] = levels_map.get(risk, 6) self.config["order_call_delay_ms"] = delay_map.get(risk, 250) self.state["policy_derived"] = { "grid_step_pct": self.config["grid_step_pct"], "recenter_pct": self.config["recenter_pct"], "grid_levels": self.config["grid_levels"], "order_call_delay_ms": self.config["order_call_delay_ms"], } return policy def _price(self) -> float: payload = self.context.get_price(self._base_symbol()) return float(payload.get("price") or 0.0) def _regime_snapshot(self) -> dict: timeframes = ["1d", "4h", "1h", "15m"] snapshot = {} for tf in timeframes: try: snapshot[tf] = self.context.get_regime(self._base_symbol(), tf) except Exception as exc: snapshot[tf] = {"error": str(exc)} return snapshot def _refresh_regimes(self) -> None: self.state["regimes"] = self._regime_snapshot() self.state["regimes_updated_at"] = datetime.now(timezone.utc).isoformat() def _recenter_threshold_pct(self) -> float: base_threshold = float(self.config.get("recenter_pct", 0.05) or 0.05) atr_multiplier = float(self.config.get("recenter_atr_multiplier", 0.35) or 0.0) min_threshold = float(self.config.get("recenter_min_pct", 0.0025) or 0.0) max_threshold = float(self.config.get("recenter_max_pct", 0.03) or 1.0) try: tf = str(self.config.get("volatility_timeframe", "1h") or "1h") regime = self.context.get_regime(self._base_symbol(), tf) short_regime = self.context.get_regime(self._base_symbol(), "15m") atr_pct = float((regime or {}).get("volatility", {}).get("atr_percent") or 0.0) short_atr_pct = float((short_regime or {}).get("volatility", {}).get("atr_percent") or 0.0) atr_pct = max(atr_pct, short_atr_pct) except Exception: atr_pct = 0.0 threshold = (atr_pct / 100.0) * atr_multiplier if atr_pct > 0 else base_threshold threshold = max(threshold, min_threshold) threshold = min(threshold, max_threshold) self.state["recenter_pct_live"] = threshold self.state["recenter_atr_percent"] = atr_pct return threshold def _grid_step_pct(self) -> float: base_step = float(self.config.get("grid_step_pct", 0.012) or 0.012) tf = str(self.config.get("volatility_timeframe", "1h") or "1h") multiplier = float(self.config.get("volatility_multiplier", 0.5) or 0.0) min_step = float(self.config.get("grid_step_min_pct", 0.005) or 0.0) max_step = float(self.config.get("grid_step_max_pct", 0.03) or 1.0) try: regime = self.context.get_regime(self._base_symbol(), tf) short_regime = self.context.get_regime(self._base_symbol(), "15m") tf_atr_pct = float((regime or {}).get("volatility", {}).get("atr_percent") or 0.0) atr_pct = float((regime or {}).get("volatility", {}).get("atr_percent") or 0.0) short_atr_pct = float((short_regime or {}).get("volatility", {}).get("atr_percent") or 0.0) atr_pct = max(atr_pct, short_atr_pct) self.state["regimes"] = self._regime_snapshot() except Exception as exc: self._log(f"regime fetch failed: {exc}") tf_atr_pct = 0.0 atr_pct = 0.0 short_atr_pct = 0.0 adaptive = (atr_pct / 100.0) * multiplier if atr_pct > 0 else base_step step = adaptive if atr_pct > 0 else base_step step = max(step, min_step) step = min(step, max_step) self.state["grid_step_pct"] = step self.state["atr_percent_tf"] = tf_atr_pct self.state["atr_percent_15m"] = short_atr_pct self.state["atr_percent"] = atr_pct return step def _inventory_rebalance_profile( self, price: float, *, base_total: float | None = None, quote_total: float | None = None, ) -> dict[str, float | str]: ratio_price = price if price > 0 else float(self.state.get("last_price") or self.state.get("center_price") or 1.0) if base_total is None or quote_total is None: live_orders = list(self.state.get("orders") or []) reserved_quote = sum( float(order.get("price") or 0.0) * float(order.get("amount") or 0.0) for order in live_orders if isinstance(order, dict) and str(order.get("side") or "").lower() == "buy" ) reserved_base = sum( float(order.get("amount") or 0.0) for order in live_orders if isinstance(order, dict) and str(order.get("side") or "").lower() == "sell" ) if base_total is None: base_total = max(float(self.state.get("base_available") or 0.0), 0.0) + reserved_base if quote_total is None: quote_total = max(float(self.state.get("counter_available") or 0.0), 0.0) + reserved_quote # Rebalance bias must see the whole wallet, not only the free slice. base_value = max(float(base_total or 0.0), 0.0) * ratio_price counter_value = max(float(quote_total or 0.0), 0.0) total = base_value + counter_value ratio = base_value / total if total > 0 else 0.5 imbalance = min(abs(ratio - 0.5) * 2.0, 1.0) factor = float(self.config.get("inventory_rebalance_step_factor", 0.15) or 0.0) factor = min(max(factor, 0.0), 0.9) favored_side = "sell" if ratio > 0.5 else "buy" if ratio < 0.5 else "none" reduction = factor * imbalance if favored_side in {"buy", "sell"} else 0.0 self.state["inventory_skew_side"] = favored_side self.state["inventory_skew_ratio"] = ratio self.state["inventory_skew_imbalance"] = imbalance self.state["inventory_skew_reduction_pct"] = reduction return { "ratio": ratio, "imbalance": imbalance, "favored_side": favored_side, "reduction": reduction, } def _effective_grid_steps( self, price: float, *, base_total: float | None = None, quote_total: float | None = None, ) -> dict[str, float | str]: base_step = float(self.state.get("grid_step_pct") or self._grid_step_pct()) min_step = float(self.config.get("grid_step_min_pct", 0.005) or 0.0) profile = self._inventory_rebalance_profile(price, base_total=base_total, quote_total=quote_total) favored_side = str(profile.get("favored_side") or "none") reduction = float(profile.get("reduction") or 0.0) buy_step = base_step sell_step = base_step if favored_side == "buy": buy_step = max(base_step * (1.0 - reduction), min_step) elif favored_side == "sell": sell_step = max(base_step * (1.0 - reduction), min_step) self.state["grid_step_pct_buy"] = buy_step self.state["grid_step_pct_sell"] = sell_step return { "base": base_step, "buy": buy_step, "sell": sell_step, "favored_side": favored_side, "reduction": reduction, "ratio": float(profile.get("ratio") or 0.5), "imbalance": float(profile.get("imbalance") or 0.0), } def _config_warning(self) -> str | None: recenter_pct = float(self.state.get("recenter_pct_live") or self._recenter_threshold_pct()) steps = self._effective_grid_steps(float(self.state.get("last_price") or self.state.get("center_price") or 0.0)) grid_step_pct = min(float(steps.get("buy") or 0.0), float(steps.get("sell") or 0.0)) if grid_step_pct <= 0: return None ratio = recenter_pct / grid_step_pct # If the recenter threshold is too close to the first step, the grid # can keep rebuilding before it has a fair chance to trade. if ratio <= 1.0: return f"warning: recenter threshold ({recenter_pct:.4f}) is <= grid step ({grid_step_pct:.4f}), it may recenter before trading" if ratio < 1.5: return f"warning: recenter threshold ({recenter_pct:.4f}) is only {ratio:.2f}x the grid step ({grid_step_pct:.4f}), consider widening it" return None def _inventory_ratio(self, price: float) -> float: base_value = float(self.state.get("base_available") or 0.0) * price counter_value = float(self.state.get("counter_available") or 0.0) total = base_value + counter_value if total <= 0: return 0.5 return base_value / total def _supervision(self) -> dict: price = float(self.state.get("last_price") or 0.0) ratio = self._inventory_ratio(price if price > 0 else 1.0) step_profile = self._effective_grid_steps(price) last_error = str(self.state.get("last_error") or "") config_warning = self._config_warning() regime_1h = (((self.state.get("regimes") or {}).get("1h") or {}).get("trend") or {}).get("state") center_price = float(self.state.get("center_price") or self.state.get("last_price") or 0.0) if ratio >= 0.88: pressure = "base_side_depleted" elif ratio <= 0.12: pressure = "quote_side_depleted" elif ratio >= 0.65: pressure = "base_heavy" elif ratio <= 0.35: pressure = "quote_heavy" else: pressure = "balanced" if price > 0 and center_price > 0: if price > center_price: market_bias = "bullish" adverse_side = "sell" elif price < center_price: market_bias = "bearish" adverse_side = "buy" else: market_bias = "flat" adverse_side = "unknown" else: market_bias = "unknown" adverse_side = "unknown" open_orders = self.state.get("orders") or [] order_distribution = {"buy": {"count": 0, "notional_quote": 0.0}, "sell": {"count": 0, "notional_quote": 0.0}} adverse_side_nearest_distance_pct = None for order in open_orders: if not isinstance(order, dict): continue side = str(order.get("side") or "").lower() if side not in order_distribution: continue try: order_price = float(order.get("price") or 0.0) amount = float(order.get("amount") or order.get("amount_remaining") or 0.0) except Exception: continue if order_price <= 0 or amount <= 0: continue order_distribution[side]["count"] += 1 order_distribution[side]["notional_quote"] += amount * order_price if side == adverse_side and price > 0: distance_pct = abs(order_price - price) / price * 100.0 if adverse_side_nearest_distance_pct is None or distance_pct < adverse_side_nearest_distance_pct: adverse_side_nearest_distance_pct = distance_pct adverse_count = int(order_distribution.get(adverse_side, {}).get("count") or 0) if adverse_side in order_distribution else 0 adverse_notional = float(order_distribution.get(adverse_side, {}).get("notional_quote") or 0.0) if adverse_side in order_distribution else 0.0 concerns = [] if adverse_side in {"buy", "sell"} and adverse_count > 0: concerns.append(f"{adverse_side} ladder exposed to {market_bias} drift") if pressure in {"base_side_depleted", "quote_side_depleted"}: concerns.append(f"inventory pressure={pressure}") if config_warning: concerns.append(config_warning) side_capacity = { "buy": pressure not in {"quote_side_depleted"}, "sell": pressure not in {"base_side_depleted"}, } return { "health": "degraded" if last_error or config_warning else "healthy", "degraded": bool(last_error or config_warning), "inventory_pressure": pressure, "inventory_ratio": round(ratio, 4), "inventory_rebalance_side": step_profile.get("favored_side", "none"), "inventory_rebalance_reduction_pct": round(float(step_profile.get("reduction") or 0.0) * 100.0, 4), "grid_step_pct": { "base": round(float(step_profile.get("base") or 0.0), 6), "buy": round(float(step_profile.get("buy") or 0.0), 6), "sell": round(float(step_profile.get("sell") or 0.0), 6), }, "capacity_available": pressure == "balanced", "side_capacity": side_capacity, "market_bias": market_bias, "adverse_side": adverse_side, "adverse_side_open_order_count": adverse_count, "adverse_side_open_order_notional_quote": round(adverse_notional, 4), "adverse_side_nearest_distance_pct": round(adverse_side_nearest_distance_pct, 4) if adverse_side_nearest_distance_pct is not None else None, "open_order_distribution": order_distribution, "concerns": concerns, "last_reason": last_error or config_warning or f"base_ratio={ratio:.3f}, trend_1h={regime_1h or 'unknown'}", } def _available_balance(self, asset_code: str) -> float: try: info = self.context.get_account_info() except Exception as exc: self._log(f"account info failed: {exc}") # A failed balance read makes this tick unsuitable for shape decisions. self.state["balance_shape_inconclusive"] = True return 0.0 balances = info.get("balances") if isinstance(info, dict) else [] if not isinstance(balances, list): self.state["balance_shape_inconclusive"] = True return 0.0 wanted = str(asset_code or "").upper() for balance in balances: if not isinstance(balance, dict): continue if str(balance.get("asset_code") or "").upper() != wanted: continue try: return float(balance.get("available") if balance.get("available") is not None else balance.get("total") or 0.0) except Exception: self.state["balance_shape_inconclusive"] = True return 0.0 self.state["balance_shape_inconclusive"] = True return 0.0 def _refresh_balance_snapshot(self) -> bool: try: info = self.context.get_account_info() except Exception as exc: self._log(f"balance refresh failed: {exc}") self.state["balance_shape_inconclusive"] = True return False balances = info.get("balances") if isinstance(info, dict) else [] if not isinstance(balances, list): self.state["balance_shape_inconclusive"] = True return False base = self._base_symbol() quote = self.context.counter_currency or "USD" for balance in balances: if not isinstance(balance, dict): continue asset = str(balance.get("asset_code") or "").upper() try: available = float(balance.get("available") if balance.get("available") is not None else balance.get("total") or 0.0) except Exception: self.state["balance_shape_inconclusive"] = True continue if asset == base: self.state["base_available"] = available if asset == str(quote).upper(): self.state["counter_available"] = available self.state["account_snapshot_updated_at"] = datetime.now(timezone.utc).isoformat() signature = f"{base}:{self.state.get('base_available', 0.0):.8f}|{quote}:{self.state.get('counter_available', 0.0):.8f}" last_signature = str(self.state.get("last_balance_log_signature") or "") last_logged_at = str(self.state.get("last_balance_log_at") or "") now_iso = self.state["account_snapshot_updated_at"] should_log = signature != last_signature or not last_logged_at if not should_log: try: from datetime import datetime as _dt elapsed = (_dt.fromisoformat(now_iso) - _dt.fromisoformat(last_logged_at)).total_seconds() should_log = elapsed >= 60 except Exception: should_log = True if should_log: self.state["last_balance_log_signature"] = signature self.state["last_balance_log_at"] = now_iso self._log_decision( "balance snapshot", base=base, base_available=f"{self.state.get('base_available', 0.0):.6g}", quote=quote, quote_available=f"{self.state.get('counter_available', 0.0):.6g}", updated_at=now_iso, ) return True def _side_allowed(self, side: str) -> bool: selected = str(self.config.get("trade_sides", "both") or "both").strip().lower() if selected == "both": return True return selected == side def _desired_sides(self) -> set[str]: selected = str(self.config.get("trade_sides", "both") or "both").strip().lower() if selected == "both": return {"buy", "sell"} if selected in {"buy", "sell"}: return {selected} return {"buy", "sell"} def _suggest_amount( self, side: str, price: float, levels: int, min_notional: float, *, available_balances: dict[str, float] | None = None, ) -> float: return suggest_quote_sized_amount( self.context, side=side, price=price, levels=levels, min_notional=min_notional, fee_rate=self._live_fee_rate(), order_notional_quote=float(self.config.get("order_notional_quote") or self.config.get("order_size") or 0.0), max_order_notional_quote=float(self.config.get("max_order_notional_quote") or self.config.get("max_notional_per_order") or 0.0), dust_collect=bool(self.config.get("dust_collect", False)), order_size=0.0, available_balances=available_balances, ) def _grid_extreme_price(self, center: float, side: str, levels: int) -> float: step_profile = self._effective_grid_steps(center) step = float(step_profile.get(side) or step_profile.get("base") or 0.0) if center <= 0 or levels <= 0 or step <= 0: return center if side == "buy": return round(center * (1 - (step * levels)), 8) return round(center * (1 + (step * levels)), 8) def _resource_total_for_side(self, side: str, base_total: float, quote_total: float) -> float: if side == "buy": return max(float(quote_total or 0.0), 0.0) return max(float(base_total or 0.0), 0.0) def _resource_cost_for_order(self, side: str, amount: float, price: float, fee_rate: float) -> float: if side == "buy": return max(amount, 0.0) * max(price, 0.0) * (1.0 + max(fee_rate, 0.0)) return max(amount, 0.0) def _inventory_totals_from_live_orders(self, live_orders: list[dict]) -> tuple[float, float]: reserved_quote = sum( float(order.get("price") or 0.0) * float(order.get("amount") or 0.0) for order in live_orders if isinstance(order, dict) and str(order.get("side") or "").lower() == "buy" ) reserved_base = sum( float(order.get("amount") or 0.0) for order in live_orders if isinstance(order, dict) and str(order.get("side") or "").lower() == "sell" ) base_total = max(float(self.state.get("base_available") or 0.0), 0.0) + reserved_base quote_total = max(float(self.state.get("counter_available") or 0.0), 0.0) + reserved_quote return base_total, quote_total def _planned_side_orders( self, side: str, center: float, expected_levels: int, min_notional: float, fee_rate: float, *, step: float, base_total: float, quote_total: float, ) -> dict: empty = {"amount": 0.0, "orders": [], "skipped": []} if not self._side_allowed(side): return empty if expected_levels <= 0 or center <= 0 or step <= 0: return empty base_symbol = self._base_symbol() quote_symbol = str(self.context.counter_currency or "USD").upper() balances = { base_symbol: max(float(base_total or 0.0), 0.0), quote_symbol: max(float(quote_total or 0.0), 0.0), } # Ask the shared sizing layer for a venue-valid amount once, then # walk the ladder outward until we either fill the target or run out. reference_price = round(center * (1 - (step * expected_levels)) if side == "buy" else center * (1 + (step * expected_levels)), 8) amount = self._suggest_amount( side, reference_price, max(expected_levels, 1), min_notional, available_balances=balances, ) if amount <= 0: return empty spendable_total = self._resource_total_for_side(side, base_total, quote_total) * 0.995 total_cost = 0.0 planned_orders = [] skipped = [] max_index = max(expected_levels * 4, expected_levels + 8, 12) for level_index in range(1, max_index + 1): # Skip inner levels that fail min-size, but keep pushing outward. price = round(center * (1 - (step * level_index)) if side == "buy" else center * (1 + (step * level_index)), 8) if price <= 0: break min_size = (min_notional / price) if min_notional > 0 else 0.0 if amount < min_size: skipped.append({"level": level_index, "reason": "below minimum size", "price": price}) if side == "buy": break continue cost = self._resource_cost_for_order(side, amount, price, fee_rate) if total_cost + cost > spendable_total + 1e-9: break total_cost += cost planned_orders.append({"side": side, "price": price, "amount": amount, "level": level_index}) if len(planned_orders) >= expected_levels: break return {"amount": amount, "orders": planned_orders, "skipped": skipped} def _plan_grid(self, center: float, *, base_total: float | None = None, quote_total: float | None = None) -> dict: center = float(center or 0.0) levels = int(self.config.get("grid_levels", 6) or 6) min_notional = float(self.context.minimum_order_value or 0.0) fee_rate = self._live_fee_rate() # One planner feeds both seeding and shape checking, so they never # invent different notions of the "correct" grid. step_profile = self._effective_grid_steps(center, base_total=base_total, quote_total=quote_total) buy_step = float(step_profile.get("buy") or step_profile.get("base") or 0.0) sell_step = float(step_profile.get("sell") or step_profile.get("base") or 0.0) base_total = max(float(self.state.get("base_available") if base_total is None else base_total) or 0.0, 0.0) quote_total = max(float(self.state.get("counter_available") if quote_total is None else quote_total) or 0.0, 0.0) buy_plan = self._planned_side_orders( "buy", center, levels, min_notional, fee_rate, step=buy_step, base_total=base_total, quote_total=quote_total, ) sell_plan = self._planned_side_orders( "sell", center, levels, min_notional, fee_rate, step=sell_step, base_total=base_total, quote_total=quote_total, ) orders = [*buy_plan["orders"], *sell_plan["orders"]] return { "center": center, "buy_orders": buy_plan["orders"], "sell_orders": sell_plan["orders"], "orders": orders, "buy_skipped": buy_plan["skipped"], "sell_skipped": sell_plan["skipped"], "counts": {"buy": len(buy_plan["orders"]), "sell": len(sell_plan["orders"])}, } def _place_grid(self, center: float) -> None: center = self._maybe_refresh_center(center) mode = self._mode() market = self._market_symbol() orders = [] order_ids = [] def _capture_order_id(result): if isinstance(result, dict): return result.get("bitstamp_order_id") or result.get("order_id") or result.get("id") or result.get("client_order_id") return None plan = self._plan_grid(center) for side, skipped in (("buy", plan.get("buy_skipped") or []), ("sell", plan.get("sell_skipped") or [])): for skipped_level in skipped: self._log(f"seed level {skipped_level.get('level')} {side} skipped: {skipped_level.get('reason')}") for planned_order in plan.get("orders") or []: side = str(planned_order.get("side") or "").lower() level = int(planned_order.get("level") or 0) price = float(planned_order.get("price") or 0.0) amount = float(planned_order.get("amount") or 0.0) if price <= 0 or amount <= 0: continue if mode != "active": orders.append({"side": side, "price": price, "amount": amount, "result": {"simulated": True}}) self._log(f"plan level {level}: {side} {price} amount {amount:.6g}") continue try: result = self.context.place_order(side=side, order_type="limit", amount=amount, price=price, market=market) orders.append({"side": side, "price": price, "amount": amount, "result": result}) order_id = _capture_order_id(result) if order_id is not None: order_ids.append(str(order_id)) self._log(f"seed level {level}: {side} {price} amount {amount:.6g}") delay = max(int(self.config.get("order_call_delay_ms", 250) or 0), 0) / 1000.0 if delay > 0: time.sleep(delay) self._refresh_balance_snapshot() except Exception as exc: # best effort for first draft self.state["last_error"] = str(exc) self._log(f"seed level {level} {side} failed: {exc}") self.state["orders"] = orders self.state["order_ids"] = order_ids self.state["last_action"] = "seeded grid" self._set_grid_refresh_pause() def _current_market_anchor(self, fallback: float = 0.0) -> float: try: live_price = float(self._price() or 0.0) except Exception as exc: self._log(f"live price refresh failed during rebuild: {exc}") live_price = 0.0 return live_price if live_price > 0 else fallback def _recenter_and_rebuild_from_fill(self, fill_price: float, market_price: float = 0.0) -> None: """Treat a fill as a forced re-anchor and rebuild from the latest market price.""" anchor_price = self._current_market_anchor(market_price or fill_price) if anchor_price <= 0: return self._log(f"fill rebuild anchor resolved: fill={fill_price} market={anchor_price}") self._recenter_and_rebuild_from_price(anchor_price, "fill rebuild") def _recenter_and_rebuild_from_price(self, price: float, reason: str) -> None: if price <= 0: return current = float(self.state.get("center_price") or 0.0) self._log(f"{reason}: recenter from {current} to {price}") if not self._cancel_all_orders_conclusive(f"{reason} cancel-all failed"): self.state["last_action"] = f"{reason} cleanup pending" return # Give the exchange a moment to release balance before we rebuild. time.sleep(3.0) self._refresh_balance_snapshot() self.state["center_price"] = price self.state["seeded"] = True self._place_grid(price) # Use the freshly placed live orders as the tracked snapshot so the # next tick compares against the rebuilt grid, not the pre-rebuild set. self._sync_open_orders_state() self._refresh_balance_snapshot() self._set_grid_refresh_pause() def on_stop(self): self._log("stopping: cancel all open orders") if self._cancel_all_orders_conclusive("stop cancel-all failed"): self.state["orders"] = [] self.state["order_ids"] = [] self.state["open_order_count"] = 0 self.state["last_action"] = "stopped" else: self.state["last_action"] = "stop cleanup pending" def _maybe_refresh_center(self, price: float) -> float: if price <= 0: return price current = float(self.state.get("center_price") or 0.0) if current <= 0: self.state["center_price"] = price return price deviation = abs(price - current) / current if current else 0.0 threshold = self._recenter_threshold_pct() if deviation >= threshold: self._log(f"recenter anchor from {current} to {price} dev={deviation:.4f} threshold={threshold:.4f}") self.state["center_price"] = price return price return current def _sync_open_orders_state(self) -> list[dict]: try: open_orders = self.context.get_open_orders() except Exception as exc: self.state["last_error"] = str(exc) self._log(f"open orders sync failed: {exc}") return [] if not isinstance(open_orders, list): open_orders = [] live_orders = [order for order in open_orders if isinstance(order, dict)] live_ids = [str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") for order in live_orders] live_ids = [oid for oid in live_ids if oid] live_sides = [str(order.get("side") or "").lower() for order in live_orders] self.state["orders"] = live_orders self.state["order_ids"] = live_ids self.state["open_order_count"] = len(live_ids) self._log(f"sync live orders: count={len(live_ids)} sides={live_sides} ids={live_ids}") return live_orders def _cancel_orders(self, order_ids) -> None: for order_id in order_ids or []: self._log(f"dropping stale order {order_id} from state") def _reconcile_after_sync(self, previous_orders: list[dict], live_orders: list[dict], desired_sides: set[str], price: float) -> tuple[list[dict], list[str], int]: live_ids = list(self.state.get("order_ids") or []) open_order_count = len(live_ids) if self._mode() != "active": return live_orders, live_ids, open_order_count previous_ids = { str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") for order in previous_orders if isinstance(order, dict) } current_ids = { str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") for order in live_orders if isinstance(order, dict) } vanished_orders = [ order for order in previous_orders if isinstance(order, dict) and str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") in (previous_ids - current_ids) ] if vanished_orders and not self._grid_refresh_paused(): for order in vanished_orders: order_id = str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") if not order_id: continue try: payload = self.context.query_order(order_id) except Exception as exc: self._log(f"order status query failed for {order_id}: {exc}") continue raw = payload.get("raw") if isinstance(payload, dict) else {} if not isinstance(raw, dict): raw = {} status = str(payload.get("status") or raw.get("status") or order.get("status") or "").strip().lower() if status in {"finished", "filled", "closed"}: fill_price = 0.0 for candidate in (raw.get("price"), order.get("price"), price): try: fill_price = float(candidate or 0.0) except Exception: fill_price = 0.0 if fill_price > 0: break if fill_price > 0: self._log(f"filled order {order_id} detected via exec status={status}, recentering from fill={fill_price} market={price}") self._recenter_and_rebuild_from_fill(fill_price, price) live_orders = self._sync_open_orders_state() live_ids = list(self.state.get("order_ids") or []) open_order_count = len(live_ids) return live_orders, live_ids, open_order_count if status in {"cancelled", "expired", "missing"}: self._log(f"vanished order {order_id} resolved as {status}") continue return live_orders, live_ids, open_order_count def on_tick(self, tick): previous_orders = list(self.state.get("orders") or []) tracked_ids_before_sync = list(self.state.get("order_ids") or []) rebuild_done = False self.state["balance_shape_inconclusive"] = False balance_refresh_ok = self._refresh_balance_snapshot() price = self._price() self.state["last_price"] = price self.state["last_error"] = "" self._refresh_regimes() try: live_orders = self._sync_open_orders_state() live_ids = list(self.state.get("order_ids") or []) open_order_count = len(live_ids) expected_ids = [str(oid) for oid in tracked_ids_before_sync if oid] stale_ids = [] missing_ids = [] except Exception as exc: open_order_count = -1 live_orders = [] live_ids = [] expected_ids = [] stale_ids = [] missing_ids = [] self.state["last_error"] = str(exc) self._log(f"open orders check failed: {exc}") self.state["open_order_count"] = open_order_count if not balance_refresh_ok: self._log("balance refresh unavailable, skipping rebuild checks this tick") self.state["last_action"] = "hold" return {"action": "hold", "price": price, "reason": "balance refresh unavailable"} desired_sides = self._desired_sides() mode = self._mode() if mode != "active": cleanup_pending = False if open_order_count > 0: self._log("observe mode: cancel all open orders") if self._cancel_all_orders_conclusive("observe cancel failed"): self.state["orders"] = [] self.state["order_ids"] = [] self.state["open_order_count"] = 0 else: cleanup_pending = True if not self.state.get("seeded") or not self.state.get("center_price"): self.state["center_price"] = price self.state["seeded"] = True self.state["last_action"] = "observe cleanup pending" if cleanup_pending else "observe monitor" self._log(f"observe at {price} dev 0.0000") return {"action": "observe", "price": price, "deviation": 0.0, "cleanup_pending": cleanup_pending} center = float(self.state.get("center_price") or price) recenter_pct = float(self.config.get("recenter_pct", 0.05) or 0.05) deviation = abs(price - center) / center if center else 0.0 if deviation >= recenter_pct: self.state["center_price"] = price self.state["last_action"] = "observe cleanup pending" if cleanup_pending else "observe monitor" self._log(f"observe at {price} dev {deviation:.4f}") return {"action": "observe", "price": price, "deviation": deviation, "cleanup_pending": cleanup_pending} self.state["last_action"] = "observe cleanup pending" if cleanup_pending else "observe monitor" self._log(f"observe at {price} dev {deviation:.4f}") return {"action": "observe", "price": price, "deviation": deviation, "cleanup_pending": cleanup_pending} if stale_ids: self._log(f"stale live orders: {stale_ids}") self._cancel_orders(stale_ids) live_ids = [oid for oid in live_ids if oid not in stale_ids] if missing_ids: self._log(f"missing tracked orders: {missing_ids}") self.state["order_ids"] = live_ids missing_tracked = bool(set(expected_ids) - set(live_ids)) center = self._maybe_refresh_center(float(self.state.get("center_price") or price)) recenter_pct = self._recenter_threshold_pct() deviation = abs(price - center) / center if center else 0.0 if mode == "active" and deviation >= recenter_pct and not self._grid_refresh_paused(): if rebuild_done: return {"action": "hold", "price": price} self._log(f"recenter needed at price={price} center={center} dev={deviation:.4f} threshold={recenter_pct:.4f}") rebuild_done = True self._recenter_and_rebuild_from_price(price, "recenter") live_orders = self._sync_open_orders_state() live_ids = list(self.state.get("order_ids") or []) open_order_count = len(live_ids) self.state["last_action"] = "recentered" return {"action": "recenter", "price": price, "deviation": deviation} live_orders, live_ids, open_order_count = self._reconcile_after_sync(previous_orders, live_orders, desired_sides, price) if self._grid_refresh_paused(): mode = self._mode() self.state["last_action"] = "hold" if mode == "active" else f"{mode} monitor" self._log(f"grid refresh paused, holding at {price} dev {deviation:.4f}") return {"action": "hold" if mode == "active" else "plan", "price": price, "deviation": deviation, "refresh_paused": True} if desired_sides != {"buy", "sell"}: self._log("single-side mode is disabled for this strategy, forcing full-grid rebuilds only") current_buy = sum(1 for order in live_orders if isinstance(order, dict) and str(order.get("side") or "").lower() == "buy") current_sell = sum(1 for order in live_orders if isinstance(order, dict) and str(order.get("side") or "").lower() == "sell") total_base, total_quote = self._inventory_totals_from_live_orders(live_orders) planned_grid = self._plan_grid(center, base_total=total_base, quote_total=total_quote) target_buy = int((planned_grid.get("counts") or {}).get("buy") or 0) target_sell = int((planned_grid.get("counts") or {}).get("sell") or 0) balance_shape_inconclusive = bool(self.state.get("balance_shape_inconclusive")) # Shape means side counts here. Exact ids are handled by the tracked-order path below. grid_not_as_expected = current_buy != target_buy or current_sell != target_sell if balance_shape_inconclusive: self._log("balance info not conclusive, skipping grid shape rebuild checks this tick") elif grid_not_as_expected: if rebuild_done: return {"action": "hold", "price": price} self._log( f"grid shape mismatch, rebuilding full grid: live_buy={current_buy} live_sell={current_sell} target_buy={target_buy} target_sell={target_sell}" ) rebuild_done = True self.state["center_price"] = price self._recenter_and_rebuild_from_price(price, "grid shape rebuild") live_orders = self._sync_open_orders_state() mode = self._mode() self.state["last_action"] = "reseeded" if mode == "active" else f"{mode} monitor" return {"action": "reseed" if mode == "active" else "plan", "price": price} if not balance_shape_inconclusive and self._order_count_mismatch(tracked_ids_before_sync, live_orders): if rebuild_done: return {"action": "hold", "price": price} self._log(f"grid mismatch detected, rebuilding full grid: tracked={len(tracked_ids_before_sync)} live={len(live_orders)}") rebuild_done = True self.state["center_price"] = price self._recenter_and_rebuild_from_price(price, "grid mismatch rebuild") live_orders = self._sync_open_orders_state() mode = self._mode() self.state["last_action"] = "reseeded" if mode == "active" else f"{mode} monitor" return {"action": "reseed" if mode == "active" else "plan", "price": price} if (not self.state.get("seeded") or not self.state.get("center_price")) and not self._grid_refresh_paused(): self.state["center_price"] = price self._place_grid(price) live_orders = self._sync_open_orders_state() self.state["seeded"] = True self._set_grid_refresh_pause() mode = self._mode() self._log(f"{'seeded' if mode == 'active' else 'planned'} grid at {price}") return {"action": "seed" if mode == "active" else "plan", "price": price} if not balance_shape_inconclusive and ((open_order_count == 0) or missing_tracked): if rebuild_done: return {"action": "hold", "price": price} self._log("missing tracked order(s), rebuilding full grid") rebuild_done = True self.state["center_price"] = price self._recenter_and_rebuild_from_price(price, "missing order rebuild") live_orders = self._sync_open_orders_state() mode = self._mode() self.state["last_action"] = "reseeded" if mode == "active" else f"{mode} monitor" return {"action": "reseed" if mode == "active" else "plan", "price": price} mode = self._mode() self.state["last_action"] = "hold" if mode == "active" else f"{mode} monitor" self._log(f"hold at {price} dev {deviation:.4f}") return {"action": "hold" if mode == "active" else "plan", "price": price, "deviation": deviation} def report(self): snapshot = self.context.get_strategy_snapshot() if hasattr(self.context, "get_strategy_snapshot") else {} supervision = self._supervision() warnings = [w for w in [self._config_warning(), *(supervision.get("concerns") or [])] if w] return { "identity": snapshot.get("identity", {}), "control": snapshot.get("control", {}), "fit": dict(getattr(self, "STRATEGY_PROFILE", {}) or {}), "position": { "balances": { "base_available": self.state.get("base_available", 0.0), "counter_available": self.state.get("counter_available", 0.0), }, "open_orders": self.state.get("orders") or [], "exposure": "grid", }, "state": { "center_price": self.state.get("center_price", 0.0), "last_price": self.state.get("last_price", 0.0), "last_action": self.state.get("last_action", "idle"), "open_order_count": self.state.get("open_order_count", 0), "grid_step_pct": self.state.get("grid_step_pct", 0.0), "grid_step_pct_buy": self.state.get("grid_step_pct_buy", 0.0), "grid_step_pct_sell": self.state.get("grid_step_pct_sell", 0.0), "inventory_skew_side": self.state.get("inventory_skew_side", "none"), "inventory_skew_ratio": self.state.get("inventory_skew_ratio", 0.5), "inventory_skew_reduction_pct": self.state.get("inventory_skew_reduction_pct", 0.0), "regimes_updated_at": self.state.get("regimes_updated_at", ""), }, "assessment": { "confidence": None, "uncertainty": None, "reason": "structure-based grid management", "warnings": warnings, "policy": dict(self.config.get("policy") or {}), }, "execution": snapshot.get("execution", {}), "supervision": supervision, } def render(self): # Refresh the market-derived display values on render so the dashboard # reflects the same inputs the strategy would use on the next tick. live_step_pct = float(self.state.get("grid_step_pct") or 0.0) live_buy_step_pct = float(self.state.get("grid_step_pct_buy") or 0.0) live_sell_step_pct = float(self.state.get("grid_step_pct_sell") or 0.0) live_atr_pct = float(self.state.get("atr_percent") or 0.0) try: self._refresh_balance_snapshot() live_step_pct = self._grid_step_pct() step_profile = self._effective_grid_steps(float(self.state.get("last_price") or self.state.get("center_price") or 0.0)) live_buy_step_pct = float(step_profile.get("buy") or live_step_pct) live_sell_step_pct = float(step_profile.get("sell") or live_step_pct) live_atr_pct = float(self.state.get("atr_percent") or live_atr_pct) except Exception as exc: self._log(f"render refresh failed: {exc}") return { "widgets": [ {"type": "metric", "label": "market", "value": self._market_symbol()}, {"type": "metric", "label": "center", "value": round(float(self.state.get("center_price") or 0.0), 6)}, {"type": "metric", "label": "last price", "value": round(float(self.state.get("last_price") or 0.0), 6)}, {"type": "metric", "label": "state", "value": self.state.get("last_action", "idle")}, {"type": "metric", "label": "orders", "value": len(self.state.get("orders") or [])}, {"type": "metric", "label": "open orders", "value": self.state.get("open_order_count", 0)}, {"type": "metric", "label": f"ATR({self.config.get('volatility_timeframe', '1h')}) %", "value": round(live_atr_pct, 4)}, {"type": "metric", "label": "grid step %", "value": round(live_step_pct * 100.0, 4)}, {"type": "metric", "label": "buy step %", "value": round(live_buy_step_pct * 100.0, 4)}, {"type": "metric", "label": "sell step %", "value": round(live_sell_step_pct * 100.0, 4)}, {"type": "metric", "label": "rebalance bias", "value": self.state.get("inventory_skew_side", "none")}, {"type": "metric", "label": "1d", "value": ((self.state.get('regimes') or {}).get('1d') or {}).get('trend', {}).get('state', 'n/a')}, {"type": "metric", "label": "4h", "value": ((self.state.get('regimes') or {}).get('4h') or {}).get('trend', {}).get('state', 'n/a')}, {"type": "metric", "label": "1h", "value": ((self.state.get('regimes') or {}).get('1h') or {}).get('trend', {}).get('state', 'n/a')}, {"type": "metric", "label": "15m", "value": ((self.state.get('regimes') or {}).get('15m') or {}).get('trend', {}).get('state', 'n/a')}, {"type": "metric", "label": f"{self._base_symbol()} avail", "value": round(float(self.state.get("base_available") or 0.0), 8)}, {"type": "metric", "label": f"{self.context.counter_currency or 'USD'} avail", "value": round(float(self.state.get("counter_available") or 0.0), 8)}, *([ {"type": "text", "label": "config warning", "value": warning}, ] if (warning := self._config_warning()) else []), {"type": "text", "label": "error", "value": self.state.get("last_error", "") or "none"}, {"type": "log", "label": "debug log", "lines": self.state.get("debug_log") or []}, ] }