"""Replay historical candles through the Hermes state/narrative/decision engines.

For every bar in the replay range the module builds per-timeframe indicator
"regimes" from a trailing window of candles, feeds them to
``synthesize_state`` -> ``build_narrative`` -> ``make_decision`` and records
the decision together with the realised forward return so that it can be
scored afterwards.
"""

from __future__ import annotations

import json
import sys
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

from .candles import Candle, load_candles_csv, resample_candles, slice_through, timeframe_seconds
from .indicators import atr, bollinger, ema, macd_histogram, rsi, sma, vwap


def _bootstrap_hermes_imports() -> None:
    """Put ``<root>/src`` at the front of ``sys.path`` if it is not there yet.

    ``<root>`` is the directory three levels above the one containing this
    file (``Path(__file__).resolve().parents[3]``).  This has to run before
    the ``hermes_mcp`` imports below.
    """
    root = Path(__file__).resolve().parents[3]
    src = root / "src"
    if str(src) not in sys.path:
        sys.path.insert(0, str(src))


_bootstrap_hermes_imports()

from hermes_mcp.decision_engine import make_decision  # noqa: E402
from hermes_mcp.narrative_engine import build_narrative  # noqa: E402
from hermes_mcp.state_engine import synthesize_state  # noqa: E402

DEFAULT_TIMEFRAMES = ("1m", "5m", "15m", "1h", "4h", "1d")


@dataclass(frozen=True)
class ReplayConfig:
    """Immutable settings for one replay run."""

    # Market / account identifiers forwarded to the engines via the concern
    # and strategy dictionaries.
    market_symbol: str = "XRPUSD"
    base_currency: str = "XRP"
    quote_currency: str = "USD"
    account_id: str = "sim-account"
    # Fee as a fraction; ``_fee_adjusted_return`` subtracts it twice, in percent.
    fee_rate: float = 0.004
    # Number of bars ahead used to measure the forward return.
    horizon_bars: int = 30
    # Timeframes for which a regime is built on every step.
    timeframes: tuple[str, ...] = DEFAULT_TIMEFRAMES
    # Simulated balances reported in the account info and used for wallet ratios.
    base_balance: float = 500.0
    quote_balance: float = 500.0
    # Passed through unchanged into the wallet state.
    inventory_state: str = "balanced"
    rebalance_needed: bool = False


@dataclass(frozen=True)
class ReplayRow:
    """One replay step: the decision taken at a bar and how it scored."""

    timestamp: str
    close: float
    decision_mode: str
    decision_action: str
    target_strategy: str | None
    confidence: float
    # ``None`` when the horizon runs past the end of the candle list.
    future_return_pct: float | None
    fee_adjusted_future_return_pct: float | None
    score: float
    reason_summary: str
    # Raw decision / state / narrative payloads for later inspection.
    payload: dict[str, Any]


def _round_or_none(value: float | None, digits: int) -> float | None:
    """Round *value* to *digits* decimals, passing ``None`` through."""
    return round(value, digits) if value is not None else None


def _group_by_timeframe(candles: list[Candle], timeframe: str) -> list[Candle]:
    """Return *candles* as-is for ``"1m"``, otherwise resampled to *timeframe*."""
    if timeframe == "1m":
        return candles
    return resample_candles(candles, timeframe)


