| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- from __future__ import annotations
- from dataclasses import dataclass, asdict
- from datetime import datetime, timedelta, timezone
- from pathlib import Path
- from typing import Any
- import json
- import sys
- from .candles import Candle, load_candles_csv, resample_candles, slice_through, timeframe_seconds
- from .indicators import atr, bollinger, ema, macd_histogram, rsi, sma, vwap
def _bootstrap_hermes_imports() -> None:
    """Put the repository-level ``src`` directory at the front of ``sys.path``.

    The directory is ``parents[3]`` of this file's resolved path (three
    levels above the directory that contains the file) joined with ``src``.
    Nothing is inserted when that exact path string is already listed in
    ``sys.path``.

    Raises:
        IndexError: If the resolved file path has fewer than four ancestors.
    """
    src_dir = str(Path(__file__).resolve().parents[3] / "src")
    if src_dir in sys.path:
        return
    # Front insertion so this directory is searched before every other entry.
    sys.path.insert(0, src_dir)
- _bootstrap_hermes_imports()
- from hermes_mcp.decision_engine import make_decision # noqa: E402
- from hermes_mcp.narrative_engine import build_narrative # noqa: E402
- from hermes_mcp.state_engine import synthesize_state # noqa: E402
# Timeframes used when a caller does not supply its own tuple.
DEFAULT_TIMEFRAMES = ("1m", "5m", "15m", "1h", "4h", "1d")


@dataclass(frozen=True)
class ReplayConfig:
    """Immutable settings for one replay run.

    Every field has a default, so ``ReplayConfig()`` is a complete
    configuration. Instances are frozen: assigning to a field raises
    ``dataclasses.FrozenInstanceError``.
    """

    # Market / account identity copied into the concern and strategy dicts.
    market_symbol: str = "XRPUSD"
    base_currency: str = "XRP"
    quote_currency: str = "USD"
    account_id: str = "sim-account"

    # Fee as a fraction; the fee-adjusted return subtracts it twice, in percent.
    fee_rate: float = 0.004
    # Number of candles ahead used to measure the future return.
    horizon_bars: int = 30
    # Timeframes for which a regime is built at every replay step.
    timeframes: tuple[str, ...] = DEFAULT_TIMEFRAMES

    # Balances reported to the engines; the replay never changes them.
    base_balance: float = 500.0
    quote_balance: float = 500.0

    # Wallet flags passed through to the wallet-state dict as-is.
    inventory_state: str = "balanced"
    rebalance_needed: bool = False
@dataclass(frozen=True)
class ReplayRow:
    """One evaluated replay step: the decision taken and how price moved after it.

    Instances are frozen and contain only plain values, so they can be turned
    into a dict with ``dataclasses.asdict``.
    """

    # ISO-8601 timestamp string of the candle the decision was made on.
    timestamp: str
    # Close price of that candle.
    close: float

    # Fields copied from the decision object returned by the decision engine.
    decision_mode: str
    decision_action: str
    target_strategy: str | None
    confidence: float

    # Percentage price change over the horizon, raw and after fees;
    # None when no future candle was available.
    future_return_pct: float | None
    fee_adjusted_future_return_pct: float | None

    # Score assigned to the decision given the realised move.
    score: float
    reason_summary: str

    # Raw engine payloads kept for inspection.
    payload: dict[str, Any]
def _group_by_timeframe(candles: list[Candle], timeframe: str) -> list[Candle]:
    """Return *candles* expressed at *timeframe*.

    For ``"1m"`` the input list object itself is returned (no copy); every
    other timeframe is delegated to ``resample_candles``.
    """
    # NOTE(review): presumably the input is already 1-minute data — confirm with callers.
    needs_resampling = timeframe != "1m"
    return resample_candles(candles, timeframe) if needs_resampling else candles
