Explorar o código

Improve grid fill handling and heartbeat logs

Lukas Goldschmidt hai 1 mes
pai
achega
a14865c3d8
Modificáronse 2 ficheiros con 124 adicións e 21 borrados
  1. 123 21
      strategies/grid_trader.py
  2. 1 0
      strategies/stop_loss_trader.py

+ 123 - 21
strategies/grid_trader.py

@@ -9,12 +9,6 @@ from src.trader_mcp.strategy_sdk import Strategy
 class Strategy(Strategy):
     LABEL = "Grid Trader"
     TICK_MINUTES = 0.2
-    # NOTE:
-    # This strategy is currently using a protective workaround for stale order state,
-    # because exec-mcp can temporarily report order records that do not reflect the
-    # clean post-reset strategy state. The grid prefers its own fresh persisted state
-    # first, so the real exchange behavior stays testable while exec-mcp is improved.
-    # Expect the reconciliation behavior to change again once exec-mcp is fixed.
     CONFIG_SCHEMA = {
         "grid_levels": {"type": "int", "default": 6, "min": 1, "max": 20},
         "grid_step_pct": {"type": "float", "default": 0.012, "min": 0.001, "max": 0.1},
@@ -48,6 +42,7 @@ class Strategy(Strategy):
         "trend_guard_active": {"type": "bool", "default": False},
         "regimes_updated_at": {"type": "string", "default": ""},
         "account_snapshot_updated_at": {"type": "string", "default": ""},
+        "grid_refresh_pending_until": {"type": "string", "default": ""},
     }
 
     def init(self):
@@ -65,6 +60,7 @@ class Strategy(Strategy):
             "trend_guard_active": False,
             "regimes_updated_at": "",
             "account_snapshot_updated_at": "",
+            "grid_refresh_pending_until": "",
         }
 
     def _log(self, message: str) -> None:
@@ -74,6 +70,16 @@ class Strategy(Strategy):
         state["debug_log"] = log[-12:]
         self.state = state
 
+    def _set_grid_refresh_pause(self, seconds: float = 30.0) -> None:
+        self.state["grid_refresh_pending_until"] = (datetime.now(timezone.utc).timestamp() + max(seconds, 0.0))
+
+    def _grid_refresh_paused(self) -> bool:
+        try:
+            until = float(self.state.get("grid_refresh_pending_until") or 0.0)
+        except Exception:
+            until = 0.0
+        return until > datetime.now(timezone.utc).timestamp()
+
     def _base_symbol(self) -> str:
         return (self.context.base_currency or self.context.market_symbol or "XRP").split("/")[0].upper()
 
@@ -326,6 +332,7 @@ class Strategy(Strategy):
         self.state["orders"] = orders
         self.state["order_ids"] = order_ids
         self.state["last_action"] = "seeded grid"
+        self._set_grid_refresh_pause()
 
     def _place_side_grid(self, side: str, center: float, *, start_level: int = 1) -> None:
         levels = int(self.config.get("grid_levels", 6) or 6)
@@ -394,6 +401,7 @@ class Strategy(Strategy):
         self.state["orders"] = orders
         self.state["order_ids"] = order_ids
         self._log(f"side {side} placement complete: tracked_ids={order_ids}")
+        self._set_grid_refresh_pause()
 
     def _top_up_missing_levels(self, center: float, live_orders: list[dict]) -> None:
         target_levels = int(self.config.get("grid_levels", 6) or 6)
@@ -428,6 +436,87 @@ class Strategy(Strategy):
                 self._log(f"cancel obsolete {side} order {order_id} failed: {exc}")
         return removed
 