def _window_regime(candles: list[Candle], timeframe: str) -> dict[str, Any] | None:
    """Summarise a candle window as an indicator "regime" dictionary.

    Args:
        candles: Candles of a single timeframe; the last one is "now".
        timeframe: Label stored in the result under ``"timeframe"``.

    Returns:
        ``None`` for an empty window, otherwise a nested dict with trend,
        momentum, volatility, Bollinger band, VWAP, reversal and meta
        sections.  Indicators that come back as ``None`` are reported as
        ``None``.
    """
    if not candles:
        return None

    closes = [c.close for c in candles]
    highs = [c.high for c in candles]
    lows = [c.low for c in candles]
    volumes = [c.volume for c in candles]

    ema_fast = ema(closes, 8)
    ema_slow = ema(closes, 21)
    sma_long = sma(closes, 50)
    price = closes[-1]
    rsi_value = rsi(closes, 14)
    macd_value = macd_histogram(closes)
    atr_value = atr(highs, lows, closes, 14)
    middle, upper, lower = bollinger(closes, 20, 2.0)
    vwap_value = vwap(highs, lows, closes, volumes)

    # ATR relative to the latest close; skipped when the price is zero.
    atr_percent = None
    if atr_value is not None and price:
        atr_percent = (atr_value / price) * 100.0

    # RSI >= 70 hints at a downward reversal, RSI <= 30 at an upward one.
    # The score is twice the distance beyond the threshold, capped at 100.
    reversal_direction = "none"
    reversal_score = 0.0
    if rsi_value is not None:
        if rsi_value >= 70:
            reversal_direction = "down"
            reversal_score = min((rsi_value - 70.0) * 2.0, 100.0)
        elif rsi_value <= 30:
            reversal_direction = "up"
            reversal_score = min((30.0 - rsi_value) * 2.0, 100.0)

    # Trend strength is the EMA gap as a percentage of price, capped at 5.
    if ema_fast is not None and ema_slow is not None:
        # NOTE(review): equal EMAs land in the "bearish" branch with a
        # strength of 0 -- confirm that downstream consumers expect this.
        trend_direction = "bullish" if ema_fast > ema_slow else "bearish"
        ema_gap = abs(ema_fast - ema_slow)
        trend_strength = min((ema_gap / price) * 100.0 if price else 0.0, 5.0)
    else:
        trend_strength = 0.0
        trend_direction = "mixed"

    # Position of the price inside the Bollinger bands (0 = lower, 1 = upper).
    if upper is not None and lower is not None and price:
        band_span = max(upper - lower, 1e-9)  # avoid division by zero
        band_pos = (price - lower) / band_span
        if band_pos >= 0.85:
            price_location = "near_upper_band"
        elif band_pos <= 0.15:
            price_location = "near_lower_band"
        elif band_pos >= 0.6:
            price_location = "upper_half"
        elif band_pos <= 0.4:
            price_location = "lower_half"
        else:
            price_location = "centered"
    else:
        price_location = "unknown"

    return {
        "timeframe": timeframe,
        "price": round(price, 8),
        "trend": {
            "ema_fast": _round_or_none(ema_fast, 8),
            "ema_slow": _round_or_none(ema_slow, 8),
            "sma_long": _round_or_none(sma_long, 8),
        },
        "momentum": {
            "rsi": _round_or_none(rsi_value, 4),
            "macd_histogram": _round_or_none(macd_value, 8),
        },
        "volatility": {
            "atr": _round_or_none(atr_value, 8),
            "atr_percent": _round_or_none(atr_percent, 8),
        },
        "bands": {
            "bollinger": {
                "middle": _round_or_none(middle, 8),
                "upper": _round_or_none(upper, 8),
                "lower": _round_or_none(lower, 8),
            }
        },
        "vwap": _round_or_none(vwap_value, 8),
        "reversal": {
            "direction": reversal_direction,
            "score": round(reversal_score, 4),
        },
        "meta": {
            "trend_direction": trend_direction,
            "trend_strength": round(trend_strength, 6),
            "price_location": price_location,
            "candle_count": len(candles),
        },
    }


def build_regimes(
    candles: list[Candle],
    timeframes: tuple[str, ...] = DEFAULT_TIMEFRAMES,
) -> list[dict[str, Any]]:
    """Build one regime per timeframe from the given candles.

    Timeframes whose (resampled) candle list is empty are skipped, so the
    result may be shorter than *timeframes*.  Order follows *timeframes*.
    """
    regimes: list[dict[str, Any]] = []
    for timeframe in timeframes:
        # _group_by_timeframe already passes "1m" through untouched.
        tf_candles = _group_by_timeframe(candles, timeframe)
        if not tf_candles:
            continue
        regime = _window_regime(tf_candles, timeframe)
        if regime is not None:
            regimes.append(regime)
    return regimes