def _window_regime(candles: list[Candle], timeframe: str) -> dict[str, Any] | None:
    """Summarise a candle window as a nested "regime" dict for one timeframe.

    The indicator helpers are called on the window's close/high/low/volume
    series: ``ema`` (8 and 21), ``sma`` (50), ``rsi`` (14), ``macd_histogram``,
    ``atr`` (14), ``bollinger`` (20, 2.0) and ``vwap``. Any indicator that
    comes back as ``None`` is reported as ``None`` in the result.

    Args:
        candles: Window of candles; the last one supplies the current price.
        timeframe: Label stored in the result under ``"timeframe"``.

    Returns:
        ``None`` for an empty window, otherwise a dict with the keys
        ``timeframe``, ``price``, ``trend``, ``momentum``, ``volatility``,
        ``bands``, ``vwap``, ``reversal`` and ``meta``.
    """
    if not candles:
        return None

    def _rounded(value: float | None, digits: int) -> float | None:
        # Missing indicator values stay None instead of being rounded.
        return None if value is None else round(value, digits)

    close_series = [bar.close for bar in candles]
    high_series = [bar.high for bar in candles]
    low_series = [bar.low for bar in candles]
    volume_series = [bar.volume for bar in candles]
    last_price = close_series[-1]

    fast = ema(close_series, 8)
    slow = ema(close_series, 21)
    long_avg = sma(close_series, 50)
    rsi_now = rsi(close_series, 14)
    macd_hist = macd_histogram(close_series)
    atr_now = atr(high_series, low_series, close_series, 14)
    band_mid, band_top, band_bottom = bollinger(close_series, 20, 2.0)
    vwap_now = vwap(high_series, low_series, close_series, volume_series)

    # ATR as a percentage of price; skipped when ATR is missing or price is zero.
    atr_pct = (atr_now / last_price) * 100.0 if atr_now is not None and last_price else None

    # RSI at or beyond 70 / 30 flags a possible reversal; the score is twice
    # the distance past the threshold, capped at 100.
    reversal_side, reversal_strength = "none", 0.0
    if rsi_now is not None:
        if rsi_now >= 70:
            reversal_side = "down"
            reversal_strength = min((rsi_now - 70.0) * 2.0, 100.0)
        elif rsi_now <= 30:
            reversal_side = "up"
            reversal_strength = min((30.0 - rsi_now) * 2.0, 100.0)

    # Trend: fast EMA above slow EMA is bullish, anything else (including
    # equality) is bearish; strength is the EMA gap in percent of price,
    # capped at 5.
    if fast is None or slow is None:
        trend_label, trend_gap = "mixed", 0.0
    else:
        if fast > slow:
            trend_label = "bullish"
            ema_gap = fast - slow
        else:
            trend_label = "bearish"
            ema_gap = slow - fast
        trend_gap = min((ema_gap / last_price) * 100.0 if last_price else 0.0, 5.0)

    # Position of price inside the Bollinger band: 0 = lower band, 1 = upper.
    if band_top is None or band_bottom is None or not last_price:
        location = "unknown"
    else:
        span = max(band_top - band_bottom, 1e-9)  # guards against a zero-width band
        ratio = (last_price - band_bottom) / span
        if ratio >= 0.85:
            location = "near_upper_band"
        elif ratio <= 0.15:
            location = "near_lower_band"
        elif ratio >= 0.6:
            location = "upper_half"
        elif ratio <= 0.4:
            location = "lower_half"
        else:
            location = "centered"

    return {
        "timeframe": timeframe,
        "price": round(last_price, 8),
        "trend": {
            "ema_fast": _rounded(fast, 8),
            "ema_slow": _rounded(slow, 8),
            "sma_long": _rounded(long_avg, 8),
        },
        "momentum": {
            "rsi": _rounded(rsi_now, 4),
            "macd_histogram": _rounded(macd_hist, 8),
        },
        "volatility": {
            "atr": _rounded(atr_now, 8),
            "atr_percent": _rounded(atr_pct, 8),
        },
        "bands": {
            "bollinger": {
                "middle": _rounded(band_mid, 8),
                "upper": _rounded(band_top, 8),
                "lower": _rounded(band_bottom, 8),
            }
        },
        "vwap": _rounded(vwap_now, 8),
        "reversal": {
            "direction": reversal_side,
            "score": round(reversal_strength, 4),
        },
        "meta": {
            "trend_direction": trend_label,
            "trend_strength": round(trend_gap, 6),
            "price_location": location,
            "candle_count": len(candles),
        },
    }
def build_regimes(candles: list[Candle], timeframes: tuple[str, ...] = DEFAULT_TIMEFRAMES) -> list[dict[str, Any]]:
    """Build one regime dict per timeframe that has candles.

    Args:
        candles: Source candles; used as-is for ``"1m"`` and resampled for
            every other timeframe via ``_group_by_timeframe``.
        timeframes: Timeframe labels to evaluate, in output order.

    Returns:
        Regime dicts in the order of *timeframes*. Timeframes that yield no
        candles, or for which ``_window_regime`` returns ``None``, are skipped.
    """
    regimes: list[dict[str, Any]] = []
    for timeframe in timeframes:
        # The helper already returns the input untouched for "1m".
        tf_candles = _group_by_timeframe(candles, timeframe)
        if not tf_candles:
            continue
        regime = _window_regime(tf_candles, timeframe)
        if regime is not None:
            regimes.append(regime)
    return regimes
