| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
from __future__ import annotations

import math
from datetime import datetime, timezone

from src.trader_mcp.logging_utils import log_event
from src.trader_mcp.strategy_sdk import Strategy
from src.trader_mcp.strategy_sizing import cap_amount_to_balance_target, suggest_quote_sized_amount
class Strategy(Strategy):
    """Trend-following strategy that submits one offset limit order at a time.

    Each tick refreshes the cached balances and the current price. Unless the
    strategy is cooling down, has no explicit direction (``trade_side`` is
    ``both``), or has already reached its balance target, it places a single
    limit order through ``self.context`` priced ``entry_offset_pct`` above the
    current price for buys or below it for sells, then starts a cooldown.

    NOTE(review): this class shadows the imported SDK base class name
    ``Strategy``; presumably the host loads strategies by that name — confirm
    before renaming.
    """

    LABEL = "Trend Follower"
    STRATEGY_PROFILE = {
        "expects": {
            "trend": "strong",
            "volatility": "moderate",
            "event_risk": "low",
            "liquidity": "normal",
        },
        "avoids": {
            "trend": "range",
            "volatility": "chaotic",
            "event_risk": "high",
            "liquidity": "thin",
        },
        "risk_profile": "growth",
        "capabilities": ["directional_continuation", "momentum_following", "inventory_accumulation"],
        "role": "primary",
        "inventory_behavior": "accumulative_long",
        "requires_rebalance_before_start": False,
        "requires_rebalance_before_stop": False,
        "safe_when_unbalanced": True,
        "can_run_with": ["exposure_protector"],
    }
    # NOTE(review): presumably the tick interval in minutes, consumed by the host.
    TICK_MINUTES = 0.5
    CONFIG_SCHEMA = {
        "trade_side": {"type": "string", "default": "both"},
        "balance_target": {"type": "float", "default": 1.0, "min": 0.0, "max": 1.0},
        "entry_offset_pct": {"type": "float", "default": 0.003, "min": 0.0, "max": 1.0},
        "exit_offset_pct": {"type": "float", "default": 0.002, "min": 0.0, "max": 1.0},
        "order_notional_quote": {"type": "float", "default": 0.0, "min": 0.0},
        "max_order_notional_quote": {"type": "float", "default": 0.0, "min": 0.0},
        "dust_collect": {"type": "bool", "default": False},
        "cooldown_ticks": {"type": "int", "default": 2, "min": 0, "max": 1000},
        "debug_orders": {"type": "bool", "default": True},
    }
    STATE_SCHEMA = {
        "last_price": {"type": "float", "default": 0.0},
        "last_action": {"type": "string", "default": "idle"},
        "last_error": {"type": "string", "default": ""},
        "debug_log": {"type": "list", "default": []},
        "trade_side": {"type": "string", "default": "both"},
        "cooldown_remaining": {"type": "int", "default": 0},
        "last_order_at": {"type": "float", "default": 0.0},
        "last_order_price": {"type": "float", "default": 0.0},
        "base_available": {"type": "float", "default": 0.0},
        "counter_available": {"type": "float", "default": 0.0},
    }

    # Defaults mirrored from CONFIG_SCHEMA so every reader agrees on them.
    _DEFAULT_ENTRY_OFFSET_PCT = 0.003
    _DEFAULT_EXIT_OFFSET_PCT = 0.002
    _DEFAULT_COOLDOWN_TICKS = 2
    # Number of most recent messages kept in state["debug_log"].
    _DEBUG_LOG_LIMIT = 12

    def init(self):
        """Return the initial state dictionary for a fresh strategy instance."""
        return {
            "last_price": 0.0,
            "last_action": "idle",
            "last_error": "",
            "debug_log": ["init trend follower"],
            "trade_side": "both",
            "cooldown_remaining": 0,
            "last_order_at": 0.0,
            "last_order_price": 0.0,
            "base_available": 0.0,
            "counter_available": 0.0,
        }

    def _log(self, message: str) -> None:
        """Append *message* to the bounded debug log in state and emit a log event."""
        state = getattr(self, "state", {}) or {}
        log = list(state.get("debug_log") or [])
        log.append(message)
        state["debug_log"] = log[-self._DEBUG_LOG_LIMIT:]
        self.state = state
        log_event("trend", message)

    def _config_float(self, key: str, default: float) -> float:
        """Return config[key] as a finite float.

        *default* is used only when the value is unset (``None``), cannot be
        converted, or is not finite. An explicit ``0`` is honoured rather than
        being replaced by the default.
        """
        raw = self.config.get(key)
        if raw is None:
            return default
        try:
            value = float(raw)
        except (TypeError, ValueError, OverflowError):
            return default
        return value if math.isfinite(value) else default

    def _entry_offset_pct(self) -> float:
        """Return the configured entry offset as a fraction of price."""
        return self._config_float("entry_offset_pct", self._DEFAULT_ENTRY_OFFSET_PCT)

    def _exit_offset_pct(self) -> float:
        """Return the configured exit offset as a fraction of price."""
        return self._config_float("exit_offset_pct", self._DEFAULT_EXIT_OFFSET_PCT)

    def _cooldown_ticks(self) -> int:
        """Return the configured cooldown length in ticks (never negative).

        An explicit ``0`` disables the cooldown; the default applies only when
        the value is unset or cannot be converted to an integer.
        """
        raw = self.config.get("cooldown_ticks")
        if raw is None:
            return self._DEFAULT_COOLDOWN_TICKS
        try:
            return max(int(raw), 0)
        except (TypeError, ValueError, OverflowError):
            return self._DEFAULT_COOLDOWN_TICKS

    def _base_symbol(self) -> str:
        """Return the upper-cased base asset code, falling back to ``XRP``."""
        return (self.context.base_currency or self.context.market_symbol or "XRP").split("/")[0].upper()

    def _market_symbol(self) -> str:
        """Return the context market symbol, or ``<base>usd`` when none is set."""
        return self.context.market_symbol or f"{self._base_symbol().lower()}usd"

    def _trade_side(self) -> str:
        """Return the normalised trade side: ``buy``, ``sell`` or ``both``."""
        side = str(self.config.get("trade_side") or "both").strip().lower()
        return side if side in {"buy", "sell", "both"} else "both"

    def _price(self) -> float:
        """Return the current price for the base asset, or ``0.0`` when missing."""
        payload = self.context.get_price(self._base_symbol())
        return float(payload.get("price") or 0.0)

    def _live_fee_rate(self) -> float:
        """Return the maker fee (or taker when maker is falsy); ``0.0`` on failure."""
        try:
            payload = self.context.get_fee_rates(self._market_symbol())
            return float(payload.get("maker") or payload.get("taker") or 0.0)
        except Exception as exc:
            # Best effort: a failed fee lookup must not stop the tick.
            self._log(f"fee lookup failed: {exc}")
            return 0.0

    def _refresh_balance_snapshot(self) -> None:
        """Copy available base/quote balances from the account into state.

        Best effort: on lookup failure or an unexpected payload shape the
        previously cached balances are left untouched.
        """
        try:
            info = self.context.get_account_info()
        except Exception as exc:
            self._log(f"balance refresh failed: {exc}")
            return
        balances = info.get("balances") if isinstance(info, dict) else []
        if not isinstance(balances, list):
            return
        base = self._base_symbol()
        quote = str(self.context.counter_currency or "USD").upper()
        for balance in balances:
            if not isinstance(balance, dict):
                continue
            asset = str(balance.get("asset_code") or "").upper()
            # Prefer "available"; fall back to "total" only when it is absent.
            raw = balance.get("available")
            if raw is None:
                raw = balance.get("total") or 0.0
            try:
                available = float(raw)
            except (TypeError, ValueError, OverflowError):
                continue
            if asset == base:
                self.state["base_available"] = available
            if asset == quote:
                self.state["counter_available"] = available

    def _supervision(self) -> dict:
        """Build the supervision summary (health, ratios, offsets, concerns)."""
        last_error = str(self.state.get("last_error") or "")
        side = self._trade_side()
        directional = side in {"buy", "sell"}
        base_ratio = self._account_value_ratio(float(self.state.get("last_price") or 0.0))
        quote_ratio = max(0.0, 1.0 - base_ratio)
        entry_offset_pct = self._entry_offset_pct()
        exit_offset_pct = self._exit_offset_pct()

        last_order_at = float(self.state.get("last_order_at") or 0.0)
        if last_order_at > 0:
            now_ts = datetime.now(timezone.utc).timestamp()
            last_order_age_seconds = round(max(now_ts - last_order_at, 0.0), 3)
        else:
            last_order_age_seconds = None

        # A tighter entry offset is reported as a higher chasing risk.
        if entry_offset_pct <= 0.0015:
            chasing_risk = "elevated"
        elif entry_offset_pct <= 0.0035:
            chasing_risk = "moderate"
        else:
            chasing_risk = "low"

        concerns = []
        if side == "both":
            concerns.append("generic trend instance relies on Hermes for direction")
        if chasing_risk == "elevated":
            concerns.append("entry offset is tight and may chase price")

        return {
            "health": "degraded" if last_error else "healthy",
            "degraded": bool(last_error),
            "inventory_pressure": "balanced" if directional else "unknown",
            "capacity_available": directional,
            "trade_side": side,
            "balance_target": round(self._balance_target(), 6),
            "base_ratio": round(base_ratio, 6),
            "quote_ratio": round(quote_ratio, 6),
            "entry_offset_pct": round(entry_offset_pct, 6),
            "exit_offset_pct": round(exit_offset_pct, 6),
            "order_notional_quote": self._config_float("order_notional_quote", 0.0),
            "max_order_notional_quote": self._config_float("max_order_notional_quote", 0.0),
            "dust_collect": bool(self.config.get("dust_collect", False)),
            "last_order_age_seconds": last_order_age_seconds,
            "last_order_price": float(self.state.get("last_order_price") or 0.0),
            "chasing_risk": chasing_risk,
            "concerns": concerns,
            "last_reason": last_error or f"trade_side={side}",
        }

    def apply_policy(self):
        """Apply the base-class policy and record the effective settings in state.

        Returns whatever the parent ``apply_policy`` returns.
        """
        policy = super().apply_policy()
        self.state["policy_derived"] = {
            "trade_side": self._trade_side(),
            "balance_target": self._balance_target(),
            "entry_offset_pct": self._entry_offset_pct(),
            "exit_offset_pct": self._exit_offset_pct(),
            "cooldown_ticks": self._cooldown_ticks(),
            "order_notional_quote": self._config_float("order_notional_quote", 0.0),
            "max_order_notional_quote": self._config_float("max_order_notional_quote", 0.0),
            "dust_collect": bool(self.config.get("dust_collect", False)),
        }
        return policy

    def _balance_target(self) -> float:
        """Return the configured balance target clamped to ``[0.0, 1.0]``.

        Unset, unparsable or NaN values fall back to ``1.0``.
        """
        raw = self.config.get("balance_target")
        if raw is None:
            return 1.0
        try:
            target = float(raw)
        except (TypeError, ValueError, OverflowError):
            return 1.0
        if math.isnan(target):
            # NaN would pass through the min/max clamp unchanged.
            return 1.0
        return min(max(target, 0.0), 1.0)

    def _account_value_ratio(self, price: float) -> float:
        """Return the base asset's share of account value at *price*.

        Returns the neutral ``0.5`` when the price is unusable or the cached
        balances have no value.
        """
        if not math.isfinite(price) or price <= 0:
            return 0.5
        base_value = float(self.state.get("base_available") or 0.0) * price
        counter_value = float(self.state.get("counter_available") or 0.0)
        total = base_value + counter_value
        if total <= 0:
            return 0.5
        return base_value / total

    def _balance_target_reached(self, side: str, price: float) -> bool:
        """Return True when the share on the target side has reached the target.

        A target of ``1.0`` (or more) never counts as reached.
        """
        target = self._balance_target()
        if target >= 1.0:
            return False
        base_ratio = self._account_value_ratio(price)
        if side == "buy":
            return base_ratio >= target
        if side == "sell":
            return (1.0 - base_ratio) >= target
        return False

    def _suggest_amount(self, price: float, side: str) -> float:
        """Return the order amount: quote-sized, then capped to the balance target."""
        min_notional = float(self.context.minimum_order_value or 0.0)
        fee_rate = self._live_fee_rate()
        amount = suggest_quote_sized_amount(
            self.context,
            side=side,
            price=price,
            levels=1,
            min_notional=min_notional,
            fee_rate=fee_rate,
            order_notional_quote=self._config_float("order_notional_quote", 0.0),
            max_order_notional_quote=self._config_float("max_order_notional_quote", 0.0),
            dust_collect=bool(self.config.get("dust_collect", False)),
        )
        return cap_amount_to_balance_target(
            suggested_amount=amount,
            side=side,
            price=price,
            fee_rate=fee_rate,
            balance_target=self._balance_target(),
            base_available=float(self.state.get("base_available") or 0.0),
            counter_available=float(self.state.get("counter_available") or 0.0),
            min_notional=min_notional,
        )

    def on_tick(self, tick):
        """Run one strategy step and return a dict describing the action taken.

        The ``action`` key is one of ``cooldown``, ``hold``, ``buy``, ``sell``
        or ``error``. *tick* is accepted for interface compatibility and is
        not read here.
        """
        self.state["last_error"] = ""
        # Logged before the refresh, so this is the previous tick's price.
        self._log(f"tick alive price={self.state.get('last_price') or 0.0}")
        self._refresh_balance_snapshot()
        price = self._price()
        self.state["last_price"] = price

        cooldown = int(self.state.get("cooldown_remaining") or 0)
        if cooldown > 0:
            self.state["cooldown_remaining"] = cooldown - 1
            self.state["last_action"] = "cooldown"
            return {"action": "cooldown", "price": price}

        side = self._trade_side()
        if side not in {"buy", "sell"}:
            self.state["last_action"] = "hold"
            return {"action": "hold", "price": price, "reason": "trade_side must be buy or sell"}

        # Never size or place an order against a missing or invalid quote.
        if not math.isfinite(price) or price <= 0:
            self.state["last_action"] = "hold"
            return {"action": "hold", "price": price, "reason": "no usable price"}

        if self._balance_target_reached(side, price):
            self.state["last_action"] = "target_reached"
            return {"action": "hold", "price": price, "reason": "balance target reached"}

        amount = self._suggest_amount(price, side)
        if amount <= 0:
            self.state["last_action"] = "hold"
            return {"action": "hold", "price": price, "reason": "no usable size"}

        offset = self._entry_offset_pct()
        direction = 1 if side == "buy" else -1
        order_price = round(price * (1 + direction * offset), 8)
        if order_price <= 0:
            # A sell offset of 100% or more would yield a non-positive limit price.
            self.state["last_action"] = "hold"
            return {"action": "hold", "price": price, "reason": "order price not positive"}

        try:
            if self.config.get("debug_orders", True):
                self._log(f"{side} trend amount={amount:.6g} price={order_price}")
            result = self.context.place_order(
                side=side,
                order_type="limit",
                amount=amount,
                price=order_price,
                market=self._market_symbol(),
            )
        except Exception as exc:
            self.state["last_error"] = str(exc)
            self._log(f"trend order failed: {exc}")
            self.state["last_action"] = "error"
            return {"action": "error", "price": price, "error": str(exc)}

        self.state["cooldown_remaining"] = self._cooldown_ticks()
        self.state["last_order_at"] = datetime.now(timezone.utc).timestamp()
        self.state["last_order_price"] = order_price
        self.state["last_action"] = f"{side}_trend"
        return {"action": side, "price": order_price, "amount": amount, "result": result}

    def report(self):
        """Return the structured status report for this strategy instance."""
        context = self.context
        snapshot = context.get_strategy_snapshot() if hasattr(context, "get_strategy_snapshot") else {}
        snapshot = snapshot or {}
        # Computed once so "warnings" and "supervision" describe the same moment.
        supervision = self._supervision()
        return {
            "identity": snapshot.get("identity", {}),
            "control": snapshot.get("control", {}),
            "fit": dict(getattr(self, "STRATEGY_PROFILE", {}) or {}),
            "position": snapshot.get("position", {}),
            "state": {
                "last_price": self.state.get("last_price", 0.0),
                "last_action": self.state.get("last_action", "idle"),
                "trade_side": self._trade_side(),
                "balance_target": self._balance_target(),
                "order_notional_quote": self._config_float("order_notional_quote", 0.0),
                "max_order_notional_quote": self._config_float("max_order_notional_quote", 0.0),
                "dust_collect": bool(self.config.get("dust_collect", False)),
                "cooldown_remaining": self.state.get("cooldown_remaining", 0),
                "base_available": self.state.get("base_available", 0.0),
                "counter_available": self.state.get("counter_available", 0.0),
            },
            "assessment": {
                "confidence": None,
                "uncertainty": None,
                "reason": "trend capture",
                "warnings": [w for w in (supervision.get("concerns") or []) if w],
                "policy": dict(self.config.get("policy") or {}),
            },
            "execution": snapshot.get("execution", {}),
            "supervision": supervision,
        }

    def render(self):
        """Return the widget description used to display this strategy."""
        return {
            "widgets": [
                {"type": "metric", "label": "market", "value": self._market_symbol()},
                {"type": "metric", "label": "price", "value": round(float(self.state.get("last_price") or 0.0), 6)},
                {"type": "metric", "label": "side", "value": self._trade_side()},
                {"type": "metric", "label": "balance target", "value": round(self._balance_target(), 6)},
                {"type": "metric", "label": "quote notional", "value": round(self._config_float("order_notional_quote", 0.0), 6)},
                {"type": "metric", "label": "dust collect", "value": bool(self.config.get("dust_collect", False))},
                {"type": "metric", "label": "state", "value": self.state.get("last_action", "idle")},
                {"type": "metric", "label": "cooldown", "value": int(self.state.get("cooldown_remaining") or 0)},
                {"type": "text", "label": "error", "value": self.state.get("last_error", "") or "none"},
                {"type": "log", "label": "debug log", "lines": self.state.get("debug_log") or []},
            ]
        }
|