def _wallet_state(config: ReplayConfig, close: float) -> dict[str, Any]:
    """Describe the simulated wallet, valuing the base balance at *close*.

    The ratios fall back to 0.5 each when the total value is zero.
    ``imbalance_score`` is the distance of the base ratio from 0.5.
    """
    base_value = config.base_balance * close
    quote_value = config.quote_balance
    total_value = base_value + quote_value
    base_ratio = base_value / total_value if total_value else 0.5
    quote_ratio = quote_value / total_value if total_value else 0.5
    imbalance = abs(base_ratio - 0.5)
    return {
        "inventory_state": config.inventory_state,
        "rebalance_needed": config.rebalance_needed,
        "grid_ready": config.inventory_state == "balanced",
        "base_ratio": round(base_ratio, 4),
        "quote_ratio": round(quote_ratio, 4),
        "imbalance_score": round(imbalance, 4),
    }


def _strategies(config: ReplayConfig) -> list[dict[str, Any]]:
    """Return a fresh list of the three simulated strategy records.

    The grid trader is ``"active"``; the trend follower and the exposure
    protector are ``"off"``.
    """
    return [
        {
            "id": "grid-1",
            "strategy_type": "grid_trader",
            "mode": "active",
            "account_id": config.account_id,
            "market_symbol": config.market_symbol,
            "state": {},
            "config": {},
        },
        {
            "id": "trend-1",
            "strategy_type": "trend_follower",
            "mode": "off",
            "account_id": config.account_id,
            "market_symbol": config.market_symbol,
            "state": {},
            "config": {"trade_side": "both"},
        },
        {
            "id": "protect-1",
            "strategy_type": "exposure_protector",
            "mode": "off",
            "account_id": config.account_id,
            "market_symbol": config.market_symbol,
            "state": {},
            "config": {},
        },
    ]


def _future_return(candles: list[Candle], index: int, horizon_bars: int) -> float | None:
    """Percent change in close from ``candles[index]`` to *horizon_bars* later.

    Returns ``None`` when the future bar lies past the end of *candles* or
    when the starting close is zero.
    """
    future_index = index + horizon_bars
    if future_index >= len(candles):
        return None
    start = candles[index].close
    end = candles[future_index].close
    if start == 0:
        return None
    return ((end - start) / start) * 100.0


def _fee_adjusted_return(future_return_pct: float | None, fee_rate: float) -> float | None:
    """Subtract twice the fee (in percent) from *future_return_pct*.

    ``None`` is passed through.  The factor of two presumably covers entry
    and exit -- confirm against the fee model in use.
    """
    if future_return_pct is None:
        return None
    return future_return_pct - (fee_rate * 100.0 * 2.0)


def _direction_from_decision(decision_action: str, narrative: dict[str, Any]) -> str | None:
    """Derive ``"bullish"`` / ``"bearish"`` for trend decisions, else ``None``.

    For actions containing ``"trend"`` the ``meso_bias`` of the narrative's
    ``grid_breakout_pressure`` dict wins; otherwise the narrative ``stance``
    is searched for the two keywords (bullish first).
    """
    if "trend" not in decision_action:
        return None

    breakout = narrative.get("grid_breakout_pressure")
    if not isinstance(breakout, dict):
        breakout = {}
    meso_bias = str(breakout.get("meso_bias") or "")
    if meso_bias in {"bullish", "bearish"}:
        return meso_bias

    stance = str(narrative.get("stance") or "")
    if "bullish" in stance:
        return "bullish"
    if "bearish" in stance:
        return "bearish"
    return None


def _score_row(
    decision_action: str,
    future_return_pct: float | None,
    fee_adjusted_return_pct: float | None,
) -> float:
    """Score a decision against the realised forward return.

    * unknown returns -> ``0.0``
    * ``"keep_grid"`` -> ``1.0`` if the move stayed below 0.25 %, otherwise a
      penalty of ``-|fee adjusted return| / 10``
    * actions containing ``"trend"`` -> fee adjusted return / 5
    * actions containing ``"protect"`` or ``"rebalance"`` ->
      ``max(0, 0.5 - |return| / 10)``
    * anything else -> return / 10
    """
    if future_return_pct is None or fee_adjusted_return_pct is None:
        return 0.0
    if decision_action == "keep_grid":
        return 1.0 if abs(future_return_pct) < 0.25 else -abs(fee_adjusted_return_pct) / 10.0
    if "trend" in decision_action:
        return fee_adjusted_return_pct / 5.0
    if "protect" in decision_action or "rebalance" in decision_action:
        return max(0.0, 0.5 - abs(future_return_pct) / 10.0)
    return future_return_pct / 10.0


