Răsfoiți Sursa

small decision fix, simulation started

Lukas Goldschmidt 3 săptămâni în urmă
părinte
comite
ec51124bbc

+ 8 - 0
simulation/.gitignore

@@ -0,0 +1,8 @@
+# Downloaded market data and generated fixtures
+/data/
+/downloads/
+*.csv
+*.json
+*.parquet
+*.feather
+*.sqlite3

+ 18 - 0
simulation/PLAN.md

@@ -0,0 +1,18 @@
+# Simulation Plan
+
+## Goal
+Build a minimal but useful Hermes-MCP simulation pipeline with clean separation between data acquisition, preprocessing, and replay/analysis.
+
+## Current structure
+- `download_candles.py` downloads raw Binance archives into `simulation/data/raw/`
+- `prepare_candles.py` converts raw archives into a canonical prepared CSV plus manifest
+- `run_replay.py` replays Hermes against the prepared input only
+
+## Next steps
+1. Add month-append support so new raw archives can be added without rebuilding everything.
+2. Add prepared-dataset merging so multiple months become one continuous test set.
+3. Add basic evaluation reports for decision quality and fee-adjusted outcomes.
+4. Add optional parameter sweeps for unknowns in the decision layer.
+
+## Working rule
+Only the prepared dataset should be used for replay and optimization. Raw downloads stay isolated in `simulation/data/raw/`.

+ 33 - 0
simulation/README.md

@@ -0,0 +1,33 @@
+# Simulation
+
+Root for Hermes-MCP simulation and replay experiments.
+
+## Pipeline
+1. download raw archives,
+2. preprocess into canonical input CSV,
+3. run replay / analysis on the prepared input only.
+
+## Download raw Binance data
+```bash
+python3 simulation/download_candles.py \
+  --symbol XRPUSDT \
+  --interval 1m \
+  --start 2024-01-01T00:00:00Z \
+  --end 2024-02-01T00:00:00Z
+```
+
+## Preprocess into test input
+```bash
+python3 simulation/prepare_candles.py \
+  --symbol XRPUSDT \
+  --interval 1m
+```
+
+## Replay Hermes
+```bash
+python3 simulation/run_replay.py \
+  --candles simulation/data/prepared/XRPUSDT/1m/XRPUSDT-1m.csv \
+  --symbol XRPUSDT \
+  --base XRP \
+  --quote USDT
+```

+ 81 - 0
simulation/download_candles.py

@@ -0,0 +1,81 @@
+from __future__ import annotations
+
+from datetime import date, datetime, timezone
+from pathlib import Path
+import argparse
+import json
+import sys
+
+ROOT = Path(__file__).resolve().parents[1]
+SRC = ROOT / "src"
+SIM_SRC = ROOT / "simulation" / "src"
+for path in (str(SRC), str(SIM_SRC)):
+    if path not in sys.path:
+        sys.path.insert(0, path)
+
+from hermes_sim.binance_bulk import _daily_iter, _month_iter, download_day_archive, download_month_archive  # noqa: E402
+
+
+def _parse_dt(value: str) -> datetime:
+    text = value.strip()
+    if text.endswith("Z"):
+        text = text[:-1] + "+00:00"
+    dt = datetime.fromisoformat(text)
+    if dt.tzinfo is None:
+        return dt.replace(tzinfo=timezone.utc)
+    return dt.astimezone(timezone.utc)
+
+
def _download_bulk_archives(*, symbol: str, interval: str, start: datetime, end: datetime, raw_dir: Path) -> list[str]:
    """Fetch monthly archives covering [start, end]; fall back to daily zips for months without one.

    Returns the string paths of every archive that ended up in the cache.
    """
    archives: list[str] = []
    first_day, last_day = start.date(), end.date()
    for year, month in _month_iter(first_day, last_day):
        monthly = download_month_archive(symbol, interval, year, month, cache_dir=raw_dir)
        if monthly is not None:
            archives.append(str(monthly))
            continue
        # Monthly zip not published (typically the in-progress month): fetch individual days,
        # clamped to the requested range.
        month_first = date(year, month, 1)
        following = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        month_last = date.fromordinal(following.toordinal() - 1)
        for day in _daily_iter(max(first_day, month_first), min(last_day, month_last)):
            daily = download_day_archive(symbol, interval, day, cache_dir=raw_dir)
            if daily is not None:
                archives.append(str(daily))
    return archives
