| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- from __future__ import annotations
- from dataclasses import dataclass, field
- from typing import Any
- from .exec_client import list_open_orders, query_order, cancel_all_orders, cancel_order, place_order, get_account_info, get_account_fees
- from .news_client import call_news_tool
- from .crypto_client import call_crypto_tool
- def _order_ref(payload: Any) -> str | None:
- if not isinstance(payload, dict):
- return None
- for key in ("order_id", "bitstamp_order_id", "id", "client_order_id"):
- value = payload.get(key)
- if value is None:
- continue
- text = str(value).strip()
- if text:
- return text
- return None
- def _cancelled_order_ids(cancel_result: Any) -> list[str]:
- if not isinstance(cancel_result, dict):
- return []
- rows = cancel_result.get("cancelled")
- if not isinstance(rows, list):
- return []
- cancelled: list[str] = []
- for row in rows:
- if not isinstance(row, dict) or not bool(row.get("ok")):
- continue
- order_id = _order_ref(row)
- if order_id is not None:
- cancelled.append(order_id)
- return cancelled
- def _has_inconclusive_cancel_rows(cancel_result: Any) -> bool:
- if not isinstance(cancel_result, dict):
- return False
- rows = cancel_result.get("cancelled")
- if not isinstance(rows, list):
- return False
- return any(isinstance(row, dict) and not bool(row.get("ok")) for row in rows)
@dataclass(frozen=True)
class StrategyContext:
    """Immutable handle a strategy uses to reach its account, market data and news.

    The context stores the strategy/account identity plus market
    configuration, and forwards calls to the execution, crypto and news
    client helpers imported by this module.
    """

    id: str
    account_id: str
    # Excluded from the generated repr().
    client_id: str | None = field(default=None, repr=False)
    # place_order() is only permitted when this is "active".
    mode: str = "off"
    market_symbol: str | None = None
    base_currency: str | None = None
    counter_currency: str | None = None
    minimum_order_value: float | None = None

    def __getattr__(self, name: str):
        """Fallback attribute lookup: report ``mode`` as ``"active"`` when unset.

        Python only calls ``__getattr__`` after normal lookup fails.

        NOTE(review): the dataclass default keeps ``mode`` available as a class
        attribute, so this fallback looks unreachable for regular instances.
        If it ever fired it would fail open to "active" -- confirm whether it
        is still needed.

        Raises:
            AttributeError: for every name other than ``mode``.
        """
        if name == "mode":
            return "active"
        raise AttributeError(name)

    def get_open_orders(self) -> Any:
        """Return the open orders for this account.

        When the client payload is a dict whose ``"orders"`` value is a list,
        that list is returned; any other payload is passed through unchanged.
        """
        payload = list_open_orders(self.account_id, self.client_id)
        if isinstance(payload, dict) and isinstance(payload.get("orders"), list):
            return payload["orders"]
        return payload

    def query_order(self, order_id: str) -> Any:
        """Look up a single order on this account via the execution client."""
        return query_order(self.account_id, order_id)

    def cancel_all_orders(self) -> Any:
        """Request cancellation of every open order and return the raw result."""
        return cancel_all_orders(self.account_id, self.client_id)

    def cancel_all_orders_confirmed(self) -> dict[str, Any]:
        """Cancel all orders, then re-read open orders to verify the cleanup.

        Neither step is allowed to raise: failures are captured as text so the
        caller always receives a report.

        Returns:
            A dict with the keys ``ok``, ``conclusive``, ``cleanup_status``
            (``"cleanup_confirmed"``, ``"cleanup_failed"`` or
            ``"cleanup_partial"``), ``cancelled_order_ids``,
            ``remaining_orders``, ``cancel_result``, ``cancel_error``,
            ``verification_error`` and ``error``.
        """
        cancel_result = None
        cancel_error = None
        verification_error = None
        remaining_orders = None

        try:
            cancel_result = self.cancel_all_orders()
        except Exception as exc:
            # Some exceptions stringify to "" (e.g. a bare TimeoutError());
            # fall back to the type name so the failure stays visible.
            cancel_error = str(exc) or type(exc).__name__

        # Verification runs even when the cancel call failed, so the report
        # still shows what is left on the book.
        try:
            payload = self.get_open_orders()
        except Exception as exc:
            verification_error = str(exc) or type(exc).__name__
        else:
            if isinstance(payload, list):
                remaining_orders = payload
            else:
                verification_error = f"unexpected open orders payload type: {type(payload).__name__}"

        cancelled_order_ids = _cancelled_order_ids(cancel_result)
        has_uncancelled_rows = _has_inconclusive_cancel_rows(cancel_result)
        inconclusive_cancel = cancel_error is not None or has_uncancelled_rows

        # Conclusive only when the cancel was clean AND the book was verified empty.
        conclusive = (
            not inconclusive_cancel
            and remaining_orders is not None
            and len(remaining_orders) == 0
        )

        if conclusive:
            cleanup_status = "cleanup_confirmed"
        elif cancel_error is not None:
            cleanup_status = "cleanup_failed"
        else:
            cleanup_status = "cleanup_partial"

        # Report the most significant problem first.
        error = None
        if not conclusive:
            if cancel_error is not None:
                error = cancel_error
            elif has_uncancelled_rows:
                error = "cancel-all reported uncancelled orders"
            elif verification_error is not None:
                error = verification_error
            elif remaining_orders is not None:
                error = f"{len(remaining_orders)} open orders remain after cancel-all"
            else:
                error = "open order verification unavailable after cancel-all"

        return {
            "ok": cancel_error is None and verification_error is None and conclusive,
            "conclusive": conclusive,
            "cleanup_status": cleanup_status,
            "cancelled_order_ids": cancelled_order_ids,
            "remaining_orders": remaining_orders,
            "cancel_result": cancel_result,
            "cancel_error": cancel_error,
            "verification_error": verification_error,
            "error": error,
        }

    def cancel_order(self, order_id: str) -> Any:
        """Cancel a single order on this account via the execution client."""
        return cancel_order(self.account_id, order_id)

    def place_order(self, **kwargs: Any) -> Any:
        """Submit an order, filling in ``account_id`` and ``client_id`` defaults.

        Raises:
            RuntimeError: when the context mode is anything but ``"active"``.
        """
        # NOTE(review): the getattr default means an object without ``mode``
        # is treated as active; kept for backward compatibility.
        mode = getattr(self, "mode", "active")
        if mode != "active":
            raise RuntimeError(f"place_order not allowed in {mode} mode")
        kwargs.setdefault("account_id", self.account_id)
        kwargs.setdefault("client_id", self.client_id)
        return place_order(kwargs)

    def get_price(self, symbol: str) -> Any:
        """Call the crypto tool ``get_price`` for *symbol*."""
        return call_crypto_tool("get_price", {"symbol": symbol})

    def get_regime(self, symbol: str, timeframe: str = "1h") -> Any:
        """Call the crypto tool ``get_regime`` for *symbol* and *timeframe*."""
        return call_crypto_tool("get_regime", {"symbol": symbol, "timeframe": timeframe})

    def get_account_info(self) -> Any:
        """Return the raw account info payload from the execution client."""
        return get_account_info(self.account_id)

    def get_balance_snapshot(self) -> dict[str, Any]:
        """Return account identity, market configuration and the balances list.

        ``balances`` falls back to an empty list whenever the account info is
        not a dict or its ``"balances"`` entry is not a list.
        """
        info = self.get_account_info()
        balances = info.get("balances") if isinstance(info, dict) else []
        if not isinstance(balances, list):
            balances = []
        return {
            "account_id": self.account_id,
            "market_symbol": self.market_symbol,
            "base_currency": self.base_currency,
            "counter_currency": self.counter_currency,
            "balances": balances,
        }

    def get_open_order_snapshot(self) -> dict[str, Any]:
        """Return the account id, market symbol and the open orders list.

        ``open_orders`` is an empty list when the open-orders payload is not a
        list.
        """
        orders = self.get_open_orders()
        return {
            "account_id": self.account_id,
            "market_symbol": self.market_symbol,
            "open_orders": orders if isinstance(orders, list) else [],
        }

    def get_strategy_snapshot(self) -> dict[str, Any]:
        """Return identity, control mode, balances and open orders in one dict."""
        return {
            "identity": {
                "strategy_id": self.id,
                "account_id": self.account_id,
                "market": self.market_symbol,
                "base_currency": self.base_currency,
                "quote_currency": self.counter_currency,
            },
            "control": {
                "mode": getattr(self, "mode", "off"),
            },
            "position": self.get_balance_snapshot(),
            "orders": self.get_open_order_snapshot(),
        }

    def _available_balance(self, asset_code: str) -> float:
        """Return the available balance of *asset_code*, or ``0.0`` if unknown.

        Asset codes are compared case-insensitively and only the first
        matching balance row is used.  Its ``"available"`` value is preferred,
        with ``"total"`` as the fallback when ``"available"`` is ``None``.
        Lookup or conversion failures deliberately resolve to ``0.0`` so that
        sizing stays conservative.
        """
        try:
            info = self.get_account_info()
        except Exception:
            return 0.0
        balances = info.get("balances") if isinstance(info, dict) else []
        if not isinstance(balances, list):
            return 0.0

        wanted = str(asset_code or "").upper()
        for balance in balances:
            if not isinstance(balance, dict):
                continue
            if str(balance.get("asset_code") or "").upper() != wanted:
                continue
            raw = balance.get("available")
            if raw is None:
                raw = balance.get("total") or 0.0
            try:
                return float(raw)
            except Exception:
                return 0.0
        return 0.0

    def get_account_fees(self, market_symbol: str | None = None) -> Any:
        """Return the raw fee payload for this account from the execution client."""
        return get_account_fees(self.account_id, market_symbol)

    def get_fee_rates(self, market_symbol: str | None = None) -> dict[str, float]:
        """Return ``{"maker": ..., "taker": ...}`` fee rates as fractions.

        Missing or unparsable values become ``0.0``.  Values above ``0.1`` are
        divided by 100 (presumably because the venue reports percentages --
        confirm against the fee payload).
        """
        payload = get_account_fees(self.account_id, market_symbol)
        if not isinstance(payload, dict):
            return {"maker": 0.0, "taker": 0.0}
        fees = payload.get("fees") if isinstance(payload.get("fees"), dict) else {}

        def _normalize_fee(value: object) -> float:
            # Conversion failures are swallowed on purpose: a bad fee entry
            # must not break the caller.
            try:
                rate = float(value or 0.0)
            except Exception:
                return 0.0
            if rate > 0.1:
                rate /= 100.0
            return rate

        return {
            "maker": _normalize_fee(fees.get("maker")),
            "taker": _normalize_fee(fees.get("taker")),
        }

    def suggest_order_amount(
        self,
        *,
        side: str,
        price: float,
        levels: int,
        min_notional: float,
        fee_rate: float,
        quote_notional: float = 0.0,
        max_notional_per_order: float = 0.0,
        dust_collect: bool = False,
        order_size: float = 0.0,
        safety: float = 0.995,
        available_balances: dict[str, float] | None = None,
    ) -> float:
        """Return a conservative per-order amount for this venue/account.

        The returned amount is exchange-aware but strategy-agnostic, so other
        strategies can reuse the same sizing rules. `quote_notional` is the
        canonical quote-currency cap when a strategy wants quote-standard
        sizing; it is enforced on both sides, including when `dust_collect`
        widens the order.

        Args:
            side: ``"buy"`` sizes from the quote balance; any other value is
                sized from the base balance.
            price: Order price; non-positive prices yield ``0.0``.
            levels: Number of grid levels; only checked for being positive,
                never used to split the amount.
            min_notional: Minimum order value in quote currency.
            fee_rate: Fee fraction reserved on buys; negatives are clamped to 0.
            quote_notional: Optional quote-currency cap (``<= 0`` disables it).
            max_notional_per_order: Optional per-order quote cap (``<= 0``
                disables it).
            dust_collect: When the balance left after a capped order would be
                positive but below `min_notional`, ignore
                `max_notional_per_order` and spend that remainder too.
            order_size: Optional upper bound on the returned amount; if it is
                positive and below the minimum amount the result is ``0.0``.
            safety: Multiplier applied to the available balance.
            available_balances: Optional balances keyed by asset code
                (case-insensitive) used instead of querying the account;
                negative amounts are clamped to 0.

        Returns:
            The order amount in base currency, or ``0.0`` when no valid order
            can be sized.
        """
        if levels <= 0 or price <= 0:
            return 0.0

        side = str(side or "").strip().lower()
        fee_rate = max(float(fee_rate or 0.0), 0.0)
        quote_notional = float(quote_notional or 0.0)
        max_notional_per_order = float(max_notional_per_order or 0.0)
        order_size = float(order_size or 0.0)
        balance_overrides = {
            str(asset_code or "").upper(): max(float(amount or 0.0), 0.0)
            for asset_code, amount in dict(available_balances or {}).items()
        }
        min_amount = (min_notional / price) if min_notional > 0 else 0.0

        if side == "buy":
            quote = self.counter_currency or "USD"
            quote_available = balance_overrides.get(str(quote).upper())
            if quote_available is None:
                # NOTE(review): the hasattr guard is kept from the original;
                # it only matters for objects that lack the helper.
                quote_available = self._available_balance(quote) if hasattr(self, "_available_balance") else 0.0
            spendable_quote = quote_available * safety

            if max_notional_per_order > 0:
                quote_cap = min(spendable_quote, max_notional_per_order)
            else:
                quote_cap = spendable_quote
            if dust_collect and max_notional_per_order > 0:
                leftover_quote = max(spendable_quote - max_notional_per_order, 0.0)
                if 0.0 < leftover_quote < min_notional:
                    quote_cap = spendable_quote
            # Apply the canonical cap last so dust collection cannot exceed it
            # (the sell branch already behaves this way).
            if quote_notional > 0:
                quote_cap = min(quote_cap, quote_notional)
            if quote_cap <= 0:
                return 0.0

            # `max_notional_per_order` is already a per-order cap in quote
            # currency, so do not dilute it by the number of grid levels.
            per_order_quote = quote_cap
            min_quote_needed = min_notional * (1 + fee_rate)
            if per_order_quote < min_quote_needed:
                return 0.0
            amount = per_order_quote / (price * (1 + fee_rate))
        else:
            base = self.base_currency or (self.market_symbol or "XRP")
            base_available = balance_overrides.get(str(base).upper())
            if base_available is None:
                base_available = self._available_balance(base) if hasattr(self, "_available_balance") else 0.0
            spendable_base = base_available * safety

            if quote_notional > 0 and price > 0:
                spendable_base = min(spendable_base, quote_notional / price)
            if max_notional_per_order > 0 and price > 0:
                base_cap = max_notional_per_order / price
                leftover_base = max(spendable_base - base_cap, 0.0)
                collects_dust = dust_collect and 0.0 < leftover_base * price < min_notional
                # Keep the full spendable amount only when the remainder would be dust.
                if not collects_dust:
                    spendable_base = min(spendable_base, base_cap)
            if spendable_base <= 0:
                return 0.0

            # Same rule as above, the cap is per order, not per grid level.
            amount = spendable_base

        if amount < min_amount:
            return 0.0
        if order_size > 0:
            if order_size < min_amount:
                return 0.0
            amount = min(amount, order_size)
        return max(amount, 0.0)

    def get_news(self, **kwargs: Any) -> Any:
        """Call the news tool ``search`` with the given keyword arguments."""
        return call_news_tool("search", kwargs)
|