def _wallet_state(config: ReplayConfig, close: float) -> dict[str, Any]:
    """Describe the configured balances as a wallet-state dict valued at *close*.

    Args:
        config: Supplies ``base_balance``, ``quote_balance``,
            ``inventory_state`` and ``rebalance_needed``.
        close: Price used to convert the base balance into quote units.

    Returns:
        Dict with ``inventory_state`` and ``rebalance_needed`` copied from the
        config, ``grid_ready`` (True only when the inventory state is
        ``"balanced"``), the base/quote value ratios and ``imbalance_score``
        (distance of the base ratio from 0.5), all rounded to four decimals.
    """
    base_value = config.base_balance * close
    quote_value = config.quote_balance
    portfolio_value = base_value + quote_value

    if portfolio_value:
        base_share = base_value / portfolio_value
        quote_share = quote_value / portfolio_value
    else:
        # Zero total value: report an even split instead of dividing by zero.
        base_share = quote_share = 0.5

    return {
        "inventory_state": config.inventory_state,
        "rebalance_needed": config.rebalance_needed,
        "grid_ready": config.inventory_state == "balanced",
        "base_ratio": round(base_share, 4),
        "quote_ratio": round(quote_share, 4),
        "imbalance_score": round(abs(base_share - 0.5), 4),
    }
def _strategies(config: ReplayConfig) -> list[dict[str, Any]]:
    """Return the fixed set of three strategy descriptors used by the replay.

    The list always holds an active grid trader followed by a trend follower
    and an exposure protector that are both switched off. Each descriptor
    carries the account id and market symbol from *config*, and every call
    builds fresh dict objects.
    """
    # (id, strategy_type, mode, config) — rebuilt per call so no dict is shared.
    blueprints = (
        ("grid-1", "grid_trader", "active", {}),
        ("trend-1", "trend_follower", "off", {"trade_side": "both"}),
        ("protect-1", "exposure_protector", "off", {}),
    )
    return [
        {
            "id": strategy_id,
            "strategy_type": strategy_type,
            "mode": mode,
            "account_id": config.account_id,
            "market_symbol": config.market_symbol,
            "state": {},
            "config": strategy_config,
        }
        for strategy_id, strategy_type, mode, strategy_config in blueprints
    ]
def _future_return(candles: list[Candle], index: int, horizon_bars: int) -> float | None:
    """Return the percentage change in close from *index* to *horizon_bars* later.

    Returns:
        ``(future_close - current_close) / current_close * 100``, or ``None``
        when the future candle lies past the end of *candles* or the current
        close equals zero.
    """
    target = index + horizon_bars
    if target >= len(candles):
        return None
    entry_price, exit_price = candles[index].close, candles[target].close
    if entry_price == 0:
        # A zero close gives no meaningful percentage.
        return None
    return ((exit_price - entry_price) / entry_price) * 100.0
def _fee_adjusted_return(future_return_pct: float | None, fee_rate: float) -> float | None:
    """Subtract two fee charges from a percentage return.

    Args:
        future_return_pct: Raw return in percent, or ``None``.
        fee_rate: Fee as a fraction (``0.004`` means 0.4 %).

    Returns:
        ``None`` when *future_return_pct* is ``None``; otherwise the return
        minus twice the fee expressed in percent.
    """
    if future_return_pct is None:
        return None
    # Fee fraction -> percent, charged twice (presumably entry and exit).
    round_trip_fee_pct = fee_rate * 100.0 * 2.0
    return future_return_pct - round_trip_fee_pct
def _direction_from_decision(decision_action: str, narrative: dict[str, Any]) -> str | None:
    """Derive a ``"bullish"``/``"bearish"`` direction for a trend decision.

    Lookup order for actions whose name contains ``"trend"``:

    1. ``narrative["grid_breakout_pressure"]["meso_bias"]`` when it is exactly
       ``"bullish"`` or ``"bearish"`` (the pressure entry is ignored unless it
       is a dict).
    2. ``narrative["stance"]`` when it contains ``"bullish"`` or
       ``"bearish"``; ``"bullish"`` is checked first.

    Returns:
        The direction string, or ``None`` when none could be derived.
    """
    # NOTE(review): the stance fallback is treated as part of the trend-only
    # path, so non-trend actions always yield None — confirm this is intended.
    if "trend" not in decision_action:
        return None

    pressure = narrative.get("grid_breakout_pressure")
    if not isinstance(pressure, dict):
        pressure = {}
    meso_bias = str(pressure.get("meso_bias") or "")
    if meso_bias in {"bullish", "bearish"}:
        return meso_bias

    stance = str(narrative.get("stance") or "")
    for direction in ("bullish", "bearish"):
        if direction in stance:
            return direction
    return None