+
+
def main() -> int:
    """CLI entry point: download raw Binance archives for a date range and write a manifest.

    Returns 0 on success; the manifest records what was fetched so later
    pipeline stages can audit provenance.
    """
    parser = argparse.ArgumentParser(description="Download raw Binance candle archives for Hermes simulation.")
    parser.add_argument("--symbol", default="XRPUSDT")
    parser.add_argument("--interval", default="1m")
    parser.add_argument("--start", required=True, help="ISO timestamp, e.g. 2024-01-01T00:00:00Z")
    parser.add_argument("--end", required=True, help="ISO timestamp, e.g. 2024-02-01T00:00:00Z")
    parser.add_argument("--raw-dir", default=str(ROOT / "simulation" / "data" / "raw"))
    parser.add_argument("--manifest-out", default=str(ROOT / "simulation" / "data" / "manifests" / "raw_download_manifest.json"))
    args = parser.parse_args()

    start = _parse_dt(args.start)
    end = _parse_dt(args.end)
    raw_dir = Path(args.raw_dir)
    downloaded = _download_bulk_archives(symbol=args.symbol, interval=args.interval, start=start, end=end, raw_dir=raw_dir)
    # Manifest describes the request plus the concrete archive files obtained.
    manifest = {
        "symbol": args.symbol.upper(),
        "interval": args.interval,
        "start": start.isoformat(),
        "end": end.isoformat(),
        "source_kind": "bulk",
        "raw_dir": str(raw_dir),
        "archives": downloaded,
        "archive_count": len(downloaded),
    }
    manifest_path = Path(args.manifest_out)
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    manifest_path.write_text(json.dumps(manifest, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
    # Short machine-readable summary on stdout for shell pipelines.
    print(json.dumps({"downloaded": len(downloaded), "manifest": str(manifest_path)}, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

+ 52 - 0
simulation/prepare_candles.py

@@ -0,0 +1,52 @@
+from __future__ import annotations
+
+from pathlib import Path
+import argparse
+import json
+import sys
+
+ROOT = Path(__file__).resolve().parents[1]
+SRC = ROOT / "src"
+SIM_SRC = ROOT / "simulation" / "src"
+for path in (str(SRC), str(SIM_SRC)):
+    if path not in sys.path:
+        sys.path.insert(0, path)
+
+from hermes_sim.candles import Candle  # noqa: E402
+from hermes_sim.preprocess import build_manifest, discover_raw_archives, load_raw_archives, write_manifest, write_prepared_csv  # noqa: E402
+
+
def main() -> int:
    """CLI entry point: convert raw Binance archives into one canonical prepared CSV + manifest.

    Exits with an error message when no raw archives match the symbol/interval.
    """
    parser = argparse.ArgumentParser(description="Preprocess raw Binance archives into a canonical Hermes input CSV.")
    parser.add_argument("--symbol", default="XRPUSDT")
    parser.add_argument("--interval", default="1m")
    parser.add_argument("--raw-dir", default=str(ROOT / "simulation" / "data" / "raw"))
    parser.add_argument("--out", default=None, help="Output CSV path. Defaults to simulation/data/prepared/<symbol>/<interval>/dataset.csv")
    parser.add_argument("--manifest-out", default=None, help="Output manifest JSON path. Defaults beside the CSV.")
    args = parser.parse_args()

    raw_dir = Path(args.raw_dir)
    archives = discover_raw_archives(raw_dir, args.symbol, args.interval)
    if not archives:
        # Nothing to prepare — fail loudly rather than emit an empty dataset.
        raise SystemExit(f"No raw archives found under {raw_dir}")

    candles = load_raw_archives(archives)
    # Default layout: simulation/data/prepared/<SYMBOL>/<interval>/<SYMBOL>-<interval>.csv
    out_path = Path(args.out) if args.out else ROOT / "simulation" / "data" / "prepared" / args.symbol.upper() / args.interval / f"{args.symbol.upper()}-{args.interval}.csv"
    write_prepared_csv(out_path, candles)

    # Manifest captures provenance (source archives) alongside the prepared CSV.
    dataset = build_manifest(
        symbol=args.symbol,
        interval=args.interval,
        source_kind="bulk",
        source_files=archives,
        candles=candles,
        output_csv=out_path,
    )
    manifest_path = Path(args.manifest_out) if args.manifest_out else out_path.with_suffix(".manifest.json")
    write_manifest(manifest_path, dataset)
    print(json.dumps({"candles": len(candles), "csv": str(out_path), "manifest": str(manifest_path)}, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

+ 66 - 0
simulation/run_replay.py

@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+from pathlib import Path
+import argparse
+import json
+import sys
+
+ROOT = Path(__file__).resolve().parents[1]
+SRC = ROOT / "src"
+SIM_SRC = ROOT / "simulation" / "src"
+for path in (str(SRC), str(SIM_SRC)):
+    if path not in sys.path:
+        sys.path.insert(0, path)
+
+from hermes_sim.candles import load_candles_csv
+from hermes_sim.harness import ReplayConfig, rows_to_jsonl, run_replay
+
+
def main() -> int:
    """CLI entry point: replay Hermes decisions over a prepared candle CSV.

    Writes the per-bar replay rows as JSONL (to --out or stdout) and a small
    summary (candle count, row count, average score) to stderr.
    """
    parser = argparse.ArgumentParser(description="Replay Hermes decisions from historic candles.")
    parser.add_argument("--candles", required=True, help="Path to a CSV with timestamp,open,high,low,close,volume columns")
    parser.add_argument("--symbol", default="XRPUSD")
    parser.add_argument("--base", default="XRP")
    parser.add_argument("--quote", default="USD")
    parser.add_argument("--account-id", default="sim-account")
    parser.add_argument("--fee-rate", type=float, default=0.004)
    parser.add_argument("--horizon-bars", type=int, default=30)
    parser.add_argument("--base-balance", type=float, default=500.0)
    parser.add_argument("--quote-balance", type=float, default=500.0)
    parser.add_argument("--inventory-state", default="balanced")
    parser.add_argument("--rebalance-needed", action="store_true")
    parser.add_argument("--out", help="Optional JSONL output file")
    args = parser.parse_args()

    candles = load_candles_csv(args.candles)
    config = ReplayConfig(
        market_symbol=args.symbol,
        base_currency=args.base,
        quote_currency=args.quote,
        account_id=args.account_id,
        fee_rate=args.fee_rate,
        horizon_bars=args.horizon_bars,
        base_balance=args.base_balance,
        quote_balance=args.quote_balance,
        inventory_state=args.inventory_state,
        rebalance_needed=args.rebalance_needed,
    )
    rows = run_replay(candles=candles, config=config)
    output = rows_to_jsonl(rows)

    if args.out:
        # Ensure a trailing newline only when there is content to terminate.
        Path(args.out).write_text(output + ("\n" if output else ""), encoding="utf-8")
    else:
        print(output)

    # Summary goes to stderr so stdout stays pure JSONL when --out is omitted.
    summary = {
        "candles": len(candles),
        "rows": len(rows),
        "avg_score": round(sum(r.score for r in rows) / len(rows), 4) if rows else 0.0,
    }
    print(json.dumps(summary, indent=2), file=sys.stderr)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

+ 13 - 0
simulation/src/hermes_sim/__init__.py

@@ -0,0 +1,13 @@
+"""Hermes-MCP simulation helpers."""
+
+from .candles import Candle, load_candles_csv, resample_candles
+from .harness import ReplayConfig, ReplayRow, run_replay
+
+__all__ = [
+    "Candle",
+    "ReplayConfig",
+    "ReplayRow",
+    "load_candles_csv",
+    "resample_candles",
+    "run_replay",
+]

+ 201 - 0
simulation/src/hermes_sim/binance_bulk.py

@@ -0,0 +1,201 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime, timezone, date
+from io import BytesIO, TextIOWrapper
+from pathlib import Path
+from typing import Iterable
+from urllib.error import HTTPError
+from urllib.request import urlopen
+import csv
+import zipfile
+
+from .candles import Candle
+
+SPOT_MONTHLY_BASE = "https://data.binance.vision/data/spot/monthly/klines"
+SPOT_DAILY_BASE = "https://data.binance.vision/data/spot/daily/klines"
+
+
@dataclass(frozen=True)
class BinanceBulkSource:
    """Description of a Binance bulk-data source (data.binance.vision).

    NOTE(review): not referenced by the download helpers in this module —
    they accept symbol/interval directly; confirm whether this is intended
    future API surface or dead code.
    """

    symbol: str  # e.g. "XRPUSDT"
    interval: str = "1m"  # kline interval string
    market: str = "spot"  # only spot URLs are built in this module
    cache_dir: Path | None = None  # where downloaded zips are cached, if anywhere
+
+
+def _normalize_symbol(symbol: str) -> str:
+    return symbol.strip().upper()
+
+
+def _month_iter(start: date, end: date) -> Iterable[tuple[int, int]]:
+    year = start.year
+    month = start.month
+    while (year, month) <= (end.year, end.month):
+        yield year, month
+        if month == 12:
+            year += 1
+            month = 1
+        else:
+            month += 1
+
+
+def _daily_iter(start: date, end: date) -> Iterable[date]:
+    current = start
+    while current <= end:
+        yield current
+        current = current.fromordinal(current.toordinal() + 1)
+
+
def _month_url(symbol: str, interval: str, year: int, month: int) -> str:
    """Build the data.binance.vision URL for one monthly klines zip."""
    archive_name = f"{symbol}-{interval}-{year:04d}-{month:02d}.zip"
    return f"{SPOT_MONTHLY_BASE}/{symbol}/{interval}/{archive_name}"
+
+
def _day_url(symbol: str, interval: str, day: date) -> str:
    """Build the data.binance.vision URL for one daily klines zip."""
    archive_name = f"{symbol}-{interval}-{day:%Y-%m-%d}.zip"
    return f"{SPOT_DAILY_BASE}/{symbol}/{interval}/{archive_name}"
+
+
+def _cache_path(base: Path, symbol: str, interval: str, kind: str, suffix: str) -> Path:
+    return base / "binance" / "spot" / kind / "klines" / symbol / interval / suffix
+
+
def _open_url(url: str) -> bytes:
    """Fetch *url* and return the raw response body (60-second timeout)."""
    with urlopen(url, timeout=60) as response:
        return response.read()
+
+
def _load_zip_candles(blob: bytes) -> list[Candle]:
    """Parse a Binance kline zip (its first CSV member) into Candle objects.

    Rows with fewer than the six kline columns are skipped.
    Raises ValueError when the archive contains no CSV member.
    """
    candles: list[Candle] = []
    with zipfile.ZipFile(BytesIO(blob)) as archive:
        csv_members = [name for name in archive.namelist() if name.lower().endswith(".csv")]
        if not csv_members:
            raise ValueError("Zip file did not contain a CSV")
        with archive.open(csv_members[0]) as raw:
            for row in csv.reader(TextIOWrapper(raw, encoding="utf-8")):
                if len(row) < 6:
                    # Covers blank lines and malformed short rows alike.
                    continue
                # Binance open_time is epoch milliseconds (sometimes exported in
                # scientific notation, hence float-then-int).
                open_ms = int(float(row[0]))
                candles.append(
                    Candle(
                        timestamp=datetime.fromtimestamp(open_ms / 1000.0, tz=timezone.utc),
                        open=float(row[1]),
                        high=float(row[2]),
                        low=float(row[3]),
                        close=float(row[4]),
                        volume=float(row[5]),
                    )
                )
    return candles
+
+
def load_candles_from_zip_path(path: Path) -> list[Candle]:
    """Load and parse candles from a previously downloaded zip archive on disk."""
    blob = path.read_bytes()
    return _load_zip_candles(blob)
+
+
def _download_archive(url: str, cache_path: Path | None = None) -> bytes | None:
    """Return archive bytes, preferring the local cache; None on HTTP 404.

    A freshly downloaded blob is written into *cache_path* when one is given.
    Non-404 HTTP errors propagate to the caller.
    """
    if cache_path is not None and cache_path.exists():
        return cache_path.read_bytes()
    try:
        blob = _open_url(url)
    except HTTPError as exc:
        if exc.code != 404:
            raise
        # Archive not published (yet) — treat as absent rather than an error.
        return None
    if cache_path is not None:
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        cache_path.write_bytes(blob)
    return blob
+
+
def download_month_archive(symbol: str, interval: str, year: int, month: int, cache_dir: Path | None = None) -> Path | None:
    """Ensure the monthly kline zip is present in the cache and return its path.

    Returns None when cache_dir is None (there is nowhere to store the
    archive, so a path can never be returned) or when Binance has no archive
    for that month (HTTP 404).
    """
    symbol = _normalize_symbol(symbol)
    if cache_dir is None:
        # Bug fix: the original still downloaded the blob over the network and
        # then discarded it before returning None. Without a cache there is no
        # path to return, so skip the network call entirely.
        return None
    cache_path = _cache_path(cache_dir, symbol, interval, "monthly", f"{symbol}-{interval}-{year:04d}-{month:02d}.zip")
    if cache_path.exists():
        return cache_path
    blob = _download_archive(_month_url(symbol, interval, year, month), cache_path=cache_path)
    return cache_path if blob is not None else None
+
+
def download_day_archive(symbol: str, interval: str, day: date, cache_dir: Path | None = None) -> Path | None:
    """Ensure the daily kline zip is present in the cache and return its path.

    Returns None when cache_dir is None (nowhere to store, so no path can
    ever be returned) or when Binance has no archive for that day (HTTP 404).
    """
    symbol = _normalize_symbol(symbol)
    if cache_dir is None:
        # Bug fix: the original downloaded the blob and then discarded it
        # before returning None. Skip the pointless network call.
        return None
    cache_path = _cache_path(cache_dir, symbol, interval, "daily", f"{symbol}-{interval}-{day:%Y-%m-%d}.zip")
    if cache_path.exists():
        return cache_path
    blob = _download_archive(_day_url(symbol, interval, day), cache_path=cache_path)
    return cache_path if blob is not None else None
+
+
def download_month(symbol: str, interval: str, year: int, month: int, cache_dir: Path | None = None) -> list[Candle] | None:
    """Download (or load from cache) one monthly archive and parse its candles.

    Returns None when Binance has no archive for that month (HTTP 404).
    """
    symbol = _normalize_symbol(symbol)
    cache_path = None
    if cache_dir is not None:
        cache_path = _cache_path(cache_dir, symbol, interval, "monthly", f"{symbol}-{interval}-{year:04d}-{month:02d}.zip")
    # _download_archive already implements the cache-first read, 404-as-None,
    # and cache-write behavior; reuse it instead of duplicating the
    # urlopen/HTTPError/cache-write sequence inline as the original did.
    blob = _download_archive(_month_url(symbol, interval, year, month), cache_path=cache_path)
    if blob is None:
        return None
    return _load_zip_candles(blob)
+
+
def download_day(symbol: str, interval: str, day: date, cache_dir: Path | None = None) -> list[Candle] | None:
    """Download (or load from cache) one daily archive and parse its candles.

    Returns None when Binance has no archive for that day (HTTP 404).
    """
    symbol = _normalize_symbol(symbol)
    cache_path = None
    if cache_dir is not None:
        cache_path = _cache_path(cache_dir, symbol, interval, "daily", f"{symbol}-{interval}-{day:%Y-%m-%d}.zip")
    # Delegate cache-first / 404-as-None / cache-write behavior to
    # _download_archive instead of duplicating it inline (see download_month).
    blob = _download_archive(_day_url(symbol, interval, day), cache_path=cache_path)
    if blob is None:
        return None
    return _load_zip_candles(blob)
+
+
def bulk_download_candles(*, symbol: str, interval: str, start: datetime, end: datetime, cache_dir: Path | None = None) -> list[Candle]:
    """Download candles covering [start, end]: monthly archives first, daily fallback.

    The result is filtered to candles whose timestamp lies within
    [start, end] inclusive.
    """
    symbol = _normalize_symbol(symbol)
    first_day, last_day = start.date(), end.date()
    collected: list[Candle] = []
    for year, month in _month_iter(first_day, last_day):
        monthly = download_month(symbol, interval, year, month, cache_dir=cache_dir)
        if monthly is not None:
            collected.extend(monthly)
            continue
        # No monthly zip (typically the in-progress month): stitch days together.
        month_first = date(year, month, 1)
        following = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        month_last = date.fromordinal(following.toordinal() - 1)
        for day in _daily_iter(max(first_day, month_first), min(last_day, month_last)):
            daily = download_day(symbol, interval, day, cache_dir=cache_dir)
            if daily:
                collected.extend(daily)
    return [candle for candle in collected if start <= candle.timestamp <= end]

+ 113 - 0
simulation/src/hermes_sim/candles.py

@@ -0,0 +1,113 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Iterable
+import csv
+import math
+
+_TIMEFRAME_SECONDS = {
+    "1m": 60,
+    "5m": 5 * 60,
+    "15m": 15 * 60,
+    "1h": 60 * 60,
+    "4h": 4 * 60 * 60,
+    "1d": 24 * 60 * 60,
+}
+
+
@dataclass(frozen=True)
class Candle:
    """One immutable OHLCV bar.

    For Binance-sourced data, `timestamp` is the bar's open time as an
    aware UTC datetime.
    """

    timestamp: datetime  # bar open time, timezone-aware UTC
    open: float
    high: float
    low: float
    close: float
    volume: float  # traded volume over the bar
+
+
+def _parse_timestamp(value: str) -> datetime:
+    text = value.strip()
+    if text.endswith("Z"):
+        text = text[:-1] + "+00:00"
+    dt = datetime.fromisoformat(text)
+    if dt.tzinfo is None:
+        return dt.replace(tzinfo=timezone.utc)
+    return dt.astimezone(timezone.utc)
+
+
def load_candles_csv(path: str | Path) -> list[Candle]:
    """Read an OHLCV CSV (timestamp,open,high,low,close,volume header) into
    Candle objects, sorted ascending by timestamp."""
    candles: list[Candle] = []
    with Path(path).open(newline="", encoding="utf-8") as handle:
        for row in csv.DictReader(handle):
            candles.append(
                Candle(
                    timestamp=_parse_timestamp(row["timestamp"]),
                    open=float(row["open"]),
                    high=float(row["high"]),
                    low=float(row["low"]),
                    close=float(row["close"]),
                    # A missing or empty volume column counts as zero volume.
                    volume=float(row.get("volume") or 0.0),
                )
            )
    return sorted(candles, key=lambda candle: candle.timestamp)
+
+
+def _floor_timestamp(ts: datetime, bucket_seconds: int) -> datetime:
+    epoch = int(ts.timestamp())
+    floored = (epoch // bucket_seconds) * bucket_seconds
+    return datetime.fromtimestamp(floored, tz=timezone.utc)
+
+
def resample_candles(candles: Iterable[Candle], timeframe: str) -> list[Candle]:
    """Aggregate candles into *timeframe* buckets (first open, max high,
    min low, last close, summed volume), ordered by bucket start.

    Raises ValueError for a timeframe not listed in _TIMEFRAME_SECONDS.
    """
    bucket_seconds = _TIMEFRAME_SECONDS.get(timeframe)
    if bucket_seconds is None:
        raise ValueError(f"Unsupported timeframe: {timeframe}")

    grouped: dict[datetime, list[Candle]] = {}
    for candle in candles:
        bucket_start = _floor_timestamp(candle.timestamp, bucket_seconds)
        grouped.setdefault(bucket_start, []).append(candle)

    resampled: list[Candle] = []
    for bucket_start in sorted(grouped):
        members = grouped[bucket_start]
        resampled.append(
            Candle(
                timestamp=bucket_start,
                open=members[0].open,
                high=max(c.high for c in members),
                low=min(c.low for c in members),
                close=members[-1].close,
                volume=sum(c.volume for c in members),
            )
        )
    return resampled
+
+
def timeframe_seconds(timeframe: str) -> int:
    """Return the bucket length in seconds for a supported timeframe string.

    Raises ValueError for unknown timeframes.
    """
    seconds = _TIMEFRAME_SECONDS.get(timeframe)
    if seconds is not None:
        return seconds
    raise ValueError(f"Unsupported timeframe: {timeframe}")
+
+
def slice_through(candles: list[Candle], timestamp: datetime) -> list[Candle]:
    """Return the candles whose timestamp is at or before *timestamp*, order preserved."""
    kept: list[Candle] = []
    for candle in candles:
        if candle.timestamp <= timestamp:
            kept.append(candle)
    return kept
+
+
def infer_common_timeframe(candles: list[Candle]) -> str:
    """Guess a timeframe label from the spacing of the first two candles.

    Defaults to "1m" when fewer than two candles are available.
    """
    if len(candles) < 2:
        return "1m"
    gap = candles[1].timestamp - candles[0].timestamp
    minutes = max(1, int(round(gap.total_seconds() / 60.0)))
    # Coarsest matching unit wins: days, then 4-hour blocks, then hours.
    if minutes >= 1440 and minutes % 1440 == 0:
        return "1d"
    if minutes >= 240 and minutes % 240 == 0:
        return "4h"
    if minutes >= 60 and minutes % 60 == 0:
        return f"{minutes // 60}h"
    return f"{minutes}m"

+ 329 - 0
simulation/src/hermes_sim/harness.py

@@ -0,0 +1,329 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, asdict
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Any
+import json
+
+from .candles import Candle, load_candles_csv, resample_candles, slice_through, timeframe_seconds
+from .indicators import atr, bollinger, ema, macd_histogram, rsi, sma, vwap
+
+
def _bootstrap_hermes_imports() -> None:
    """Prepend <repo>/src to sys.path so the hermes_mcp package resolves below."""
    import sys
    # harness.py lives at simulation/src/hermes_sim/, so parents[3] is the repo root.
    root = Path(__file__).resolve().parents[3]
    src = root / "src"
    if str(src) not in sys.path:
        sys.path.insert(0, str(src))


# Must run before the hermes_mcp imports that follow (hence their E402 markers).
_bootstrap_hermes_imports()
from hermes_mcp.decision_engine import make_decision  # noqa: E402
from hermes_mcp.narrative_engine import build_narrative  # noqa: E402
from hermes_mcp.state_engine import synthesize_state  # noqa: E402


# Timeframes for which a regime snapshot is computed on every replay step.
DEFAULT_TIMEFRAMES = ("1m", "5m", "15m", "1h", "4h", "1d")
+
+
@dataclass(frozen=True)
class ReplayConfig:
    """Immutable parameters for one replay run."""

    market_symbol: str = "XRPUSD"
    base_currency: str = "XRP"
    quote_currency: str = "USD"
    account_id: str = "sim-account"
    fee_rate: float = 0.004  # per-side fee; doubled for round-trip adjustment
    horizon_bars: int = 30  # look-ahead distance used to score each decision
    timeframes: tuple[str, ...] = DEFAULT_TIMEFRAMES  # regimes built per step
    base_balance: float = 500.0  # simulated base-asset balance (units of base)
    quote_balance: float = 500.0  # simulated quote-asset balance
    inventory_state: str = "balanced"  # fed into wallet state; "balanced" => grid_ready
    rebalance_needed: bool = False
+
+
@dataclass(frozen=True)
class ReplayRow:
    """One bar's replay outcome: the decision taken plus look-ahead scoring."""

    timestamp: str  # ISO timestamp of the decision bar
    close: float
    decision_mode: str
    decision_action: str
    target_strategy: str | None
    confidence: float
    future_return_pct: float | None  # close-to-close % return over horizon_bars; None near dataset end
    fee_adjusted_future_return_pct: float | None  # future return minus round-trip fees
    score: float  # heuristic decision-quality score (see _score_row)
    reason_summary: str
    payload: dict[str, Any]  # full decision/state/narrative payloads for deep analysis
+
+
def _group_by_timeframe(candles: list[Candle], timeframe: str) -> list[Candle]:
    """Resample candles to *timeframe*; 1m input is passed through untouched."""
    return candles if timeframe == "1m" else resample_candles(candles, timeframe)
+
+
def _window_regime(candles: list[Candle], timeframe: str) -> dict[str, Any] | None:
    """Compute a single-timeframe market-regime snapshot from a candle window.

    Returns a nested dict of trend/momentum/volatility/band indicators plus
    derived meta fields, or None when the window is empty. Indicator values
    are rounded for stable serialization.
    """
    if not candles:
        return None
    closes = [c.close for c in candles]
    highs = [c.high for c in candles]
    lows = [c.low for c in candles]
    volumes = [c.volume for c in candles]

    ema_fast = ema(closes, 8)
    ema_slow = ema(closes, 21)
    sma_long = sma(closes, 50)
    price = closes[-1]
    rsi_value = rsi(closes, 14)
    macd_value = macd_histogram(closes)
    atr_value = atr(highs, lows, closes, 14)
    middle, upper, lower = bollinger(closes, 20, 2.0)
    vwap_value = vwap(highs, lows, closes, volumes)

    # ATR expressed as a percentage of the last close (guard against price == 0).
    atr_percent = None
    if atr_value is not None and price:
        atr_percent = (atr_value / price) * 100.0

    # Simple RSI-extremes reversal signal: overbought => "down", oversold => "up",
    # scored 0..100 by distance beyond the 70/30 threshold.
    reversal_direction = "none"
    reversal_score = 0.0
    if rsi_value is not None:
        if rsi_value >= 70:
            reversal_direction = "down"
            reversal_score = min((rsi_value - 70.0) * 2.0, 100.0)
        elif rsi_value <= 30:
            reversal_direction = "up"
            reversal_score = min((30.0 - rsi_value) * 2.0, 100.0)

    # Trend direction from EMA crossover; strength is the EMA gap as a % of
    # price, capped at 5.0.
    if ema_fast is not None and ema_slow is not None:
        if ema_fast > ema_slow:
            trend_strength = min(((ema_fast - ema_slow) / price) * 100.0 if price else 0.0, 5.0)
            trend_direction = "bullish"
        else:
            trend_strength = min(((ema_slow - ema_fast) / price) * 100.0 if price else 0.0, 5.0)
            trend_direction = "bearish"
    else:
        trend_strength = 0.0
        trend_direction = "mixed"

    # Locate price within the Bollinger band span (0 = lower band, 1 = upper).
    if upper is not None and lower is not None and price:
        band_span = max(upper - lower, 1e-9)  # avoid division by zero on flat bands
        band_pos = (price - lower) / band_span
        if band_pos >= 0.85:
            price_location = "near_upper_band"
        elif band_pos <= 0.15:
            price_location = "near_lower_band"
        elif band_pos >= 0.6:
            price_location = "upper_half"
        elif band_pos <= 0.4:
            price_location = "lower_half"
        else:
            price_location = "centered"
    else:
        price_location = "unknown"

    regime = {
        "timeframe": timeframe,
        "price": round(price, 8),
        "trend": {
            "ema_fast": round(ema_fast, 8) if ema_fast is not None else None,
            "ema_slow": round(ema_slow, 8) if ema_slow is not None else None,
            "sma_long": round(sma_long, 8) if sma_long is not None else None,
        },
        "momentum": {
            "rsi": round(rsi_value, 4) if rsi_value is not None else None,
            "macd_histogram": round(macd_value, 8) if macd_value is not None else None,
        },
        "volatility": {
            "atr": round(atr_value, 8) if atr_value is not None else None,
            "atr_percent": round(atr_percent, 8) if atr_percent is not None else None,
        },
        "bands": {
            "bollinger": {
                "middle": round(middle, 8) if middle is not None else None,
                "upper": round(upper, 8) if upper is not None else None,
                "lower": round(lower, 8) if lower is not None else None,
            }
        },
        "vwap": round(vwap_value, 8) if vwap_value is not None else None,
        "reversal": {
            "direction": reversal_direction,
            "score": round(reversal_score, 4),
        },
        "meta": {
            "trend_direction": trend_direction,
            "trend_strength": round(trend_strength, 6),
            "price_location": price_location,
            "candle_count": len(candles),
        },
    }
    return regime
+
+
def build_regimes(candles: list[Candle], timeframes: tuple[str, ...] = DEFAULT_TIMEFRAMES) -> list[dict[str, Any]]:
    """Build one regime snapshot per timeframe that has at least one candle.

    Timeframes with no candles after resampling are skipped; regimes that
    compute to None are filtered out.
    """
    regimes: list[dict[str, Any]] = []
    for timeframe in timeframes:
        # _group_by_timeframe already passes "1m" through untouched, so the
        # inline special case duplicated here previously was redundant.
        tf_candles = _group_by_timeframe(candles, timeframe)
        if not tf_candles:
            continue
        regime = _window_regime(tf_candles, timeframe)
        if regime is not None:
            regimes.append(regime)
    return regimes
+
+
+def _wallet_state(config: ReplayConfig, close: float) -> dict[str, Any]:
+    base_value = config.base_balance * close
+    quote_value = config.quote_balance
+    total_value = base_value + quote_value
+    base_ratio = base_value / total_value if total_value else 0.5
+    quote_ratio = quote_value / total_value if total_value else 0.5
+    imbalance = abs(base_ratio - 0.5)
+    return {
+        "inventory_state": config.inventory_state,
+        "rebalance_needed": config.rebalance_needed,
+        "grid_ready": config.inventory_state == "balanced",
+        "base_ratio": round(base_ratio, 4),
+        "quote_ratio": round(quote_ratio, 4),
+        "imbalance_score": round(imbalance, 4),
+    }
+
+
+def _strategies(config: ReplayConfig) -> list[dict[str, Any]]:
+    return [
+        {
+            "id": "grid-1",
+            "strategy_type": "grid_trader",
+            "mode": "active",
+            "account_id": config.account_id,
+            "market_symbol": config.market_symbol,
+            "state": {},
+            "config": {},
+        },
+        {
+            "id": "trend-1",
+            "strategy_type": "trend_follower",
+            "mode": "off",
+            "account_id": config.account_id,
+            "market_symbol": config.market_symbol,
+            "state": {},
+            "config": {"trade_side": "both"},
+        },
+        {
+            "id": "protect-1",
+            "strategy_type": "exposure_protector",
+            "mode": "off",
+            "account_id": config.account_id,
+            "market_symbol": config.market_symbol,
+            "state": {},
+            "config": {},
+        },
+    ]
+
+
+def _future_return(candles: list[Candle], index: int, horizon_bars: int) -> float | None:
+    future_index = index + horizon_bars
+    if future_index >= len(candles):
+        return None
+    start = candles[index].close
+    end = candles[future_index].close
+    if start == 0:
+        return None
+    return ((end - start) / start) * 100.0
+
+
+def _fee_adjusted_return(future_return_pct: float | None, fee_rate: float) -> float | None:
+    if future_return_pct is None:
+        return None
+    return future_return_pct - (fee_rate * 100.0 * 2.0)
+
+
+def _direction_from_decision(decision_action: str, narrative: dict[str, Any]) -> str | None:
+    if "trend" in decision_action:
+        breakout = narrative.get("grid_breakout_pressure") if isinstance(narrative.get("grid_breakout_pressure"), dict) else {}
+        meso_bias = str(breakout.get("meso_bias") or "")
+        if meso_bias in {"bullish", "bearish"}:
+            return meso_bias
+        stance = str(narrative.get("stance") or "")
+        if "bullish" in stance:
+            return "bullish"
+        if "bearish" in stance:
+            return "bearish"
+    return None
+
+
+def _score_row(decision_action: str, future_return_pct: float | None, fee_adjusted_return_pct: float | None) -> float:
+    if future_return_pct is None or fee_adjusted_return_pct is None:
+        return 0.0
+    if decision_action == "keep_grid":
+        return 1.0 if abs(future_return_pct) < 0.25 else -abs(fee_adjusted_return_pct) / 10.0
+    if "trend" in decision_action:
+        return fee_adjusted_return_pct / 5.0
+    if "protect" in decision_action or "rebalance" in decision_action:
+        return max(0.0, 0.5 - abs(future_return_pct) / 10.0)
+    return future_return_pct / 10.0
+
+
def run_replay(*, candles: list[Candle], config: ReplayConfig) -> list[ReplayRow]:
    """Replay Hermes state → narrative → decision over a candle series.

    Starts at bar 50 (indicator warm-up) and stops horizon_bars before the
    end so every row has look-ahead data. Returns [] for fewer than 50
    candles. NOTE(review): regimes are recomputed over the whole growing
    window each step, so cost grows quadratically with series length.
    """
    if len(candles) < 50:
        return []

    rows: list[ReplayRow] = []
    for index in range(50, len(candles) - config.horizon_bars):
        # All data visible up to and including the current bar — no look-ahead
        # leaks into the decision inputs.
        window = candles[: index + 1]
        current = candles[index]
        regimes = build_regimes(window, config.timeframes)
        concern = {
            "id": "sim-concern",
            "account_id": config.account_id,
            "market_symbol": config.market_symbol,
            "base_currency": config.base_currency,
            "quote_currency": config.quote_currency,
        }
        account_info = {
            "balances": [
                {"asset_code": config.base_currency, "available": config.base_balance},
                {"asset_code": config.quote_currency, "available": config.quote_balance},
            ]
        }
        # Pipeline mirrors production: state synthesis → narrative → decision.
        state_payload = synthesize_state(concern=concern, regimes=regimes, account_info=account_info)
        narrative = build_narrative(concern=concern, state_payload=state_payload.payload)
        wallet_state = _wallet_state(config, current.close)
        decision = make_decision(
            concern=concern,
            narrative_payload=narrative.payload,
            wallet_state=wallet_state,
            strategies=_strategies(config),
            history_window={
                "window_seconds": timeframe_seconds("1m") * config.horizon_bars,
                "recent_states": [],
            },
        )

        # Score the decision against what actually happened horizon_bars later.
        future_return_pct = _future_return(candles, index, config.horizon_bars)
        fee_adjusted = _fee_adjusted_return(future_return_pct, config.fee_rate)
        score = _score_row(decision.action, future_return_pct, fee_adjusted)
        rows.append(
            ReplayRow(
                timestamp=current.timestamp.isoformat(),
                close=current.close,
                decision_mode=decision.mode,
                decision_action=decision.action,
                target_strategy=decision.target_strategy,
                confidence=decision.confidence,
                future_return_pct=future_return_pct,
                fee_adjusted_future_return_pct=fee_adjusted,
                score=score,
                reason_summary=decision.reason_summary,
                payload={
                    "decision": decision.payload,
                    "state": state_payload.payload,
                    "narrative": narrative.payload,
                },
            )
        )
    return rows
+
+
def rows_to_jsonl(rows: list[ReplayRow]) -> str:
    """Serialize replay rows as newline-delimited JSON (no trailing newline)."""
    lines = [json.dumps(asdict(row), ensure_ascii=False) for row in rows]
    return "\n".join(lines)

+ 117 - 0
simulation/src/hermes_sim/indicators.py

@@ -0,0 +1,117 @@
+from __future__ import annotations
+
+from typing import Sequence
+
+
+def _ensure_values(values: Sequence[float]) -> list[float]:
+    return [float(v) for v in values]
+
+
def sma(values: Sequence[float], period: int) -> float | None:
    """Simple moving average of the trailing *period* values.

    When fewer than *period* samples exist, averages everything available.
    Returns ``None`` for empty input.
    """
    data = [float(v) for v in values]
    if not data:
        return None
    tail = data if len(data) < period else data[-period:]
    return sum(tail) / len(tail)
+
+
def ema(values: Sequence[float], period: int) -> float | None:
    """Exponential moving average, seeded with the first sample.

    Returns ``None`` for empty input.  The smoothing factor is the
    conventional ``2 / (period + 1)``.
    """
    data = [float(v) for v in values]
    if not data:
        return None
    alpha = 2.0 / (period + 1.0)
    result = data[0]
    for sample in data[1:]:
        # Same arithmetic shape as the classic recurrence to keep results
        # bit-for-bit identical: new = sample*a + old*(1-a).
        result = sample * alpha + result * (1.0 - alpha)
    return result
+
+
def rsi(values: Sequence[float], period: int = 14) -> float | None:
    """Relative Strength Index with Wilder smoothing.

    The gain/loss averages are seeded from the first ``min(period, n)``
    changes; any remaining changes are folded in with Wilder's recursive
    smoothing.  Returns ``None`` with fewer than two samples, ``50.0`` for a
    flat series, and ``100.0`` / ``0.0`` for purely one-sided movement.

    Fix: the previous version duplicated the final RSI computation (and the
    ``avg_loss == 0`` special case) in two branches; the short-history branch
    is just the smoothing loop running zero times, so both branches collapse
    into one code path with identical behavior.
    """
    data = [float(v) for v in values]
    if len(data) < 2:
        return None
    gains: list[float] = []
    losses: list[float] = []
    for prev, curr in zip(data[:-1], data[1:]):
        change = curr - prev
        gains.append(max(change, 0.0))
        losses.append(max(-change, 0.0))
    seed = min(period, len(gains))
    avg_gain = sum(gains[:period]) / seed
    avg_loss = sum(losses[:period]) / seed
    # Wilder smoothing over any changes beyond the seed window (no-op when
    # the series is shorter than `period`).
    for gain, loss in zip(gains[period:], losses[period:]):
        avg_gain = ((avg_gain * (period - 1)) + gain) / period
        avg_loss = ((avg_loss * (period - 1)) + loss) / period
    if avg_loss == 0:
        return 100.0 if avg_gain > 0 else 50.0
    rs = avg_gain / avg_loss
    return 100.0 - (100.0 / (1.0 + rs))
+
+
def atr(highs: Sequence[float], lows: Sequence[float], closes: Sequence[float], period: int = 14) -> float | None:
    """Average True Range: plain mean of true ranges over the trailing window.

    The first bar's true range uses its own close as the previous close.
    Returns ``None`` when no closes are available.
    """
    close_vals = [float(v) for v in closes]
    if not close_vals:
        return None
    high_vals = [float(v) for v in highs]
    low_vals = [float(v) for v in lows]
    prev_close = close_vals[0]
    true_ranges: list[float] = []
    for high, low, close in zip(high_vals, low_vals, close_vals):
        bar_span = high - low
        gap_up = abs(high - prev_close)
        gap_down = abs(low - prev_close)
        true_ranges.append(max(bar_span, gap_up, gap_down))
        prev_close = close
    window = true_ranges if len(true_ranges) < period else true_ranges[-period:]
    return sum(window) / len(window) if window else None
+
+
def bollinger(values: Sequence[float], period: int = 20, stdev_mult: float = 2.0) -> tuple[float | None, float | None, float | None]:
    """Bollinger bands as ``(middle, upper, lower)`` over the trailing window.

    Uses the population standard deviation.  With fewer than two samples all
    three bands collapse to the mean; empty input yields three ``None``s.
    """
    data = [float(v) for v in values]
    if not data:
        return None, None, None
    window = data if len(data) < period else data[-period:]
    count = len(window)
    middle = sum(window) / count
    if count < 2:
        return middle, middle, middle
    variance = sum((sample - middle) ** 2 for sample in window) / count
    deviation = variance ** 0.5
    band_offset_upper = stdev_mult * deviation
    return middle, middle + band_offset_upper, middle - band_offset_upper
+
+
def macd_histogram(values: Sequence[float]) -> float | None:
    """MACD histogram (MACD line minus its signal line) for *values*.

    Uses EMA(12)/EMA(26) for the MACD line and EMA(9) of the MACD line as the
    signal, each EMA seeded with its first sample.  Returns ``None`` when
    fewer than two samples are available.

    Fixes over the previous version:
    - O(n) incremental EMAs instead of recomputing a full prefix EMA for every
      sample (O(n^2)) — numerically identical, since the prefix EMA is exactly
      this recurrence.
    - Removes the ``or data[i - 1]`` fallback, which could never catch a
      ``None`` (the prefixes are never empty) but silently substituted the raw
      price whenever an intermediate EMA happened to equal exactly 0.0 — a
      falsy-value bug.
    """
    data = [float(v) for v in values]
    if len(data) < 2:
        return None
    k_fast = 2.0 / 13.0   # EMA(12) smoothing factor: 2 / (12 + 1)
    k_slow = 2.0 / 27.0   # EMA(26) smoothing factor: 2 / (26 + 1)
    fast = slow = data[0]
    macd_line = [fast - slow]  # both EMAs seed at data[0], so this is 0.0
    for price in data[1:]:
        fast = price * k_fast + fast * (1.0 - k_fast)
        slow = price * k_slow + slow * (1.0 - k_slow)
        macd_line.append(fast - slow)
    k_signal = 2.0 / 10.0  # EMA(9) smoothing factor: 2 / (9 + 1)
    signal = macd_line[0]
    for point in macd_line[1:]:
        signal = point * k_signal + signal * (1.0 - k_signal)
    return macd_line[-1] - signal
+
+
def vwap(highs: Sequence[float], lows: Sequence[float], closes: Sequence[float], volumes: Sequence[float]) -> float | None:
    """Volume-weighted average of typical prices ``(H + L + C) / 3``.

    Returns ``None`` for empty closes; when total volume is zero, degrades to
    the plain mean of the closes.
    """
    close_vals = [float(v) for v in closes]
    if not close_vals:
        return None
    high_vals = [float(v) for v in highs]
    low_vals = [float(v) for v in lows]
    volume_vals = [float(v) for v in volumes]
    weighted_sum = 0.0
    weight_total = 0.0
    for high, low, close, volume in zip(high_vals, low_vals, close_vals, volume_vals):
        typical = (high + low + close) / 3.0
        weighted_sum += typical * volume
        weight_total += volume
    if weight_total == 0:
        return sum(close_vals) / len(close_vals)
    return weighted_sum / weight_total

+ 76 - 0
simulation/src/hermes_sim/preprocess.py

@@ -0,0 +1,76 @@
+from __future__ import annotations
+
+from dataclasses import asdict, dataclass
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Iterable
+import csv
+import json
+
+from .binance_bulk import load_candles_from_zip_path
+from .candles import Candle
+
+
@dataclass(frozen=True)
class PreparedDataset:
    """Manifest describing one prepared candle dataset and its provenance."""

    symbol: str  # market symbol as stored by build_manifest (upper-cased there)
    interval: str  # candle interval label, e.g. "1m"
    source_kind: str  # label describing where the raw inputs came from
    source_files: list[str]  # stringified paths of the raw archives consumed
    candle_count: int  # number of candles in the prepared output
    first_timestamp: str | None  # ISO timestamp of the first candle, None when empty
    last_timestamp: str | None  # ISO timestamp of the last candle, None when empty
    output_csv: str  # stringified path of the prepared CSV file
+
+
def discover_raw_archives(raw_dir: Path, symbol: str, interval: str) -> list[Path]:
    """Find all raw ``SYMBOL-interval-*.zip`` archives anywhere under *raw_dir*.

    Symbol is normalized to upper case and both inputs are stripped before
    matching; the result is sorted by path.
    """
    normalized_symbol = symbol.strip().upper()
    normalized_interval = interval.strip()
    matches = raw_dir.glob(f"**/{normalized_symbol}-{normalized_interval}-*.zip")
    return sorted(path for path in matches if path.is_file())
+
+
def load_raw_archives(paths: Iterable[Path]) -> list[Candle]:
    """Load candles from every archive, de-duplicating on timestamp.

    When two archives contain the same timestamp, the later archive in
    *paths* wins.  The result is sorted by timestamp ascending.
    """
    by_timestamp: dict[datetime, Candle] = {}
    for archive in paths:
        for candle in load_candles_from_zip_path(archive):
            by_timestamp[candle.timestamp] = candle
    return [by_timestamp[ts] for ts in sorted(by_timestamp)]
+
+
def write_prepared_csv(path: Path, candles: Iterable[Candle]) -> None:
    """Write candles to *path* as the canonical prepared CSV.

    Creates parent directories as needed.  Timestamps are ISO formatted with
    a trailing ``Z`` in place of ``+00:00``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(["timestamp", "open", "high", "low", "close", "volume"])
        for candle in candles:
            stamp = candle.timestamp.isoformat().replace("+00:00", "Z")
            writer.writerow([stamp, candle.open, candle.high, candle.low, candle.close, candle.volume])
+
+
def build_manifest(*, symbol: str, interval: str, source_kind: str, source_files: list[Path], candles: list[Candle], output_csv: Path) -> PreparedDataset:
    """Summarize a prepared dataset into a :class:`PreparedDataset` manifest.

    *symbol* is normalized to stripped upper case and *interval* is stripped,
    matching the normalization used by ``discover_raw_archives`` so manifest
    fields and archive discovery agree.  First/last timestamps are ``None``
    for an empty candle list.
    """
    first_timestamp = candles[0].timestamp.isoformat() if candles else None
    last_timestamp = candles[-1].timestamp.isoformat() if candles else None
    return PreparedDataset(
        symbol=symbol.strip().upper(),
        # Fix: interval was previously stored unstripped, unlike the
        # normalization applied everywhere else in this module.
        interval=interval.strip(),
        source_kind=source_kind,
        source_files=[str(path) for path in source_files],
        candle_count=len(candles),
        first_timestamp=first_timestamp,
        last_timestamp=last_timestamp,
        output_csv=str(output_csv),
    )
+
+
def write_manifest(path: Path, dataset: PreparedDataset) -> None:
    """Serialize *dataset* to pretty-printed JSON at *path*.

    Creates parent directories as needed and stamps the payload with a UTC
    ``generated_at`` timestamp.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = asdict(dataset)
    payload["generated_at"] = datetime.now(timezone.utc).isoformat()
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    path.write_text(serialized + "\n", encoding="utf-8")

+ 130 - 5
src/hermes_mcp/decision_engine.py

@@ -217,6 +217,7 @@ def normalize_strategy_snapshot(strategy: dict[str, Any]) -> dict[str, Any]:
             "requires_rebalance_before_stop": False,
             "safe_when_unbalanced": True,
             "can_run_with": [],
+            "rebalance_tolerance": 0.3,
         },
     }
     contract = defaults.get(strategy_type, {
@@ -660,6 +661,30 @@ def _grid_fill_fights_breakout(grid_fill: dict[str, Any], breakout: dict[str, An
     return False
 
 
def _recent_1m_price_trace(history_window: dict[str, Any] | None) -> list[tuple[datetime, float]]:
    """Extract ``(timestamp, 1m price)`` pairs from recent state rows.

    Malformed rows (non-dict entries, unparseable payload JSON, missing price
    or timestamp) are skipped.  The trace is returned sorted by timestamp
    ascending.
    """
    rows: list[Any] = []
    if isinstance(history_window, dict):
        candidate = history_window.get("recent_states")
        if isinstance(candidate, list):
            rows = candidate
    trace: list[tuple[datetime, float]] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        try:
            payload = json.loads(row.get("payload_json") or "{}")
        except Exception:
            # Best effort: a corrupt payload just drops that row.
            continue
        features = payload.get("features_by_timeframe")
        if not isinstance(features, dict):
            features = {}
        micro = features.get("1m")
        if not isinstance(micro, dict):
            micro = {}
        raw = micro.get("raw")
        if not isinstance(raw, dict):
            raw = {}
        price = _safe_float(raw.get("price"))
        if price is None:
            continue
        timestamp = _parse_timestamp(row.get("created_at") or payload.get("generated_at"))
        if timestamp is None:
            continue
        trace.append((timestamp, price))
    trace.sort(key=lambda item: item[0])
    return trace
+
+
 def _breakout_direction(breakout: dict[str, Any], stance: str | None = None) -> str | None:
     meso_bias = str(breakout.get("meso_bias") or "")
     micro_bias = str(breakout.get("micro_bias") or "")
@@ -701,6 +726,7 @@ def _extract_decision_signals(*,
     wallet_state: dict[str, Any],
     grid_strategy: dict[str, Any] | None = None,
     breakout: dict[str, Any] | None = None,
+    history_window: dict[str, Any] | None = None,
 ) -> dict[str, Any]:
     scoped = narrative_payload.get("scoped_state") if isinstance(narrative_payload.get("scoped_state"), dict) else {}
     cross = narrative_payload.get("cross_scope_summary") if isinstance(narrative_payload.get("cross_scope_summary"), dict) else {}
@@ -713,6 +739,7 @@ def _extract_decision_signals(*,
     micro_features = features.get("1m") if isinstance(features.get("1m"), dict) else {}
     micro_vol = micro_features.get("volatility") if isinstance(micro_features.get("volatility"), dict) else {}
     micro_raw = micro_features.get("raw") if isinstance(micro_features.get("raw"), dict) else {}
+    recent_prices = _recent_1m_price_trace(history_window)
 
     alignment = str(cross.get("alignment") or "partial_alignment")
     friction = str(cross.get("friction") or "medium")
@@ -822,6 +849,40 @@ def _extract_decision_signals(*,
     if grid_step_pct and grid_step_pct > 0:
         pullback_to_grid_ratio = noise_pct / max(grid_step_pct * 100.0, 0.0001)
 
+    recent_move_pct = 0.0
+    recent_move_window_minutes = 0
+    recent_move_direction = "mixed"
+    if recent_prices:
+        current_price = _safe_float(micro_raw.get("price")) or recent_prices[-1][1]
+        first_price = recent_prices[0][1]
+        if first_price > 0:
+            recent_move_pct = ((current_price - first_price) / first_price) * 100.0
+        recent_move_window_minutes = max(0, int((recent_prices[-1][0] - recent_prices[0][0]).total_seconds() / 60.0))
+        if recent_move_pct > 0:
+            recent_move_direction = "bullish"
+        elif recent_move_pct < 0:
+            recent_move_direction = "bearish"
+    rapid_directional_pressure = bool(
+        recent_move_direction in {"bullish", "bearish"}
+        and abs(recent_move_pct) >= max(0.8, (atr_percent or 0.0) * 2.5)
+        and recent_move_window_minutes >= 10
+        and structural_direction == recent_move_direction
+        and tactical_direction == recent_move_direction
+        and macro_bias == recent_move_direction
+    )
+    if breakout and isinstance(breakout, dict):
+        rapid_directional_pressure = bool(
+            rapid_directional_pressure
+            or (
+                breakout.get("persistent")
+                and str(breakout.get("macro_bias") or "") == recent_move_direction
+                and str(breakout.get("meso_bias") or "") == recent_move_direction
+                and str(breakout.get("micro_bias") or "") == recent_move_direction
+                and abs(recent_move_pct) >= max(0.6, (atr_percent or 0.0) * 1.8)
+            )
+        )
+    rapid_downside_pressure = bool(rapid_directional_pressure and recent_move_direction == "bearish")
+
     harvestability_score = tactical_range_quality * 0.45
     if pullback_to_grid_ratio is not None:
         harvestability_score += min(pullback_to_grid_ratio, 2.0) * 0.22
@@ -856,18 +917,18 @@ def _extract_decision_signals(*,
         and not tactical_easing
     )
     grid_harvestable_now = bool(
-        harvestability_score >= 0.52
-        and wallet_grid_usability >= 0.42
+        harvestability_score >= 0.48
+        and wallet_grid_usability >= 0.35
     )
     rebalancer_release_ready = bool(
         within_rebalance_tolerance
         and (
             (
-                harvestability_score >= 0.45
-                and (tactical_easing or breakout_persistence < 1.0 or tactical_range_quality >= 0.45)
+                harvestability_score >= 0.35
+                and (tactical_easing or breakout_persistence < 1.0 or tactical_range_quality >= 0.35)
             )
             or (wallet_state.get("grid_ready") and breakout_persistence < 1.0)
-            or (tactical_range_quality >= 0.5 and breakout_persistence < 0.65)
+            or (tactical_range_quality >= 0.42 and breakout_persistence < 0.75)
         )
     )
 
@@ -887,7 +948,12 @@ def _extract_decision_signals(*,
         "grid_harvestability_score": harvestability_score,
         "wallet_grid_usability": round(wallet_grid_usability, 4),
         "within_rebalance_tolerance": within_rebalance_tolerance,
+        "rebalance_tolerance": 0.3,
         "trend_following_pressure": trend_following_pressure,
+        "rapid_directional_pressure": rapid_directional_pressure,
+        "rapid_downside_pressure": rapid_downside_pressure,
+        "recent_move_pct": round(recent_move_pct, 4),
+        "recent_move_window_minutes": recent_move_window_minutes,
         "grid_harvestable_now": grid_harvestable_now,
         "rebalancer_release_ready": rebalancer_release_ready,
     }
@@ -1104,6 +1170,7 @@ def _decide_for_grid(*,
     grid_stuck_for_recovery = _grid_is_truly_stuck_for_recovery(current_primary, wallet_state, grid_fill)
     persistent_breakout = bool(breakout["persistent"])
     breakout_phase = str(breakout.get("phase") or "none")
+    breakout_direction = _breakout_direction(breakout, stance)
     trend_handoff_ready = bool(
         trend
         and bool(decision_signals.get("trend_following_pressure"))
@@ -1121,6 +1188,63 @@ def _decide_for_grid(*,
         trend=trend,
     )
 
+    rapid_directional = bool(decision_signals.get("rapid_directional_pressure"))
+    directional_pressure = breakout_direction if breakout_direction in {"bullish", "bearish"} else "mixed"
+    all_scopes_aligned = (
+        directional_pressure in {"bullish", "bearish"}
+        and str(decision_signals.get("structural_direction") or "") == directional_pressure
+        and str(decision_signals.get("tactical_direction") or "") == directional_pressure
+        and str(grid_pressure.get("direction") or "") == directional_pressure
+    )
+    repair_inventory_match = bool(
+        (directional_pressure == "bullish" and inventory_state in {"quote_heavy", "critically_unbalanced"})
+        or (directional_pressure == "bearish" and inventory_state in {"base_heavy", "critically_unbalanced"})
+    )
+    urgent_rebalance_exit = bool(
+        rebalance
+        and wallet_state.get("rebalance_needed")
+        and rapid_directional
+        and all_scopes_aligned
+        and repair_inventory_match
+    )
+
+    if urgent_rebalance_exit:
+        action = "replace_with_exposure_protector"
+        target_strategy = rebalance["strategy_id"]
+        mode = "act"
+        reasons.append("wallet is skewed and the directional move is accelerating, so exposure repair should happen before the trend handoff")
+        reasons.append(
+            f"recent 1m history moved {decision_signals.get('recent_move_pct', 0.0):.2f}% over about {decision_signals.get('recent_move_window_minutes', 0)} minutes"
+        )
+        return action, mode, target_strategy, reasons, blocks
+
+    urgent_trend_exit = bool(
+        trend
+        and persistent_breakout
+        and bool(decision_signals.get("trend_following_pressure"))
+        and all_scopes_aligned
+        and (
+            rapid_directional
+            or grid_fill.get("near_fill")
+            or inventory_state in SEVERE_INVENTORY_STATES
+        )
+    )
+
+    if urgent_trend_exit:
+        action = "replace_with_trend_follower"
+        target_strategy = trend["strategy_id"] if trend else target_strategy
+        mode = "act"
+        reasons.append("all scopes line up and the tape is moving fast, so grid should yield early")
+        if rapid_directional:
+            reasons.append(
+                f"recent 1m history moved {decision_signals.get('recent_move_pct', 0.0):.2f}% over about {decision_signals.get('recent_move_window_minutes', 0)} minutes"
+            )
+        if grid_pressure.get("levels", 0.0) < _trend_handoff_level_threshold(breakout):
+            reasons.append("handoff is happening early, before the normal level threshold, because directional acceleration is sharp")
+        if grid_fill.get("near_fill"):
+            reasons.append("grid fill pressure is already near the market")
+        return action, mode, target_strategy, reasons, blocks
+
     if severe_imbalance and persistent_breakout:
         reasons.append("grid imbalance now coincides with persistent breakout pressure")
         directional_inventory = _inventory_breakout_is_directionally_compatible(inventory_state, breakout)
@@ -1346,6 +1470,7 @@ def make_decision(*, concern: dict[str, Any], narrative_payload: dict[str, Any],
         wallet_state=wallet_state,
         grid_strategy=grid_strategy,
         breakout=breakout,
+        history_window=history_window,
     )
     switch_tradeoff: dict[str, Any] = {}
 

+ 157 - 0
tests/test_decision_engine.py

@@ -336,6 +336,163 @@ def test_make_decision_replaces_grid_when_third_level_is_sustained():
     assert decision.payload["grid_breakout_pressure"]["phase"] == "confirmed"
 
 
def test_make_decision_exits_grid_early_on_fast_bearish_alignment():
    """A fast, fully aligned bearish tape should hand the grid off to the trend follower early."""
    concern = {"id": "c1", "account_id": "a1", "market_symbol": "xrpusd", "base_currency": "XRP", "quote_currency": "USD"}
    # Narrative fixture: micro/meso/macro all bearish with low friction.
    narrative = {
        "generated_at": "2026-04-18T20:15:00+00:00",
        "stance": "constructive_bearish",
        "confidence": 0.84,
        "opportunity_map": {"continuation": 0.8, "mean_reversion": 0.05, "reversal": 0.05, "wait": 0.1},
        "features_by_timeframe": {
            "1m": {"raw": {"price": 96.0, "atr_percent": 0.22, "rsi": 28.0, "macd_histogram": -0.02, "vwap": 97.2, "ema_fast": 96.8, "ema_slow": 97.6, "sma_long": 98.0, "bands": {"bollinger": {"middle": 97.0, "upper": 98.0, "lower": 95.5}}}},
        },
        "scoped_state": {
            "micro": {"impulse": "down", "trend_bias": "bearish", "location": "near_lower_band", "reversal_risk": "low"},
            "meso": {"structure": "trend_continuation", "momentum_bias": "bearish"},
            "macro": {"bias": "bearish"},
        },
        "cross_scope_summary": {"alignment": "micro_meso_macro_aligned", "friction": "low"},
    }
    # Balanced wallet: nothing forces an exposure repair before the handoff.
    wallet_state = {
        "inventory_state": "balanced",
        "rebalance_needed": False,
        "grid_ready": True,
        "base_ratio": 0.5,
        "quote_ratio": 0.5,
    }
    strategies = [
        {"id": "grid-1", "strategy_type": "grid_trader", "mode": "active", "account_id": "a1", "state": {"center_price": 100.0}, "config": {"grid_step_pct": 0.05}},
        {"id": "trend-1", "strategy_type": "trend_follower", "mode": "off", "account_id": "a1", "state": {}, "config": {"trade_side": "sell"}},
        {"id": "protect-1", "strategy_type": "exposure_protector", "mode": "off", "account_id": "a1", "state": {}, "config": {}},
    ]
    # Price drops 100.0 -> 96.8 over ~15 minutes: rapid directional pressure.
    history_window = {
        "window_seconds": 1800,
        "recent_states": [
            {"created_at": "2026-04-18T19:55:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":100.0}}}}'},
            {"created_at": "2026-04-18T20:00:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":99.2}}}}'},
            {"created_at": "2026-04-18T20:05:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":97.8}}}}'},
            {"created_at": "2026-04-18T20:10:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":96.8}}}}'},
        ],
    }

    decision = make_decision(
        concern=concern,
        narrative_payload=narrative,
        wallet_state=wallet_state,
        strategies=strategies,
        history_window=history_window,
    )

    # Early exit: act immediately, switching to the short trend follower.
    assert decision.mode == "act"
    assert decision.action == "replace_with_trend_follower"
    assert decision.target_strategy == "trend-1"
    assert decision.payload["decision_audit"]["rapid_downside_pressure"] is True
+
+
def test_make_decision_prefers_exposure_protector_when_downside_hits_skewed_wallet():
    """A fast bearish move into a base-heavy wallet should trigger exposure repair before any trend handoff."""
    concern = {"id": "c1", "account_id": "a1", "market_symbol": "xrpusd", "base_currency": "XRP", "quote_currency": "USD"}
    # Same bearish narrative as the trend-handoff test; only the wallet differs.
    narrative = {
        "generated_at": "2026-04-18T20:15:00+00:00",
        "stance": "constructive_bearish",
        "confidence": 0.84,
        "opportunity_map": {"continuation": 0.8, "mean_reversion": 0.05, "reversal": 0.05, "wait": 0.1},
        "features_by_timeframe": {
            "1m": {"raw": {"price": 96.0, "atr_percent": 0.22, "rsi": 28.0, "macd_histogram": -0.02, "vwap": 97.2, "ema_fast": 96.8, "ema_slow": 97.6, "sma_long": 98.0}},
        },
        "scoped_state": {
            "micro": {"impulse": "down", "trend_bias": "bearish", "location": "near_lower_band", "reversal_risk": "low"},
            "meso": {"structure": "trend_continuation", "momentum_bias": "bearish"},
            "macro": {"bias": "bearish"},
        },
        "cross_scope_summary": {"alignment": "micro_meso_macro_aligned", "friction": "low"},
    }
    # Base-heavy wallet: falling prices hurt the 80% base position.
    wallet_state = {
        "inventory_state": "base_heavy",
        "rebalance_needed": True,
        "grid_ready": False,
        "base_ratio": 0.8,
        "quote_ratio": 0.2,
    }
    strategies = [
        {"id": "grid-1", "strategy_type": "grid_trader", "mode": "active", "account_id": "a1", "state": {"center_price": 100.0}, "config": {"grid_step_pct": 0.05}},
        {"id": "trend-1", "strategy_type": "trend_follower", "mode": "off", "account_id": "a1", "state": {}, "config": {"trade_side": "sell"}},
        {"id": "protect-1", "strategy_type": "exposure_protector", "mode": "off", "account_id": "a1", "state": {}, "config": {}},
    ]
    # Accelerating sell-off 100.0 -> 96.8 within the history window.
    history_window = {
        "window_seconds": 1800,
        "recent_states": [
            {"created_at": "2026-04-18T19:55:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":100.0}}}}'},
            {"created_at": "2026-04-18T20:00:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":99.2}}}}'},
            {"created_at": "2026-04-18T20:05:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":97.8}}}}'},
            {"created_at": "2026-04-18T20:10:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":96.8}}}}'},
        ],
    }

    decision = make_decision(
        concern=concern,
        narrative_payload=narrative,
        wallet_state=wallet_state,
        strategies=strategies,
        history_window=history_window,
    )

    # Exposure repair takes priority over the trend follower here.
    assert decision.mode == "act"
    assert decision.action == "replace_with_exposure_protector"
    assert decision.target_strategy == "protect-1"