+    def _cancel_surplus_side_orders(self, open_orders: list[dict], target_levels: int) -> list[str]:
+        removed: list[str] = []
+        if target_levels <= 0:
+            return removed
+        for side in ("buy", "sell"):
+            side_orders = [order for order in open_orders if isinstance(order, dict) and str(order.get("side") or "").lower() == side]
+            if len(side_orders) <= target_levels:
+                continue
+            surplus = side_orders[target_levels:]
+            for order in surplus:
+                order_id = str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "")
+                if not order_id:
+                    continue
+                try:
+                    self.context.cancel_order(order_id)
+                    removed.append(order_id)
+                    self._log(f"cancelled surplus {side} order {order_id}")
+                except Exception as exc:
+                    self.state["last_error"] = str(exc)
+                    self._log(f"cancel surplus {side} order {order_id} failed: {exc}")
+        return removed
+
+    def _cancel_duplicate_level_orders(self, open_orders: list[dict]) -> list[str]:
+        removed: list[str] = []
+        seen: set[tuple[str, str]] = set()
+        for order in open_orders:
+            if not isinstance(order, dict):
+                continue
+            side = str(order.get("side") or "").lower()
+            try:
+                price_key = f"{float(order.get('price') or 0.0):.8f}"
+            except Exception:
+                price_key = str(order.get("price") or "")
+            key = (side, price_key)
+            order_id = str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "")
+            if not order_id:
+                continue
+            if key in seen:
+                try:
+                    self.context.cancel_order(order_id)
+                    removed.append(order_id)
+                    self._log(f"cancelled duplicate {side} level order {order_id} price={price_key}")
+                except Exception as exc:
+                    self.state["last_error"] = str(exc)
+                    self._log(f"cancel duplicate {side} order {order_id} failed: {exc}")
+                continue
+            seen.add(key)
+        return removed
+
+    def _place_replacement_orders(self, vanished_orders: list[dict], price_hint: float) -> list[str]:
+        placed: list[str] = []
+        if not vanished_orders:
+            return placed
+        market = self._market_symbol()
+        for order in vanished_orders:
+            if not isinstance(order, dict):
+                continue
+            side = str(order.get("side") or "").lower()
+            opposite = "sell" if side == "buy" else "buy" if side == "sell" else ""
+            if not opposite:
+                continue
+            try:
+                amount = float(order.get("amount") or 0.0)
+                price = float(order.get("price") or price_hint or 0.0)
+            except Exception:
+                continue
+            if amount <= 0 or price <= 0:
+                continue
+            try:
+                self._log(f"replace filled {side} order with {opposite}: price={price} amount={amount:.6g}")
+                result = self.context.place_order(side=opposite, order_type="limit", amount=amount, price=price, market=market)
+                order_id = None
+                if isinstance(result, dict):
+                    order_id = result.get("bitstamp_order_id") or result.get("order_id") or result.get("id") or result.get("client_order_id")
+                if order_id is not None:
+                    placed.append(str(order_id))
+            except Exception as exc:
+                self.state["last_error"] = str(exc)
+                self._log(f"replacement order failed for {side}→{opposite} at {price}: {exc}")
+        return placed
+
     def _sync_open_orders_state(self) -> list[dict]:
         try:
             open_orders = self.context.get_open_orders()
@@ -455,6 +544,7 @@ class Strategy(Strategy):
             self._log(f"dropping stale order {order_id} from state")
 
     def on_tick(self, tick):
+        previous_orders = list(self.state.get("orders") or [])
         self._refresh_balance_snapshot()
         price = self._price()
         self.state["last_price"] = price
@@ -478,16 +568,6 @@ class Strategy(Strategy):
             self.state["last_error"] = str(exc)
             self._log(f"open orders check failed: {exc}")
 
-        # Workaround: after a reset, trust the fresh strategy state first.
-        # This prevents stale exec-mcp records from blocking the next clean test.
-        if not (self.state.get("order_ids") or []):
-            live_orders = []
-            live_ids = []
-            open_order_count = 0
-            expected_ids = []
-            stale_ids = []
-            missing_ids = []
-
         self.state["open_order_count"] = open_order_count
         desired_sides = self._desired_sides()
 
@@ -541,6 +621,28 @@ class Strategy(Strategy):
             live_ids = list(self.state.get("order_ids") or [])
             open_order_count = len(live_ids)
 