def _score_row(decision_action: str, future_return_pct: float | None, fee_adjusted_return_pct: float | None) -> float:
    """Score a decision against the price move that followed it.

    Rules, checked in this order:

    * either return missing -> ``0.0``
    * ``"keep_grid"`` -> ``1.0`` if the absolute move stayed below 0.25 %,
      otherwise minus a tenth of the absolute fee-adjusted move
    * action containing ``"trend"`` -> fee-adjusted return divided by 5
    * action containing ``"protect"`` or ``"rebalance"`` -> ``0.5`` minus a
      tenth of the absolute move, floored at ``0.0``
    * anything else -> raw return divided by 10
    """
    if future_return_pct is None or fee_adjusted_return_pct is None:
        return 0.0

    move_size = abs(future_return_pct)

    if decision_action == "keep_grid":
        if move_size < 0.25:
            return 1.0
        return -abs(fee_adjusted_return_pct) / 10.0

    if "trend" in decision_action:
        return fee_adjusted_return_pct / 5.0

    is_defensive = "protect" in decision_action or "rebalance" in decision_action
    if is_defensive:
        return max(0.0, 0.5 - move_size / 10.0)

    return future_return_pct / 10.0
def run_replay(*, candles: list[Candle], config: ReplayConfig, lookback_bars: int = 2000, progress_every: int = 2000) -> list[ReplayRow]:
    """Replay the decision pipeline over historical candles and score each step.

    For every candle index from ``max(50, lookback_bars)`` up to (but not
    including) ``len(candles) - config.horizon_bars`` the function:

    1. takes the trailing window of at most *lookback_bars* candles ending at
       the current candle and builds regimes for ``config.timeframes``;
    2. runs ``synthesize_state`` -> ``build_narrative`` -> ``make_decision``;
    3. measures the close-to-close return ``config.horizon_bars`` candles
       ahead, fee-adjusts it and scores the decision.

    Balances come straight from *config* on every step; nothing in this
    function updates them between steps.

    Args:
        candles: Full candle history. Fewer than 50 candles yields ``[]``.
        config: Replay settings.
        lookback_bars: Window length, also the warm-up before the first row.
        progress_every: Write ``replay i/total`` to stderr every this many
            steps; a falsy value disables progress output.

    Returns:
        One ``ReplayRow`` per evaluated candle, in chronological order.
    """
    if len(candles) < 50:
        return []

    first_index = max(50, lookback_bars)
    stop_index = len(candles) - config.horizon_bars
    total = max(0, stop_index - first_index)

    rows: list[ReplayRow] = []
    for step, index in enumerate(range(first_index, stop_index)):
        if progress_every and step % progress_every == 0:
            print(f"replay {step}/{total}", file=sys.stderr)

        current = candles[index]
        window_start = max(0, index - lookback_bars + 1)
        window = candles[window_start : index + 1]
        regimes = build_regimes(window, config.timeframes)

        # Rebuilt on every step so each engine call receives its own dicts.
        concern = {
            "id": "sim-concern",
            "account_id": config.account_id,
            "market_symbol": config.market_symbol,
            "base_currency": config.base_currency,
            "quote_currency": config.quote_currency,
        }
        account_info = {
            "balances": [
                {"asset_code": config.base_currency, "available": config.base_balance},
                {"asset_code": config.quote_currency, "available": config.quote_balance},
            ]
        }

        state_payload = synthesize_state(concern=concern, regimes=regimes, account_info=account_info)
        narrative = build_narrative(concern=concern, state_payload=state_payload.payload)
        wallet_state = _wallet_state(config, current.close)
        strategies = _strategies(config)
        history_window = {
            "window_seconds": timeframe_seconds("1m") * config.horizon_bars,
            "recent_states": [],
        }
        decision = make_decision(
            concern=concern,
            narrative_payload=narrative.payload,
            wallet_state=wallet_state,
            strategies=strategies,
            history_window=history_window,
        )

        # The realised move is read from the full history, not from the window.
        future_return_pct = _future_return(candles, index, config.horizon_bars)
        fee_adjusted = _fee_adjusted_return(future_return_pct, config.fee_rate)

        row = ReplayRow(
            timestamp=current.timestamp.isoformat(),
            close=current.close,
            decision_mode=decision.mode,
            decision_action=decision.action,
            target_strategy=decision.target_strategy,
            confidence=decision.confidence,
            future_return_pct=future_return_pct,
            fee_adjusted_future_return_pct=fee_adjusted,
            score=_score_row(decision.action, future_return_pct, fee_adjusted),
            reason_summary=decision.reason_summary,
            payload={
                "decision": decision.payload,
                "state": state_payload.payload,
                "narrative": narrative.payload,
            },
        )
        rows.append(row)
    return rows
def rows_to_jsonl(rows: list[ReplayRow]) -> str:
    """Serialise *rows* as JSON Lines text.

    Each row is converted with ``dataclasses.asdict`` and dumped as one JSON
    object per line, with non-ASCII characters written verbatim. Lines are
    joined with ``"\\n"`` and there is no trailing newline; an empty input
    gives an empty string.
    """
    lines = [json.dumps(asdict(row), ensure_ascii=False) for row in rows]
    return "\n".join(lines)
|