+
+
def test_make_decision_prefers_exposure_protector_when_upside_hits_skewed_wallet():
    """A fast bullish move into a quote-heavy wallet should also trigger exposure repair first (mirror of the bearish case)."""
    concern = {"id": "c1", "account_id": "a1", "market_symbol": "xrpusd", "base_currency": "XRP", "quote_currency": "USD"}
    # Bullish mirror of the downside fixture: all scopes aligned up.
    narrative = {
        "generated_at": "2026-04-18T20:15:00+00:00",
        "stance": "constructive_bullish",
        "confidence": 0.84,
        "opportunity_map": {"continuation": 0.8, "mean_reversion": 0.05, "reversal": 0.05, "wait": 0.1},
        "features_by_timeframe": {
            "1m": {"raw": {"price": 104.0, "atr_percent": 0.22, "rsi": 72.0, "macd_histogram": 0.02, "vwap": 102.8, "ema_fast": 103.2, "ema_slow": 102.4, "sma_long": 102.0}},
        },
        "scoped_state": {
            "micro": {"impulse": "up", "trend_bias": "bullish", "location": "near_upper_band", "reversal_risk": "low"},
            "meso": {"structure": "trend_continuation", "momentum_bias": "bullish"},
            "macro": {"bias": "bullish"},
        },
        "cross_scope_summary": {"alignment": "micro_meso_macro_aligned", "friction": "low"},
    }
    # Quote-heavy wallet: rising prices leave the 80% quote position behind.
    wallet_state = {
        "inventory_state": "quote_heavy",
        "rebalance_needed": True,
        "grid_ready": False,
        "base_ratio": 0.2,
        "quote_ratio": 0.8,
    }
    strategies = [
        {"id": "grid-1", "strategy_type": "grid_trader", "mode": "active", "account_id": "a1", "state": {"center_price": 100.0}, "config": {"grid_step_pct": 0.05}},
        {"id": "trend-1", "strategy_type": "trend_follower", "mode": "off", "account_id": "a1", "state": {}, "config": {"trade_side": "buy"}},
        {"id": "protect-1", "strategy_type": "exposure_protector", "mode": "off", "account_id": "a1", "state": {}, "config": {}},
    ]
    # Accelerating rally 100.0 -> 103.4 within the history window.
    history_window = {
        "window_seconds": 1800,
        "recent_states": [
            {"created_at": "2026-04-18T19:55:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":100.0}}}}'},
            {"created_at": "2026-04-18T20:00:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":100.8}}}}'},
            {"created_at": "2026-04-18T20:05:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":102.2}}}}'},
            {"created_at": "2026-04-18T20:10:00+00:00", "payload_json": '{"features_by_timeframe":{"1m":{"raw":{"price":103.4}}}}'},
        ],
    }

    decision = make_decision(
        concern=concern,
        narrative_payload=narrative,
        wallet_state=wallet_state,
        strategies=strategies,
        history_window=history_window,
    )

    # Exposure repair takes priority in the bullish direction too.
    assert decision.mode == "act"
    assert decision.action == "replace_with_exposure_protector"
    assert decision.target_strategy == "protect-1"
+
+
 def test_make_decision_targets_the_trade_side_that_matches_direction():
     concern = {"id": "c1", "account_id": "a1", "market_symbol": "xrpusd", "base_currency": "XRP", "quote_currency": "USD"}
     narrative = {