def run_replay(
    *,
    candles: list[Candle],
    config: ReplayConfig,
    lookback_bars: int = 2000,
    progress_every: int = 2000,
) -> list[ReplayRow]:
    """Replay *candles* bar by bar and record one ``ReplayRow`` per step.

    Args:
        candles: Candle series to replay; ``_wallet_state`` and the
            ``"1m"`` history window treat it as 1-minute data.
        config: Replay settings.
        lookback_bars: Size of the trailing window given to the indicators.
            Replay starts at index ``max(50, lookback_bars)``.
        progress_every: Print a progress line to stderr every this many
            steps; a falsy value disables the output.

    Returns:
        The rows in chronological order.  Empty when fewer than 50 candles
        are supplied or when the replay range is empty.
    """
    if len(candles) < 50:
        return []

    start_index = max(50, lookback_bars)
    # Stop early enough that every step has a bar `horizon_bars` ahead.
    end_index = len(candles) - config.horizon_bars
    total = max(0, end_index - start_index)
    # Loop invariant: length of the decision horizon in seconds.
    window_seconds = timeframe_seconds("1m") * config.horizon_bars

    rows: list[ReplayRow] = []
    for i, index in enumerate(range(start_index, end_index)):
        if progress_every and (i % progress_every == 0):
            print(f"replay {i}/{total}", file=sys.stderr)

        window = candles[max(0, index - lookback_bars + 1) : index + 1]
        current = candles[index]
        regimes = build_regimes(window, config.timeframes)

        # These dicts are rebuilt on every step on purpose: the engines
        # receive fresh objects each time, exactly as before.
        concern = {
            "id": "sim-concern",
            "account_id": config.account_id,
            "market_symbol": config.market_symbol,
            "base_currency": config.base_currency,
            "quote_currency": config.quote_currency,
        }
        account_info = {
            "balances": [
                {"asset_code": config.base_currency, "available": config.base_balance},
                {"asset_code": config.quote_currency, "available": config.quote_balance},
            ]
        }

        state_payload = synthesize_state(concern=concern, regimes=regimes, account_info=account_info)
        narrative = build_narrative(concern=concern, state_payload=state_payload.payload)
        wallet_state = _wallet_state(config, current.close)
        decision = make_decision(
            concern=concern,
            narrative_payload=narrative.payload,
            wallet_state=wallet_state,
            strategies=_strategies(config),
            history_window={
                "window_seconds": window_seconds,
                "recent_states": [],
            },
        )

        future_return_pct = _future_return(candles, index, config.horizon_bars)
        fee_adjusted = _fee_adjusted_return(future_return_pct, config.fee_rate)
        score = _score_row(decision.action, future_return_pct, fee_adjusted)

        rows.append(
            ReplayRow(
                timestamp=current.timestamp.isoformat(),
                close=current.close,
                decision_mode=decision.mode,
                decision_action=decision.action,
                target_strategy=decision.target_strategy,
                confidence=decision.confidence,
                future_return_pct=future_return_pct,
                fee_adjusted_future_return_pct=fee_adjusted,
                score=score,
                reason_summary=decision.reason_summary,
                payload={
                    "decision": decision.payload,
                    "state": state_payload.payload,
                    "narrative": narrative.payload,
                },
            )
        )
    return rows


def rows_to_jsonl(rows: list[ReplayRow]) -> str:
    """Serialise *rows* as JSON Lines (one object per line, no trailing newline)."""
    return "\n".join(json.dumps(asdict(row), ensure_ascii=False) for row in rows)