from __future__ import annotations

import math
from datetime import datetime, timezone

from src.trader_mcp.strategy_sdk import Strategy
from src.trader_mcp.logging_utils import log_event


# NOTE(review): the subclass reuses the imported base-class name `Strategy`;
# presumably the strategy loader looks the class up under that name — confirm
# before renaming either side.
class Strategy(Strategy):
    """Trend-following strategy: place one limit order per signal, then cool down.

    Each tick refreshes balances and price, reads the trend regime from the
    context, and, when the trend is directional and at least
    ``trend_strength_min`` strong, places a single limit order in the trend
    direction (buy for bullish, sell for bearish). After an order the strategy
    skips ``cooldown_ticks`` ticks.
    """

    LABEL = "Trend Follower"
    STRATEGY_PROFILE = {
        "expects": {
            "trend": "strong",
            "volatility": "moderate",
            "event_risk": "low",
            "liquidity": "normal",
        },
        "avoids": {
            "trend": "range",
            "volatility": "chaotic",
            "event_risk": "high",
            "liquidity": "thin",
        },
        "risk_profile": "growth",
        "capabilities": ["directional_continuation", "momentum_following", "inventory_accumulation"],
        "role": "primary",
        "inventory_behavior": "accumulative_long",
        "requires_rebalance_before_start": False,
        "requires_rebalance_before_stop": False,
        "safe_when_unbalanced": True,
        "can_run_with": ["exposure_protector"],
    }
    TICK_MINUTES = 0.5
    CONFIG_SCHEMA = {
        "trend_timeframe": {"type": "string", "default": "1h"},
        "trend_strength_min": {"type": "float", "default": 0.65, "min": 0.0, "max": 1.0},
        "entry_offset_pct": {"type": "float", "default": 0.003, "min": 0.0, "max": 1.0},
        "exit_offset_pct": {"type": "float", "default": 0.002, "min": 0.0, "max": 1.0},
        "order_size": {"type": "float", "default": 0.0, "min": 0.0},
        "max_order_size": {"type": "float", "default": 0.0, "min": 0.0},
        "fee_rate": {"type": "float", "default": 0.0025, "min": 0.0, "max": 0.05},
        "cooldown_ticks": {"type": "int", "default": 2, "min": 0, "max": 1000},
        "debug_orders": {"type": "bool", "default": True},
    }
    STATE_SCHEMA = {
        "last_price": {"type": "float", "default": 0.0},
        "last_action": {"type": "string", "default": "idle"},
        "last_error": {"type": "string", "default": ""},
        "debug_log": {"type": "list", "default": []},
        "last_signal": {"type": "string", "default": "neutral"},
        "last_strength": {"type": "float", "default": 0.0},
        "cooldown_remaining": {"type": "int", "default": 0},
        "last_order_at": {"type": "float", "default": 0.0},
        "last_order_price": {"type": "float", "default": 0.0},
        "base_available": {"type": "float", "default": 0.0},
        "counter_available": {"type": "float", "default": 0.0},
    }

    # Direction labels (lower-case) treated as bullish / bearish everywhere in
    # this class, so that sizing, scoring and order side always agree.
    _BULLISH = frozenset({"bull", "up", "long"})
    _BEARISH = frozenset({"bear", "down", "short"})

    def init(self):
        """Return the initial state dictionary for a fresh strategy instance."""
        return {
            "last_price": 0.0,
            "last_action": "idle",
            "last_error": "",
            "debug_log": ["init trend follower"],
            "last_signal": "neutral",
            "last_strength": 0.0,
            "cooldown_remaining": 0,
            "last_order_at": 0.0,
            "last_order_price": 0.0,
            "base_available": 0.0,
            "counter_available": 0.0,
        }

    # ------------------------------------------------------------------ #
    # Small utilities
    # ------------------------------------------------------------------ #

    def _log(self, message: str) -> None:
        """Append ``message`` to the rolling debug log (last 12 lines) and emit it."""
        state = getattr(self, "state", {}) or {}
        lines = list(state.get("debug_log") or [])
        lines.append(message)
        state["debug_log"] = lines[-12:]
        self.state = state
        log_event("trend", message)

    def _config_float(self, key: str, default: float) -> float:
        """Return ``config[key]`` as a float.

        The default is used only when the value is missing or unparseable, so
        an explicit ``0`` (valid for several schema entries) is respected.
        """
        value = self.config.get(key)
        if value is None:
            return float(default)
        try:
            return float(value)
        except (TypeError, ValueError):
            return float(default)

    def _config_int(self, key: str, default: int) -> int:
        """Return ``config[key]`` as an int; default only when missing/invalid."""
        value = self.config.get(key)
        if value is None:
            return int(default)
        try:
            return int(value)
        except (TypeError, ValueError):
            return int(default)

    @staticmethod
    def _safe_float(value) -> float:
        """Convert ``value`` to float, mapping falsy or unparseable input to 0.0."""
        try:
            return float(value or 0.0)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    # ------------------------------------------------------------------ #
    # Context accessors
    # ------------------------------------------------------------------ #

    def _base_symbol(self) -> str:
        """Return the upper-cased base asset code (text before ``/``), default ``XRP``."""
        source = self.context.base_currency or self.context.market_symbol or "XRP"
        return source.split("/")[0].upper()

    def _market_symbol(self) -> str:
        """Return the context market symbol, or ``<base>usd`` when none is set."""
        return self.context.market_symbol or f"{self._base_symbol().lower()}usd"

    def _price(self) -> float:
        """Return the current price reported by the context, or 0.0 if absent."""
        payload = self.context.get_price(self._base_symbol())
        return float(payload.get("price") or 0.0)

    def _live_fee_rate(self) -> float:
        """Return the live maker (else taker) fee, or the configured ``fee_rate``.

        The configured rate is used only when the live lookup raises.
        """
        try:
            payload = self.context.get_fee_rates(self._market_symbol())
            return float(payload.get("maker") or payload.get("taker") or 0.0)
        except Exception as exc:
            self._log(f"fee lookup failed: {exc}")
            return float(self.config.get("fee_rate", 0.0025) or 0.0)

    def _refresh_balance_snapshot(self) -> None:
        """Copy available base/counter balances from the account into state.

        Best effort: a failed lookup is logged and leaves state untouched;
        malformed balance rows are skipped.
        """
        try:
            info = self.context.get_account_info()
        except Exception as exc:
            self._log(f"balance refresh failed: {exc}")
            return

        balances = info.get("balances") if isinstance(info, dict) else []
        if not isinstance(balances, list):
            return

        base = self._base_symbol()
        quote = str(self.context.counter_currency or "USD").upper()
        for entry in balances:
            if not isinstance(entry, dict):
                continue
            asset = str(entry.get("asset_code") or "").upper()
            # Prefer "available"; fall back to "total" only when it is absent.
            raw = entry.get("available")
            if raw is None:
                raw = entry.get("total") or 0.0
            try:
                available = float(raw)
            except (TypeError, ValueError, OverflowError):
                continue
            if asset == base:
                self.state["base_available"] = available
            if asset == quote:
                self.state["counter_available"] = available

    # ------------------------------------------------------------------ #
    # Supervision
    # ------------------------------------------------------------------ #

    def _inventory_ratio(self, price: float) -> float:
        """Return the base-asset share of total holdings valued at ``price``.

        Returns 0.5 when the total value is not positive.
        """
        base_value = float(self.state.get("base_available") or 0.0) * price
        counter_value = float(self.state.get("counter_available") or 0.0)
        total = base_value + counter_value
        if total <= 0:
            return 0.5
        return base_value / total

    def _supervision(self) -> dict:
        """Summarise health, inventory pressure and signal state for reporting."""
        price = float(self.state.get("last_price") or 0.0)
        # Without a known price, value the base asset at 1.0 so the ratio is defined.
        ratio = self._inventory_ratio(price if price > 0 else 1.0)
        last_error = str(self.state.get("last_error") or "")
        strength = float(self.state.get("last_strength") or 0.0)
        signal = str(self.state.get("last_signal") or "neutral")

        if ratio >= 0.88:
            pressure = "base_heavy"
        elif ratio <= 0.12:
            pressure = "quote_heavy"
        elif ratio >= 0.68:
            pressure = "base_biased"
        elif ratio <= 0.32:
            pressure = "quote_biased"
        else:
            pressure = "balanced"

        return {
            "health": "degraded" if last_error else "healthy",
            "degraded": bool(last_error),
            "inventory_pressure": pressure,
            "capacity_available": strength >= self._config_float("trend_strength_min", 0.65),
            "last_reason": last_error or f"signal={signal}, strength={strength:.3f}, base_ratio={ratio:.3f}",
            "trend_strength": strength,
            "signal": signal,
        }

    # ------------------------------------------------------------------ #
    # Policy
    # ------------------------------------------------------------------ #

    def apply_policy(self):
        """Apply the base policy, then fill unset tunables from the risk posture.

        Explicitly configured values win over policy-derived ones. For
        ``order_size`` a value of 0 means "unset" (its schema default); for the
        other tunables 0 is a legitimate setting and is kept.

        Returns:
            Whatever the base class ``apply_policy`` returned.
        """
        policy = super().apply_policy()
        settings = policy if isinstance(policy, dict) else {}
        risk = str(settings.get("risk_posture") or "normal").lower()
        priority = str(settings.get("priority") or "normal").lower()

        strength_map = {"cautious": 0.8, "normal": 0.65, "assertive": 0.5}
        entry_map = {"cautious": 0.002, "normal": 0.003, "assertive": 0.005}
        exit_map = {"cautious": 0.0015, "normal": 0.002, "assertive": 0.003}
        cooldown_map = {"cautious": 4, "normal": 2, "assertive": 1}
        size_map = {"cautious": 0.5, "normal": 1.0, "assertive": 1.5}

        # Priority overrides the declared risk posture.
        if priority in {"low", "background"}:
            risk = "cautious"
        elif priority in {"high", "urgent"}:
            risk = "assertive"

        derived = {
            "trend_strength_min": self._config_float("trend_strength_min", strength_map.get(risk, 0.65)),
            "entry_offset_pct": self._config_float("entry_offset_pct", entry_map.get(risk, 0.003)),
            "exit_offset_pct": self._config_float("exit_offset_pct", exit_map.get(risk, 0.002)),
            "cooldown_ticks": self._config_int("cooldown_ticks", cooldown_map.get(risk, 2)),
            "order_size": float(self.config.get("order_size") or size_map.get(risk, 1.0)),
        }
        self.config.update(derived)
        self.state["policy_derived"] = dict(derived)
        return policy

    # ------------------------------------------------------------------ #
    # Trend evaluation
    # ------------------------------------------------------------------ #

    def _trend_snapshot(self) -> dict:
        """Fetch the regime for the configured timeframe; ``{"error": ...}`` on failure."""
        timeframe = str(self.config.get("trend_timeframe", "1h") or "1h")
        try:
            return self.context.get_regime(self._base_symbol(), timeframe)
        except Exception as exc:
            self._log(f"trend lookup failed: {exc}")
            return {"error": str(exc)}

    def _trend_strength(self) -> tuple[str, float]:
        """Return ``(direction, strength)`` for the current regime.

        ``direction`` is lower-cased so every comparison in this class sees the
        same spelling. ``strength`` is the regime's own value when usable,
        otherwise a score derived from the regime indicators.
        """
        regime = self._trend_snapshot()
        if not isinstance(regime, dict):
            regime = {}
        trend = regime.get("trend") or {}
        momentum = regime.get("momentum") or {}
        # Anything that is not a mapping cannot be queried; treat it as empty.
        if not isinstance(trend, dict):
            trend = {}
        if not isinstance(momentum, dict):
            momentum = {}

        direction = str(trend.get("state") or trend.get("direction") or "unknown").strip().lower()
        strength = self._coerce_strength(trend.get("strength"))
        if strength is None:
            strength = self._derive_strength_from_regime(
                direction=direction, trend=trend, momentum=momentum, regime=regime
            )
        return direction, strength

    def _coerce_strength(self, value) -> float | None:
        """Clamp ``value`` into [0, 1]; return None when missing, invalid or NaN."""
        if value is None:
            return None
        try:
            number = float(value)
        except Exception:
            return None
        # min(1.0, nan) evaluates to 1.0, so NaN must be rejected explicitly
        # or it would read as maximum strength.
        if math.isnan(number):
            return None
        return max(0.0, min(1.0, number))

    def _derive_strength_from_regime(self, *, direction: str, trend: dict, momentum: dict, regime: dict) -> float:
        """Score trend strength in [0, 1] from direction, momentum, RSI, MACD and EMAs.

        Non-directional input scores 0.0. ``regime`` is accepted for interface
        compatibility and is not read.
        """
        direction = str(direction or "unknown").lower()
        bullish = direction in self._BULLISH
        bearish = direction in self._BEARISH
        if not (bullish or bearish):
            return 0.0

        score = 0.45

        momentum_state = str(momentum.get("state") or "").lower()
        if (bullish and momentum_state == "bull") or (bearish and momentum_state == "bear"):
            score += 0.2

        rsi = self._safe_float(momentum.get("rsi"))
        if bullish:
            if rsi >= 60:
                score += 0.2
            elif rsi >= 52:
                score += 0.1
        elif 0 < rsi <= 40:  # rsi == 0 means "no reading", not oversold
            score += 0.2
        elif 0 < rsi <= 48:
            score += 0.1

        macd_hist = self._safe_float(momentum.get("macd_histogram"))
        if (bullish and macd_hist > 0) or (bearish and macd_hist < 0):
            score += 0.1

        ema_fast = self._safe_float(trend.get("ema_fast"))
        ema_slow = self._safe_float(trend.get("ema_slow"))
        if (bullish and ema_fast > ema_slow > 0) or (bearish and 0 < ema_fast < ema_slow):
            score += 0.05

        return max(0.0, min(1.0, round(score, 4)))

    # ------------------------------------------------------------------ #
    # Sizing and execution
    # ------------------------------------------------------------------ #

    def _suggest_amount(self, price: float) -> float:
        """Return the order amount for the current signal at ``price``.

        Delegates to ``context.suggest_order_amount`` when the context offers
        it; otherwise uses ``order_size`` capped by ``max_order_size``.
        """
        min_notional = float(self.context.minimum_order_value or 0.0)
        max_order = float(self.config.get("max_order_size", 0.0) or 0.0)
        order_size = float(self.config.get("order_size", 0.0) or 0.0)

        if hasattr(self.context, "suggest_order_amount"):
            signal = str(self.state.get("last_signal") or "").lower()
            suggested = self.context.suggest_order_amount(
                side="buy" if signal in self._BULLISH else "sell",
                price=price,
                levels=1,
                min_notional=min_notional,
                fee_rate=self._live_fee_rate(),
                max_notional_per_order=(max_order * price) if max_order > 0 else 0.0,
                order_size=order_size,
            )
            return float(suggested or 0.0)

        amount = order_size
        if max_order > 0:
            amount = min(amount, max_order)
        return max(amount, 0.0)

    def _hold(self, price: float, reason: str, **extra) -> dict:
        """Record a hold in state and build the matching tick result."""
        self.state["last_action"] = "hold"
        return {"action": "hold", "price": price, "reason": reason, **extra}

    def on_tick(self, tick):
        """Run one strategy cycle and return a dict describing the action taken.

        The result's ``action`` is one of ``cooldown``, ``hold``, ``buy``,
        ``sell`` or ``error``. Order-placement failures are captured into
        ``last_error`` rather than raised.
        """
        self.state["last_error"] = ""
        self._log(f"tick alive price={self.state.get('last_price') or 0.0}")
        self._refresh_balance_snapshot()

        price = self._price()
        self.state["last_price"] = price

        remaining = int(self.state.get("cooldown_remaining") or 0)
        if remaining > 0:
            self.state["cooldown_remaining"] = remaining - 1
            self.state["last_action"] = "cooldown"
            return {"action": "cooldown", "price": price}

        # A missing price would otherwise produce a limit order priced at 0.
        if price <= 0:
            return self._hold(price, "no price")

        direction, strength = self._trend_strength()
        self.state["last_signal"] = direction
        self.state["last_strength"] = strength

        if strength < self._config_float("trend_strength_min", 0.65):
            return self._hold(price, "trend too weak", strength=strength)

        # Only trade a recognised direction; "unknown"/"range" must not
        # silently become a sell.
        if direction in self._BULLISH:
            side = "buy"
        elif direction in self._BEARISH:
            side = "sell"
        else:
            return self._hold(price, "no directional trend", strength=strength)

        amount = self._suggest_amount(price)
        if amount <= 0:
            return self._hold(price, "no usable size")

        offset = float(self.config.get("entry_offset_pct", 0.003) or 0.0)
        factor = 1 + offset if side == "buy" else 1 - offset
        order_price = round(price * factor, 8)

        try:
            if self.config.get("debug_orders", True):
                self._log(f"{side} trend amount={amount:.6g} price={order_price} strength={strength:.3f}")
            result = self.context.place_order(
                side=side,
                order_type="limit",
                amount=amount,
                price=order_price,
                market=self._market_symbol(),
            )
        except Exception as exc:
            self.state["last_error"] = str(exc)
            self._log(f"trend order failed: {exc}")
            self.state["last_action"] = "error"
            return {"action": "error", "price": price, "error": str(exc)}

        self.state["cooldown_remaining"] = max(0, self._config_int("cooldown_ticks", 2))
        self.state["last_order_at"] = datetime.now(timezone.utc).timestamp()
        self.state["last_order_price"] = order_price
        self.state["last_action"] = f"{side}_trend"
        return {"action": side, "price": order_price, "amount": amount, "result": result, "strength": strength}

    # ------------------------------------------------------------------ #
    # Reporting
    # ------------------------------------------------------------------ #

    def report(self):
        """Return a structured status report combining context snapshot and state."""
        snapshot = self.context.get_strategy_snapshot() if hasattr(self.context, "get_strategy_snapshot") else {}
        if not isinstance(snapshot, dict):
            snapshot = {}
        state = self.state
        return {
            "identity": snapshot.get("identity", {}),
            "control": snapshot.get("control", {}),
            "fit": dict(getattr(self, "STRATEGY_PROFILE", {}) or {}),
            "position": snapshot.get("position", {}),
            "state": {
                "last_price": state.get("last_price", 0.0),
                "last_action": state.get("last_action", "idle"),
                "last_signal": state.get("last_signal", "neutral"),
                "last_strength": state.get("last_strength", 0.0),
                "cooldown_remaining": state.get("cooldown_remaining", 0),
                "base_available": state.get("base_available", 0.0),
                "counter_available": state.get("counter_available", 0.0),
            },
            "assessment": {
                "confidence": None,
                "uncertainty": None,
                "reason": "trend capture",
                "warnings": [],
                "policy": dict(self.config.get("policy") or {}),
            },
            "execution": snapshot.get("execution", {}),
            "supervision": self._supervision(),
        }

    def render(self):
        """Return the widget description used to display this strategy."""
        state = self.state
        return {
            "widgets": [
                {"type": "metric", "label": "market", "value": self._market_symbol()},
                {"type": "metric", "label": "price", "value": round(float(state.get("last_price") or 0.0), 6)},
                {"type": "metric", "label": "signal", "value": state.get("last_signal", "neutral")},
                {"type": "metric", "label": "strength", "value": round(float(state.get("last_strength") or 0.0), 4)},
                {"type": "metric", "label": "state", "value": state.get("last_action", "idle")},
                {"type": "metric", "label": "cooldown", "value": int(state.get("cooldown_remaining") or 0)},
                {"type": "text", "label": "error", "value": state.get("last_error", "") or "none"},
                {"type": "log", "label": "debug log", "lines": state.get("debug_log") or []},
            ]
        }