+        previous_ids = {str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") for order in previous_orders if isinstance(order, dict)}
+        current_ids = {str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") for order in live_orders if isinstance(order, dict)}
+        vanished_orders = [order for order in previous_orders if isinstance(order, dict) and str(order.get("bitstamp_order_id") or order.get("order_id") or order.get("id") or order.get("client_order_id") or "") in (previous_ids - current_ids)]
+        if vanished_orders and self._mode() == "active" and not self._grid_refresh_paused():
+            replaced_ids = self._place_replacement_orders(vanished_orders, price)
+            if replaced_ids:
+                live_orders = self._sync_open_orders_state()
+                live_ids = list(self.state.get("order_ids") or [])
+                open_order_count = len(live_ids)
+
+        surplus_cancelled = self._cancel_surplus_side_orders(live_orders, int(self.config.get("grid_levels", 6) or 6))
+        if surplus_cancelled:
+            live_orders = self._sync_open_orders_state()
+            live_ids = list(self.state.get("order_ids") or [])
+            open_order_count = len(live_ids)
+
+        duplicate_cancelled = self._cancel_duplicate_level_orders(live_orders)
+        if duplicate_cancelled:
+            live_orders = self._sync_open_orders_state()
+            live_ids = list(self.state.get("order_ids") or [])
+            open_order_count = len(live_ids)
+
         if desired_sides != {"buy", "sell"}:
             current_sides = {str(order.get("side") or "").lower() for order in live_orders if isinstance(order, dict)}
             missing_side = next((side for side in desired_sides if side not in current_sides), None)
@@ -556,21 +658,21 @@ class Strategy(Strategy):
             current_sides = {str(order.get("side") or "").lower() for order in live_orders if isinstance(order, dict)}
             missing_sides = [side for side in ("buy", "sell") if side not in current_sides]
             reconciled_sides: list[str] = []
-            if missing_sides and self.state.get("center_price"):
+            if missing_sides and self.state.get("center_price") and not self._grid_refresh_paused():
                 for side in missing_sides:
                     self._log(f"adding missing {side} side after trade_sides change, live_sides={sorted(current_sides)} live_ids={live_ids}")
                     self._place_side_grid(side, float(self.state.get("center_price") or price))
                     reconciled_sides.append(side)
                 live_orders = self._sync_open_orders_state()
                 self._log(f"post-add sync: open_order_count={self.state.get('open_order_count', 0)} live_ids={self.state.get('order_ids') or []}")
-            if live_orders and self.state.get("center_price"):
+            if live_orders and self.state.get("center_price") and not self._grid_refresh_paused():
                 self._top_up_missing_levels(float(self.state.get("center_price") or price), live_orders)
                 live_orders = self._sync_open_orders_state()
                 if reconciled_sides:
                     self.state["last_action"] = f"reconciled {','.join(reconciled_sides)}"
                     return {"action": "reconcile", "price": price, "side": ",".join(reconciled_sides)}
 
-        if not self.state.get("seeded") or not self.state.get("center_price"):
+        if (not self.state.get("seeded") or not self.state.get("center_price")) and not self._grid_refresh_paused():
             self.state["center_price"] = price
             self._place_grid(price)
             live_orders = self._sync_open_orders_state()
@@ -579,7 +681,7 @@ class Strategy(Strategy):
             self._log(f"{'seeded' if mode == 'active' else 'planned'} grid at {price}")
             return {"action": "seed" if mode == "active" else "plan", "price": price}
 
-        if open_order_count == 0 or (expected_ids and not set(expected_ids).intersection(set(live_ids))):
+        if (open_order_count == 0 or (expected_ids and not set(expected_ids).intersection(set(live_ids)))) and not self._grid_refresh_paused():
             self._log("no open orders, reseeding grid")
             self.state["center_price"] = price
             self._place_grid(price)
@@ -592,7 +694,7 @@ class Strategy(Strategy):
         recenter_pct = float(self.config.get("recenter_pct", 0.05) or 0.05)
         deviation = abs(price - center) / center if center else 0.0
 
-        if deviation >= recenter_pct:
+        if deviation >= recenter_pct and not self._grid_refresh_paused():
             try:
                 self.context.cancel_all_orders()
             except Exception as exc:

+ 1 - 0
strategies/stop_loss_trader.py

@@ -161,6 +161,7 @@ class Strategy(Strategy):
 
     def on_tick(self, tick):
         self.state["last_error"] = ""
+        self._log(f"tick alive price={self.state.get('last_price') or 0.0}")
         self._refresh_balance_snapshot()
         self._refresh_regimes()
         price = self._price()