from __future__ import annotations from dataclasses import dataclass, field from typing import Any from .exec_client import list_open_orders, query_order, cancel_all_orders, cancel_order, place_order, get_account_info, get_account_fees from .news_client import call_news_tool from .crypto_client import call_crypto_tool def _order_ref(payload: Any) -> str | None: if not isinstance(payload, dict): return None for key in ("order_id", "bitstamp_order_id", "id", "client_order_id"): value = payload.get(key) if value is None: continue text = str(value).strip() if text: return text return None def _cancelled_order_ids(cancel_result: Any) -> list[str]: if not isinstance(cancel_result, dict): return [] rows = cancel_result.get("cancelled") if not isinstance(rows, list): return [] cancelled: list[str] = [] for row in rows: if not isinstance(row, dict) or not bool(row.get("ok")): continue order_id = _order_ref(row) if order_id is not None: cancelled.append(order_id) return cancelled def _has_inconclusive_cancel_rows(cancel_result: Any) -> bool: if not isinstance(cancel_result, dict): return False rows = cancel_result.get("cancelled") if not isinstance(rows, list): return False return any(isinstance(row, dict) and not bool(row.get("ok")) for row in rows) @dataclass(frozen=True) class StrategyContext: id: str account_id: str client_id: str | None = field(default=None, repr=False) mode: str = "off" market_symbol: str | None = None base_currency: str | None = None counter_currency: str | None = None minimum_order_value: float | None = None def __getattr__(self, name: str): if name == "mode": return "active" raise AttributeError(name) def get_open_orders(self) -> Any: payload = list_open_orders(self.account_id, self.client_id) if isinstance(payload, dict) and isinstance(payload.get("orders"), list): return payload["orders"] return payload def query_order(self, order_id: str) -> Any: return query_order(self.account_id, order_id) def cancel_all_orders(self) -> Any: return cancel_all_orders(self.account_id, self.client_id) def cancel_all_orders_confirmed(self) -> dict[str, Any]: cancel_result = None cancel_error = None verification_error = None remaining_orders = None try: cancel_result = self.cancel_all_orders() except Exception as exc: cancel_error = str(exc) try: payload = self.get_open_orders() if isinstance(payload, list): remaining_orders = payload else: verification_error = f"unexpected open orders payload type: {type(payload).__name__}" except Exception as exc: verification_error = str(exc) cancelled_order_ids = _cancelled_order_ids(cancel_result) inconclusive_cancel = cancel_error is not None or _has_inconclusive_cancel_rows(cancel_result) conclusive = ( not inconclusive_cancel and remaining_orders is not None and len(remaining_orders) == 0 ) cleanup_status = "cleanup_confirmed" if conclusive else ("cleanup_failed" if cancel_error else "cleanup_partial") error = None if not conclusive: if cancel_error: error = cancel_error elif _has_inconclusive_cancel_rows(cancel_result): error = "cancel-all reported uncancelled orders" elif verification_error: error = verification_error elif remaining_orders is not None: error = f"{len(remaining_orders)} open orders remain after cancel-all" else: error = "open order verification unavailable after cancel-all" return { "ok": cancel_error is None and verification_error is None and conclusive, "conclusive": conclusive, "cleanup_status": cleanup_status, "cancelled_order_ids": cancelled_order_ids, "remaining_orders": remaining_orders, "cancel_result": cancel_result, "cancel_error": cancel_error, "verification_error": verification_error, "error": error, } def cancel_order(self, order_id: str) -> Any: return cancel_order(self.account_id, order_id) def place_order(self, **kwargs: Any) -> Any: mode = getattr(self, "mode", "active") if mode != "active": raise RuntimeError(f"place_order not allowed in {mode} mode") kwargs.setdefault("account_id", self.account_id) kwargs.setdefault("client_id", self.client_id) return place_order(kwargs) def get_price(self, symbol: str) -> Any: return call_crypto_tool("get_price", {"symbol": symbol}) def get_regime(self, symbol: str, timeframe: str = "1h") -> Any: return call_crypto_tool("get_regime", {"symbol": symbol, "timeframe": timeframe}) def get_account_info(self) -> Any: return get_account_info(self.account_id) def get_balance_snapshot(self) -> dict[str, Any]: info = self.get_account_info() balances = info.get("balances") if isinstance(info, dict) else [] if not isinstance(balances, list): balances = [] return { "account_id": self.account_id, "market_symbol": self.market_symbol, "base_currency": self.base_currency, "counter_currency": self.counter_currency, "balances": balances, } def get_open_order_snapshot(self) -> dict[str, Any]: orders = self.get_open_orders() return { "account_id": self.account_id, "market_symbol": self.market_symbol, "open_orders": orders if isinstance(orders, list) else [], } def get_strategy_snapshot(self) -> dict[str, Any]: return { "identity": { "strategy_id": self.id, "account_id": self.account_id, "market": self.market_symbol, "base_currency": self.base_currency, "quote_currency": self.counter_currency, }, "control": { "mode": getattr(self, "mode", "off"), }, "position": self.get_balance_snapshot(), "orders": self.get_open_order_snapshot(), } def _available_balance(self, asset_code: str) -> float: try: info = self.get_account_info() except Exception: return 0.0 balances = info.get("balances") if isinstance(info, dict) else [] if not isinstance(balances, list): return 0.0 wanted = str(asset_code or "").upper() for balance in balances: if not isinstance(balance, dict): continue if str(balance.get("asset_code") or "").upper() != wanted: continue try: return float(balance.get("available") if balance.get("available") is not None else balance.get("total") or 0.0) except Exception: return 0.0 return 0.0 def get_account_fees(self, market_symbol: str | None = None) -> Any: return get_account_fees(self.account_id, market_symbol) def get_fee_rates(self, market_symbol: str | None = None) -> dict[str, float]: payload = get_account_fees(self.account_id, market_symbol) if not isinstance(payload, dict): return {"maker": 0.0, "taker": 0.0} fees = payload.get("fees") if isinstance(payload.get("fees"), dict) else {} def _normalize_fee(value: object) -> float: try: rate = float(value or 0.0) except Exception: return 0.0 if rate > 0.1: rate /= 100.0 return rate try: maker = _normalize_fee(fees.get("maker")) except Exception: maker = 0.0 try: taker = _normalize_fee(fees.get("taker")) except Exception: taker = 0.0 return {"maker": maker, "taker": taker} def suggest_order_amount( self, *, side: str, price: float, levels: int, min_notional: float, fee_rate: float, quote_notional: float = 0.0, max_notional_per_order: float = 0.0, dust_collect: bool = False, order_size: float = 0.0, safety: float = 0.995, available_balances: dict[str, float] | None = None, ) -> float: """Return a conservative per-order amount for this venue/account. The returned amount is exchange-aware but strategy-agnostic, so other strategies can reuse the same sizing rules. `quote_notional` is the canonical quote-currency cap when a strategy wants quote-standard sizing. """ if levels <= 0 or price <= 0: return 0.0 side = str(side or "").strip().lower() fee_rate = max(float(fee_rate or 0.0), 0.0) quote_notional = float(quote_notional or 0.0) max_notional_per_order = float(max_notional_per_order or 0.0) order_size = float(order_size or 0.0) balance_overrides = { str(asset_code or "").upper(): max(float(amount or 0.0), 0.0) for asset_code, amount in dict(available_balances or {}).items() } min_amount = (min_notional / price) if min_notional > 0 else 0.0 if side == "buy": quote = self.counter_currency or "USD" quote_available = balance_overrides.get(str(quote).upper()) if quote_available is None: quote_available = self._available_balance(quote) if hasattr(self, "_available_balance") else 0.0 spendable_quote = quote_available * safety quote_cap = spendable_quote if max_notional_per_order <= 0 else min(spendable_quote, max_notional_per_order) if quote_notional > 0: quote_cap = min(quote_cap, quote_notional) if dust_collect and max_notional_per_order > 0: leftover_quote = max(spendable_quote - max_notional_per_order, 0.0) if 0.0 < leftover_quote < min_notional: quote_cap = spendable_quote if quote_cap <= 0: return 0.0 # `max_notional_per_order` is already a per-order cap in quote # currency, so do not dilute it by the number of grid levels. per_order_quote = quote_cap min_quote_needed = min_notional * (1 + fee_rate) if per_order_quote < min_quote_needed: return 0.0 amount = per_order_quote / (price * (1 + fee_rate)) else: base = self.base_currency or (self.market_symbol or "XRP") base_available = balance_overrides.get(str(base).upper()) if base_available is None: base_available = self._available_balance(base) if hasattr(self, "_available_balance") else 0.0 spendable_base = base_available * safety if quote_notional > 0 and price > 0: spendable_base = min(spendable_base, quote_notional / price) if max_notional_per_order > 0 and price > 0: base_cap = max_notional_per_order / price if dust_collect: leftover_base = max(spendable_base - base_cap, 0.0) if 0.0 < leftover_base * price < min_notional: spendable_base = spendable_base else: spendable_base = min(spendable_base, base_cap) else: spendable_base = min(spendable_base, base_cap) if spendable_base <= 0: return 0.0 # Same rule as above, the cap is per order, not per grid level. amount = spendable_base if amount < min_amount: return 0.0 if order_size > 0: if order_size < min_amount: return 0.0 amount = min(amount, order_size) return max(amount, 0.0) def get_news(self, **kwargs: Any) -> Any: return call_news_tool("search", kwargs)