"""Deterministic strategy-supervision logic for Hermes.

This is the first decision slice. Hermes is currently acting as a supervisor
for existing trader strategies, not as a direct trading engine.

Design intent:
- prefer one active posture at a time over layered companions
- detect when grid trading becomes unsafe because market posture or wallet
  balance no longer supports it
- switch cleanly between directional, range, and rebalancing phases
"""

from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

# Order statuses treated as still resting on the book by the helpers below.
_OPEN_ORDER_STATUSES = frozenset({"open", "live", "active"})


@dataclass(frozen=True)
class DecisionSnapshot:
    """Immutable record describing one supervision decision."""

    mode: str
    action: str
    target_strategy: str | None
    reason_summary: str
    confidence: float
    requires_action: bool
    payload: dict[str, Any]


def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* if it is a dict, otherwise a new empty dict."""
    return value if isinstance(value, dict) else {}


def _as_list(value: Any) -> list[Any]:
    """Return *value* if it is a list, otherwise a new empty list."""
    return value if isinstance(value, list) else []


def _clamp(value: float, lower: float, upper: float) -> float:
    """Limit *value* to the closed interval [lower, upper]."""
    return max(lower, min(upper, value))


def _safe_float(value: Any) -> float | None:
    """Convert *value* to float, returning None when it is None or unparsable."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _inventory_state_label(value: Any) -> str:
    """Normalize an inventory-state label to its canonical lowercase spelling.

    Missing/empty values become "unknown"; known aliases are mapped to the
    names used in SEVERE_INVENTORY_STATES / REBALANCE_INVENTORY_STATES.
    """
    state = str(value or "unknown").strip().lower()
    aliases = {
        "critical": "critically_unbalanced",
        "critically_imbalanced": "critically_unbalanced",
        "depleted_base": "depleted_base_side",
        "depleted_quote": "depleted_quote_side",
        "one_sided_base": "depleted_base_side",
        "one_sided_quote": "depleted_quote_side",
    }
    return aliases.get(state, state)


SEVERE_INVENTORY_STATES = {"critically_unbalanced", "depleted_base_side", "depleted_quote_side"}
REBALANCE_INVENTORY_STATES = {"base_heavy", "quote_heavy", *SEVERE_INVENTORY_STATES}


def _infer_market_pair(concern: dict[str, Any]) -> tuple[str, str]:
    """Return the (base, quote) currency codes for *concern*.

    Explicit ``base_currency``/``quote_currency`` win. Otherwise the pair is
    inferred by stripping a known quote suffix from ``market_symbol``; when no
    suffix matches, the whole symbol is the base and the quote defaults to
    "USD".
    """
    base = str(concern.get("base_currency") or "").strip().upper()
    quote = str(concern.get("quote_currency") or "").strip().upper()
    if base and quote:
        return base, quote
    market = str(concern.get("market_symbol") or "").strip().upper().replace("/", "").replace("-", "")
    # Order matters: "USDT"/"USDC" must be tried before their prefix "USD".
    for suffix in ("USDT", "USDC", "USD", "EUR", "BTC", "ETH"):
        if market.endswith(suffix) and len(market) > len(suffix):
            inferred_base = market[:-len(suffix)]
            inferred_quote = suffix
            return base or inferred_base, quote or inferred_quote
    return base or market, quote or "USD"


def assess_wallet_state(
    *,
    account_info: dict[str, Any],
    concern: dict[str, Any],
    price: float | None,
    strategies: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Summarize inventory health for strategy switching.

    The key output is whether the wallet is balanced enough for range/grid
    harvesting, or so skewed that Hermes should prefer trend capture or
    rebalancing before grid is allowed again.

    Available balances are combined with funds reserved by open orders of
    enabled strategies on the same account/market, valued at *price*, and
    classified into an ``inventory_state`` label.
    """
    balances = _as_list(account_info.get("balances"))
    base, quote = _infer_market_pair(concern)

    base_available = 0.0
    quote_available = 0.0
    for item in balances:
        if not isinstance(item, dict):
            continue
        asset = str(item.get("asset_code") or item.get("asset") or "").upper()
        raw_amount = item.get("available") if item.get("available") is not None else item.get("total")
        amount = _safe_float(raw_amount)
        if amount is None:
            continue
        if asset == base:
            base_available = amount
        elif asset == quote:
            quote_available = amount

    concern_account = str(concern.get("account_id") or "").strip()
    concern_market = str(concern.get("market_symbol") or "").strip().lower()

    reserved_base = 0.0
    reserved_quote = 0.0
    for strategy in strategies or []:
        if not isinstance(strategy, dict):
            continue
        if str(strategy.get("account_id") or "").strip() != concern_account:
            continue
        market_symbol = str(strategy.get("market_symbol") or "").strip().lower()
        if market_symbol and market_symbol != concern_market:
            continue
        if str(strategy.get("mode") or "off") == "off":
            continue
        state = _as_dict(strategy.get("state"))
        for order in _as_list(state.get("orders")):
            if not isinstance(order, dict):
                continue
            if str(order.get("status") or "open").lower() not in _OPEN_ORDER_STATUSES:
                continue
            side = str(order.get("side") or "").lower()
            amount = _safe_float(order.get("amount") or order.get("amount_remaining")) or 0.0
            # Fall back to the market price when the order carries no price.
            order_price = _safe_float(order.get("price")) or price or 0.0
            if side == "sell":
                reserved_base += amount
            elif side == "buy":
                reserved_quote += amount * order_price

    price = price or 0.0
    effective_base = base_available + reserved_base
    effective_quote = quote_available + reserved_quote
    base_value = effective_base * price if price > 0 else 0.0
    quote_value = effective_quote
    total_value = base_value + quote_value
    base_ratio = (base_value / total_value) if total_value > 0 else 0.5
    quote_ratio = (quote_value / total_value) if total_value > 0 else 0.5
    imbalance = abs(base_ratio - 0.5)

    if total_value <= 0:
        inventory_state = "unknown"
    elif base_ratio <= 0.02:
        inventory_state = "depleted_base_side"
    elif quote_ratio <= 0.02:
        inventory_state = "depleted_quote_side"
    elif base_ratio < 0.08:
        inventory_state = "critically_unbalanced"
    elif quote_ratio < 0.08:
        inventory_state = "critically_unbalanced"
    elif imbalance >= 0.35:
        inventory_state = "critically_unbalanced"
    elif base_ratio > 0.62:
        inventory_state = "base_heavy"
    elif quote_ratio > 0.62:
        inventory_state = "quote_heavy"
    else:
        inventory_state = "balanced"

    grid_ready = inventory_state == "balanced"
    rebalance_needed = inventory_state in REBALANCE_INVENTORY_STATES
    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "base_currency": base,
        "quote_currency": quote,
        "base_available": round(base_available, 8),
        "quote_available": round(quote_available, 8),
        "base_reserved": round(reserved_base, 8),
        "quote_reserved": round(reserved_quote, 8),
        "base_effective": round(effective_base, 8),
        "quote_effective": round(effective_quote, 8),
        "base_value": round(base_value, 4),
        "quote_value": round(quote_value, 4),
        "total_value": round(total_value, 4),
        "base_ratio": round(base_ratio, 4),
        "quote_ratio": round(quote_ratio, 4),
        "imbalance_score": round(imbalance, 4),
        "inventory_state": inventory_state,
        "grid_ready": grid_ready,
        "rebalance_needed": rebalance_needed,
    }


def normalize_strategy_snapshot(strategy: dict[str, Any]) -> dict[str, Any]:
    """Flatten a raw strategy record into the shape the decision layer reads.

    Values from ``state`` take precedence over ``report.state``; the
    per-type default contract is overlaid with ``report.fit``.
    """
    strategy_type = str(strategy.get("strategy_type") or "unknown")
    mode = str(strategy.get("mode") or "off")
    state = _as_dict(strategy.get("state"))
    config = _as_dict(strategy.get("config"))
    report = _as_dict(strategy.get("report"))
    report_fit = _as_dict(report.get("fit"))
    report_supervision = _as_dict(report.get("supervision"))
    report_state = _as_dict(report.get("state"))

    # Stable minimum contract used by Hermes while the trader-side strategy
    # metadata evolves. These values can later be sourced directly from richer
    # reports, but the decision layer keeps a normalized shape from day one.
    # Built per call so the nested lists are never shared between snapshots.
    defaults = {
        "grid_trader": {
            "role": "primary",
            "inventory_behavior": "balanced",
            "requires_rebalance_before_start": False,
            "requires_rebalance_before_stop": False,
            "safe_when_unbalanced": False,
            "can_run_with": ["exposure_protector"],
        },
        "trend_follower": {
            "role": "primary",
            "inventory_behavior": "accumulative_long",
            "requires_rebalance_before_start": False,
            "requires_rebalance_before_stop": False,
            "safe_when_unbalanced": True,
            "can_run_with": [],
            "trade_side": "both",
        },
        "exposure_protector": {
            "role": "rebalancing",
            "inventory_behavior": "rebalancing",
            "requires_rebalance_before_start": False,
            "requires_rebalance_before_stop": False,
            "safe_when_unbalanced": True,
            "can_run_with": [],
        },
    }
    contract = defaults.get(strategy_type, {
        "role": "primary",
        "inventory_behavior": "unknown",
        "requires_rebalance_before_start": False,
        "requires_rebalance_before_stop": False,
        "safe_when_unbalanced": True,
        "can_run_with": [],
    })
    contract = {**contract, **report_fit}

    return {
        "id": strategy.get("id"),
        "strategy_type": strategy_type,
        "mode": mode,
        "enabled": mode != "off",
        "status": strategy.get("status") or ("running" if mode != "off" else "stopped"),
        "market_symbol": strategy.get("market_symbol"),
        "account_id": strategy.get("account_id"),
        "open_order_count": int(
            state.get("open_order_count")
            or report_state.get("open_order_count")
            or strategy.get("open_order_count")
            or 0
        ),
        "last_action": state.get("last_action") or report_state.get("last_action") or strategy.get("last_side"),
        "last_error": state.get("last_error") or report_state.get("last_error") or "",
        "contract": contract,
        "trade_side": str(config.get("trade_side") or contract.get("trade_side") or "both"),
        "supervision": report_supervision,
        "config": config,
        "state": {**report_state, **state},
    }


def _argus_decision_context(narrative_payload: dict[str, Any]) -> dict[str, Any]:
    """Extract the Argus regime fields used by the scoring rules.

    ``compression_active`` is True only when the regime is "compression" with
    confidence >= 0.55 and a compression component >= 0.65.
    """
    argus = _as_dict(narrative_payload.get("argus_context"))
    regime = str(argus.get("regime") or argus.get("snapshot_regime") or "").strip()
    confidence_raw = (
        argus.get("regime_confidence")
        if argus.get("regime_confidence") is not None
        else argus.get("snapshot_confidence")
    )
    confidence = float(confidence_raw) if isinstance(confidence_raw, (int, float)) else 0.0
    components = _as_dict(argus.get("regime_components"))
    if not components:
        components = _as_dict(argus.get("snapshot_components"))
    # A non-numeric component is treated as 0.0 instead of raising.
    compression = _safe_float(components.get("compression")) or 0.0
    compression_active = regime == "compression" and confidence >= 0.55 and compression >= 0.65
    return {
        "regime": regime,
        "confidence": round(confidence, 4),
        "components": components,
        "compression": round(compression, 4),
        "compression_active": compression_active,
    }


def _parse_timestamp(value: Any) -> datetime | None:
    """Parse an ISO-8601 string into an aware UTC datetime, or None on failure.

    A trailing "Z" is accepted; naive timestamps are assumed to be UTC.
    """
    text = str(value or "").strip()
    if not text:
        return None
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def score_strategy_fit(
    *,
    strategy: dict[str, Any],
    narrative: dict[str, Any],
    wallet_state: dict[str, Any],
) -> dict[str, Any]:
    """Score how well one normalized strategy fits the narrative and wallet.

    *strategy* must contain ``strategy_type`` (as produced by
    ``normalize_strategy_snapshot``). Returns the rounded score together with
    the human-readable ``reasons`` (supporting) and ``blocks`` (opposing).
    Non-numeric opportunity values are treated as 0.0.
    """
    stance = str(narrative.get("stance") or "neutral_rotational")
    opportunity_map = _as_dict(narrative.get("opportunity_map"))
    breakout_pressure = _as_dict(narrative.get("grid_breakout_pressure"))
    breakout_phase = str(breakout_pressure.get("phase") or "none")
    continuation = _safe_float(opportunity_map.get("continuation")) or 0.0
    mean_reversion = _safe_float(opportunity_map.get("mean_reversion")) or 0.0
    reversal = _safe_float(opportunity_map.get("reversal")) or 0.0
    wait = _safe_float(opportunity_map.get("wait")) or 0.0
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
    argus_context = _argus_decision_context(narrative)
    strategy_type = strategy["strategy_type"]
    supervision = _as_dict(strategy.get("supervision"))
    inventory_pressure = str(supervision.get("inventory_pressure") or "")
    capacity_available = bool(supervision.get("capacity_available"))
    side_capacity = _as_dict(supervision.get("side_capacity"))

    score = 0.0
    reasons: list[str] = []
    blocks: list[str] = []

    if strategy_type == "grid_trader":
        score += mean_reversion * 1.8
        if stance in {"neutral_rotational", "breakout_watch"}:
            score += 0.45
            reasons.append("narrative still supports rotational structure")
        if continuation >= 0.45:
            score -= 0.8
            blocks.append("continuation pressure is too strong for safe grid harvesting")
        if inventory_state != "balanced":
            score -= 1.0
            blocks.append(f"wallet is not grid-ready: {inventory_state}")
        else:
            reasons.append("wallet is balanced enough for two-sided harvesting")
        if not capacity_available:
            score -= 0.25
            blocks.append("grid report shows one-sided capacity")
        if side_capacity and not (
            bool(side_capacity.get("buy", True)) and bool(side_capacity.get("sell", True))
        ):
            score -= 0.25
            blocks.append("grid side capacity is asymmetric")
        if argus_context["compression_active"]:
            score += 0.2
            reasons.append("Argus compression supports staying selective with grid")
    elif strategy_type == "trend_follower":
        score += continuation * 1.9
        trade_side = _strategy_trade_side(strategy)
        narrative_direction = _narrative_direction(narrative)
        if stance in {"constructive_bullish", "cautious_bullish", "constructive_bearish", "cautious_bearish"}:
            score += 0.5
            reasons.append("narrative supports directional continuation")
        if trade_side == "buy":
            if narrative_direction == "bullish":
                score += 0.6
                reasons.append("buy-side trend instance matches bullish direction")
            elif narrative_direction == "bearish":
                score -= 0.9
                blocks.append("buy-side trend instance conflicts with bearish direction")
        elif trade_side == "sell":
            if narrative_direction == "bearish":
                score += 0.6
                reasons.append("sell-side trend instance matches bearish direction")
            elif narrative_direction == "bullish":
                score -= 0.9
                blocks.append("sell-side trend instance conflicts with bullish direction")
        if breakout_phase == "confirmed":
            score += 0.45
            reasons.append("confirmed breakout pressure supports directional continuation")
        elif breakout_phase == "developing":
            score += 0.2
            reasons.append("breakout pressure is developing in trend's favor")
        if wait >= 0.45 and breakout_phase != "confirmed":
            score -= 0.35
            blocks.append("market still has too much wait/uncertainty for trend commitment")
        if inventory_state in SEVERE_INVENTORY_STATES:
            score -= 0.25
            blocks.append("wallet may be too skewed for clean directional scaling")
        if inventory_pressure in {"base_heavy", "quote_heavy"}:
            score -= 0.1
            blocks.append("trend report shows rising inventory pressure")
        if not capacity_available:
            score -= 0.1
            blocks.append("trend strength is below its own capacity threshold")
        if trade_side == "both" and narrative_direction in {"bullish", "bearish"}:
            score += 0.15
            reasons.append("generic trend instance can follow either side")
        if argus_context["compression_active"] and breakout_phase != "confirmed":
            score -= 0.15
            blocks.append("Argus compression says the broader tape is still range-like")
    elif strategy_type == "exposure_protector":
        score += reversal * 0.4 + wait * 0.5
        if wallet_state.get("rebalance_needed"):
            score += 1.1
            reasons.append("wallet imbalance calls for rebalancing protection")
        if inventory_state in SEVERE_INVENTORY_STATES:
            score += 0.45
            reasons.append("inventory drift is high enough to justify defensive action")
        if stance in {"constructive_bullish", "constructive_bearish"} and continuation > 0.65:
            score -= 0.2
        if inventory_pressure in {"critical", "elevated"}:
            score += 0.25
            reasons.append("protector reports active inventory pressure")

    # Health penalties apply to every strategy type.
    if strategy.get("last_error"):
        score -= 0.25
        blocks.append("strategy recently reported an error")
    if bool(supervision.get("degraded")):
        score -= 0.15
        blocks.append("strategy self-reports degraded supervision state")

    return {
        "strategy_id": strategy.get("id"),
        "strategy_type": strategy_type,
        "score": round(score, 4),
        "reasons": reasons,
        "blocks": blocks,
        "enabled": strategy.get("enabled", False),
    }


def _breakout_phase_from_score(score: float) -> str:
    """Map a breakout score to "confirmed", "developing", "probing" or "none"."""
    if score >= 3.45:
        return "confirmed"
    if score >= 2.45:
        return "developing"
    if score >= 1.4:
        return "probing"
    return "none"


def _local_breakout_snapshot(narrative_payload: dict[str, Any]) -> dict[str, Any]:
    """Score breakout pressure from a single narrative payload (no history)."""
    scoped = _as_dict(narrative_payload.get("scoped_state"))
    cross = _as_dict(narrative_payload.get("cross_scope_summary"))
    micro = _as_dict(scoped.get("micro"))
    meso = _as_dict(scoped.get("meso"))
    macro = _as_dict(scoped.get("macro"))

    micro_impulse = str(micro.get("impulse") or "mixed")
    micro_bias = str(micro.get("trend_bias") or "mixed")
    meso_structure = str(meso.get("structure") or "rotation")
    meso_bias = str(meso.get("momentum_bias") or "neutral")
    macro_bias = str(macro.get("bias") or "mixed")
    alignment = str(cross.get("alignment") or "partial_alignment")
    friction = str(cross.get("friction") or "medium")

    micro_directional = micro_impulse in {"up", "down"} and micro_bias in {"bullish", "bearish"}
    meso_directional = meso_structure == "trend_continuation" and meso_bias in {"bullish", "bearish"}
    macro_supportive = macro_bias in {"bullish", "bearish"}

    score = 0.0
    if micro_directional:
        score += 1.0
    if meso_directional:
        score += 1.1
    if macro_supportive:
        score += 0.55
    if alignment == "micro_meso_macro_aligned":
        score += 0.8
    elif alignment == "partial_alignment":
        score += 0.35
    if friction == "low":
        score += 0.45
    elif friction == "medium":
        score += 0.15

    return {
        "score": round(score, 4),
        "phase": _breakout_phase_from_score(score),
        "micro_impulse": micro_impulse,
        "micro_bias": micro_bias,
        "meso_structure": meso_structure,
        "meso_bias": meso_bias,
        "macro_bias": macro_bias,
        "alignment": alignment,
        "friction": friction,
    }


def _breakout_memory(
    narrative_payload: dict[str, Any],
    history_window: dict[str, Any] | None,
    current_breakout: dict[str, Any],
) -> dict[str, Any]:
    """Check whether recent history backs the current directional breakout.

    Counts stored states whose own breakout snapshot is at least "developing"
    with the same meso and macro bias as *current_breakout*, and measures the
    time since the oldest such sample. A "developing" breakout is promoted to
    confirmed when at least two samples qualify and that duration reaches
    ``min(window_seconds, 480)``.
    """
    history = _as_dict(history_window)
    recent_states = _as_list(history.get("recent_states"))
    window_seconds = int(history.get("window_seconds") or 0)
    current_ts = _parse_timestamp(narrative_payload.get("generated_at")) or datetime.now(timezone.utc)
    current_direction = str(current_breakout.get("meso_bias") or "neutral")
    directional = (
        current_direction in {"bullish", "bearish"}
        and current_breakout.get("meso_structure") == "trend_continuation"
    )
    if not directional:
        return {
            "window_seconds": window_seconds,
            "samples_considered": 0,
            "qualifying_samples": 0,
            "same_direction_seconds": 0,
            "promoted_to_confirmed": False,
        }

    current_macro = str(current_breakout.get("macro_bias") or "mixed")
    qualifying_samples = 0
    oldest_match: datetime | None = None
    for row in recent_states:
        if not isinstance(row, dict):
            continue
        try:
            payload = json.loads(row.get("payload_json") or "{}")
        except (TypeError, ValueError):
            continue
        if not isinstance(payload, dict):
            # Valid JSON that is not an object carries no narrative state.
            continue
        snapshot = _local_breakout_snapshot(payload)
        sample_ts = _parse_timestamp(row.get("created_at") or payload.get("generated_at"))
        if sample_ts is None:
            continue
        if snapshot.get("phase") not in {"developing", "confirmed"}:
            continue
        if str(snapshot.get("meso_bias") or "neutral") != current_direction:
            continue
        if str(snapshot.get("macro_bias") or "mixed") != current_macro:
            continue
        qualifying_samples += 1
        # Track the true minimum so the result does not depend on row order.
        if oldest_match is None or sample_ts < oldest_match:
            oldest_match = sample_ts

    same_direction_seconds = int((current_ts - oldest_match).total_seconds()) if oldest_match else 0
    promoted = (
        current_breakout.get("phase") == "developing"
        and qualifying_samples >= 2
        and same_direction_seconds >= min(window_seconds, 8 * 60)
    )
    return {
        "window_seconds": window_seconds,
        "samples_considered": len(recent_states),
        "qualifying_samples": qualifying_samples,
        "same_direction_seconds": max(0, same_direction_seconds),
        "promoted_to_confirmed": promoted,
    }


def _grid_breakout_pressure(
    narrative_payload: dict[str, Any],
    history_window: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Combine the local breakout snapshot, history memory and Argus context.

    ``persistent`` is True exactly when the (possibly promoted) phase is
    "confirmed".
    """
    argus_context = _argus_decision_context(narrative_payload)
    breakout = _local_breakout_snapshot(narrative_payload)
    memory = _breakout_memory(narrative_payload, history_window, breakout)
    phase = str(breakout.get("phase") or "none")
    if memory["promoted_to_confirmed"]:
        phase = "confirmed"
    persistent = phase == "confirmed"
    return {
        "persistent": persistent,
        "phase": phase,
        "score": breakout["score"],
        "micro_impulse": breakout["micro_impulse"],
        "micro_bias": breakout["micro_bias"],
        "meso_structure": breakout["meso_structure"],
        "meso_bias": breakout["meso_bias"],
        "macro_bias": breakout["macro_bias"],
        "alignment": breakout["alignment"],
        "friction": breakout["friction"],
        "time_window_memory": memory,
        "argus_regime": argus_context["regime"],
        "argus_confidence": argus_context["confidence"],
        "argus_compression_active": argus_context["compression_active"],
    }


def _select_current_primary(strategies: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Pick the enabled supervised strategy, preferring one in "active" mode."""
    primaries = [
        s
        for s in strategies
        if s["strategy_type"] in {"grid_trader", "trend_follower", "exposure_protector"}
        and s.get("mode") != "off"
    ]
    if not primaries:
        return None
    active = next((s for s in primaries if s.get("mode") == "active"), None)
    if active:
        return active
    return primaries[0]


def _inventory_breakout_is_directionally_compatible(inventory_state: str, breakout: dict[str, Any]) -> bool:
    """Return True when the inventory skew lines up with an agreed breakout.

    Requires macro and meso bias to agree: bullish pairs with
    depleted-base/quote-heavy inventory, bearish with
    depleted-quote/base-heavy inventory.
    """
    inventory_state = _inventory_state_label(inventory_state)
    macro_bias = str(breakout.get("macro_bias") or "mixed")
    meso_bias = str(breakout.get("meso_bias") or "neutral")
    bullish = macro_bias == "bullish" and meso_bias == "bullish"
    bearish = macro_bias == "bearish" and meso_bias == "bearish"
    if bullish and inventory_state in {"depleted_base_side", "quote_heavy"}:
        return True
    if bearish and inventory_state in {"depleted_quote_side", "base_heavy"}:
        return True
    return False


def _trend_cooling_edge(narrative_payload: dict[str, Any], wallet_state: dict[str, Any]) -> bool:
    """Detect a meso trend that is cooling at the micro scope while the wallet needs rebalancing.

    Always False when ``wallet_state["rebalance_needed"]`` is falsy.
    """
    if not wallet_state.get("rebalance_needed"):
        return False
    scoped = _as_dict(narrative_payload.get("scoped_state"))
    micro = _as_dict(scoped.get("micro"))
    meso = _as_dict(scoped.get("meso"))
    micro_impulse = str(micro.get("impulse") or "mixed")
    micro_bias = str(micro.get("trend_bias") or "mixed")
    micro_location = str(micro.get("location") or "unknown")
    micro_reversal_risk = str(micro.get("reversal_risk") or "low")
    meso_bias = str(meso.get("momentum_bias") or "neutral")
    meso_structure = str(meso.get("structure") or "rotation")
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
    early_reversal_warning = micro_reversal_risk in {"medium", "high"}

    bullish_cooling = (
        inventory_state in {"base_heavy", "critically_unbalanced"}
        and meso_structure == "trend_continuation"
        and meso_bias == "bullish"
        and (micro_impulse == "mixed" or early_reversal_warning)
        and micro_bias in {"mixed", "bearish", "bullish"}
        and micro_location in {"near_upper_band", "upper_half", "centered"}
    )
    bearish_cooling = (
        inventory_state in {"quote_heavy", "critically_unbalanced"}
        and meso_structure == "trend_continuation"
        and meso_bias == "bearish"
        and (micro_impulse == "mixed" or early_reversal_warning)
        and micro_bias in {"mixed", "bullish", "bearish"}
        and micro_location in {"near_lower_band", "lower_half", "centered"}
    )
    return bullish_cooling or bearish_cooling


def _grid_fill_proximity(strategy: dict[str, Any], narrative_payload: dict[str, Any]) -> dict[str, Any]:
    """Report how close the current price is to the nearest open grid orders.

    A side counts as "near fill" when its nearest order is within
    ``max(0.25, atr_percent * 1.5)`` percent of the price and an order also
    exists on the opposite side. Returns ``{"near_fill": False}`` when no
    positive current price is available.
    """
    state = _as_dict(strategy.get("state"))
    orders = _as_list(state.get("orders"))
    features = _as_dict(narrative_payload.get("features_by_timeframe"))
    micro_raw = _as_dict(_as_dict(features.get("1m")).get("raw"))
    current_price = _safe_float(micro_raw.get("price") or state.get("last_price") or state.get("center_price"))
    atr_percent = _safe_float(micro_raw.get("atr_percent")) or 0.0
    if not current_price or current_price <= 0:
        return {"near_fill": False}

    sell_prices: list[float] = []
    buy_prices: list[float] = []
    for order in orders:
        if not isinstance(order, dict):
            continue
        if str(order.get("status") or "open").lower() not in _OPEN_ORDER_STATUSES:
            continue
        price = _safe_float(order.get("price"))
        if price is None or price <= 0:
            continue
        side = str(order.get("side") or "").lower()
        # Only sells at/above and buys at/below the price can be "next" fills.
        if side == "sell" and price >= current_price:
            sell_prices.append(price)
        elif side == "buy" and price <= current_price:
            buy_prices.append(price)

    next_sell = min(sell_prices) if sell_prices else None
    next_buy = max(buy_prices) if buy_prices else None
    next_sell_distance_pct = (((next_sell - current_price) / current_price) * 100.0) if next_sell else None
    next_buy_distance_pct = (((current_price - next_buy) / current_price) * 100.0) if next_buy else None
    threshold_pct = max(0.25, atr_percent * 1.5)
    near_sell_fill = bool(
        next_sell_distance_pct is not None
        and next_sell_distance_pct >= 0
        and next_sell_distance_pct <= threshold_pct
        and next_buy is not None
    )
    near_buy_fill = bool(
        next_buy_distance_pct is not None
        and next_buy_distance_pct >= 0
        and next_buy_distance_pct <= threshold_pct
        and next_sell is not None
    )

    near_fill_side: str | None = None
    if near_sell_fill and near_buy_fill:
        # Both sides are close: report the nearer one (ties go to "sell").
        near_fill_side = "sell" if (next_sell_distance_pct or 0.0) <= (next_buy_distance_pct or 0.0) else "buy"
    elif near_sell_fill:
        near_fill_side = "sell"
    elif near_buy_fill:
        near_fill_side = "buy"

    return {
        "near_fill": bool(near_sell_fill or near_buy_fill),
        "near_fill_side": near_fill_side,
        "near_sell_fill": near_sell_fill,
        "near_buy_fill": near_buy_fill,
        "current_price": current_price,
        "next_sell": next_sell,
        "next_buy": next_buy,
        "next_sell_distance_pct": round(next_sell_distance_pct, 4) if next_sell_distance_pct is not None else None,
        "next_buy_distance_pct": round(next_buy_distance_pct, 4) if next_buy_distance_pct is not None else None,
        "threshold_pct": round(threshold_pct, 4),
    }


def _grid_fill_fights_breakout(grid_fill: dict[str, Any], breakout: dict[str, Any]) -> bool:
    """Return True when the nearest fill would trade against the breakout bias."""
    side = str(grid_fill.get("near_fill_side") or "")
    bias = str(breakout.get("meso_bias") or breakout.get("micro_bias") or "")
    if bias == "bullish":
        return side == "sell"
    if bias == "bearish":
        return side == "buy"
    return False


def _breakout_direction(breakout: dict[str, Any], stance: str | None = None) -> str | None:
    """Return "bullish"/"bearish" from meso bias, then micro bias, then stance text."""
    meso_bias = str(breakout.get("meso_bias") or "")
    micro_bias = str(breakout.get("micro_bias") or "")
    if meso_bias in {"bullish", "bearish"}:
        return meso_bias
    if micro_bias in {"bullish", "bearish"}:
        return micro_bias
    stance_text = str(stance or "")
    if "bullish" in stance_text:
        return "bullish"
    if "bearish" in stance_text:
        return "bearish"
    return None


def _narrative_direction(narrative: dict[str, Any]) -> str | None:
    """Resolve the narrative's direction from its breakout pressure and stance."""
    stance = str(narrative.get("stance") or "")
    breakout = _as_dict(narrative.get("grid_breakout_pressure"))
    direction = _breakout_direction(breakout, stance)
    if direction:
        return direction
    if stance in {"constructive_bullish", "cautious_bullish", "fragile_bullish"}:
        return "bullish"
    if stance in {"constructive_bearish", "cautious_bearish", "fragile_bearish"}:
        return "bearish"
    return None


def _strategy_trade_side(strategy: dict[str, Any]) -> str:
    """Return the strategy's trade side ("buy", "sell" or "both"; default "both")."""
    config = _as_dict(strategy.get("config"))
    state = _as_dict(strategy.get("state"))
    side = str(
        config.get("trade_side") or state.get("trade_side") or strategy.get("trade_side") or "both"
    ).strip().lower()
    return side if side in {"buy", "sell", "both"} else "both"


def _trend_handoff_level_threshold(breakout: dict[str, Any]) -> float:
    """Grid levels of drift required before handing off to trend.

    2.0 when history promoted the breakout to confirmed, otherwise 2.75.
    """
    memory = _as_dict(breakout.get("time_window_memory"))
    if bool(memory.get("promoted_to_confirmed")):
        return 2.0
    return 2.75


def _grid_switch_tradeoff(
    *,
    current_primary: dict[str, Any],
    wallet_state: dict[str, Any],
    breakout: dict[str, Any],
    grid_fill: dict[str, Any],
    grid_pressure: dict[str, Any],
    directional_micro_clear: bool,
    trend: dict[str, Any] | None,
) -> dict[str, Any]:
    """Weigh ``switch_benefit`` against ``stay_cost`` for leaving the grid.

    ``should_switch`` requires a persistent breakout, a ready trend
    (score > 0.45 with a clear micro direction) and a positive margin
    (``switch_benefit - stay_cost``).
    """
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
    open_order_count = int(current_primary.get("open_order_count") or 0)
    if not open_order_count:
        # Fall back to the raw state when the normalized count is missing/zero.
        state = _as_dict(current_primary.get("state"))
        open_order_count = int(state.get("open_order_count") or len(state.get("orders") or []) or 0)
    trend_score = float(trend.get("score") or 0.0) if trend else 0.0
    breakout_score = float(breakout.get("score") or 0.0)
    levels = float(grid_pressure.get("levels") or 0.0)
    near_fill = bool(grid_fill.get("near_fill"))
    fill_fights = _grid_fill_fights_breakout(grid_fill, breakout)
    persistent = bool(breakout.get("persistent"))
    trend_ready = trend_score > 0.45 and directional_micro_clear

    switch_benefit = 0.0
    if persistent:
        switch_benefit += 0.28
    if trend_ready:
        switch_benefit += 0.34
    if fill_fights:
        switch_benefit += 0.12
    if levels >= _trend_handoff_level_threshold(breakout):
        switch_benefit += 0.18
    switch_benefit += min(trend_score, 2.5) * 0.18
    switch_benefit += min(breakout_score, 5.0) * 0.04

    stay_cost = 0.0
    if inventory_state == "balanced":
        stay_cost += 0.06
    elif inventory_state in {"base_heavy", "quote_heavy"}:
        stay_cost += 0.16
    elif inventory_state in SEVERE_INVENTORY_STATES:
        stay_cost += 0.28
    else:
        stay_cost += 0.1
    stay_cost += min(levels, 6.0) * 0.06
    stay_cost += min(open_order_count, 8) * 0.025
    if near_fill:
        stay_cost += 0.06
    if fill_fights:
        stay_cost += 0.18
    if not persistent:
        stay_cost += 0.12

    margin = round(switch_benefit - stay_cost, 4)
    should_switch = persistent and trend_ready and margin > 0.0
    return {
        "trend_score": round(trend_score, 4),
        "breakout_score": round(breakout_score, 4),
        "switch_benefit": round(switch_benefit, 4),
        "stay_cost": round(stay_cost, 4),
        "margin": margin,
        "should_switch": should_switch,
        "trend_ready": trend_ready,
        "persistent": persistent,
        "levels": round(levels, 4),
        "open_order_count": open_order_count,
        "near_fill": near_fill,
        "fill_fights": fill_fights,
        "inventory_state": inventory_state,
    }


def _grid_trend_pressure(strategy: dict[str, Any], narrative_payload: dict[str, Any]) -> dict[str, Any]:
    """Measure how many grid steps the price has drifted from the grid center.

    Returns zero levels with direction "unknown" when the price, center or
    step size is missing or non-positive.
    """
    state = _as_dict(strategy.get("state"))
    config = _as_dict(strategy.get("config"))
    features = _as_dict(narrative_payload.get("features_by_timeframe"))
    micro_raw = _as_dict(_as_dict(features.get("1m")).get("raw"))
    current_price = _safe_float(micro_raw.get("price") or state.get("last_price") or state.get("center_price"))
    center_price = _safe_float(state.get("center_price") or state.get("last_price"))
    step_pct = _safe_float(
        config.get("grid_step_pct") or state.get("grid_step_pct") or state.get("recenter_pct_live")
    ) or 0.0
    if not current_price or not center_price or current_price <= 0 or center_price <= 0 or step_pct <= 0:
        return {
            "levels": 0.0,
            "rounded_levels": 0,
            "direction": "unknown",
            "current_price": current_price,
            "center_price": center_price,
            "step_pct": step_pct,
        }
    # NOTE(review): distance is a plain fraction (not multiplied by 100), so
    # this presumably expects step_pct as a fraction too -- confirm the unit.
    distance_pct = abs(current_price - center_price) / center_price
    levels = distance_pct / step_pct
    direction = "bullish" if current_price > center_price else "bearish" if current_price < center_price else "flat"
    return {
        "levels": round(levels, 4),
        "rounded_levels": int(levels),
        "direction": direction,
        "current_price": current_price,
        "center_price": center_price,
        "step_pct": step_pct,
        "distance_pct": round(distance_pct, 4),
    }


def _grid_can_still_work(strategy: dict[str, Any], wallet_state: dict[str, Any], grid_fill: dict[str, Any]) -> bool:
    """Return True when the grid still has a practical way to keep operating.

    A degraded grid never qualifies. Otherwise any side capacity, any open
    order or a nearby fill is enough; failing those, the grid qualifies only
    while the inventory state is not severe.
    """
    supervision = _as_dict(strategy.get("supervision"))
    side_capacity = _as_dict(supervision.get("side_capacity"))
    buy_capacity = bool(side_capacity.get("buy", False))
    sell_capacity = bool(side_capacity.get("sell", False))
    open_order_count = int(strategy.get("open_order_count") or 0)
    degraded = bool(supervision.get("degraded"))
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
    if degraded:
        return False
    if buy_capacity or sell_capacity:
        return True
    if open_order_count > 0:
        return True
    if grid_fill.get("near_fill"):
        return True
    return inventory_state not in SEVERE_INVENTORY_STATES


def _grid_is_truly_stuck_for_recovery(
    strategy: dict[str, Any],
    wallet_state: dict[str, Any],
    grid_fill: dict[str, Any],
) -> bool:
    """Return True when the grid cannot work and the wallet is severely skewed."""
    if _grid_can_still_work(strategy, wallet_state, grid_fill):
        return False
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
    # bool() keeps the declared return type even when "rebalance_needed" is
    # missing (None) or another falsy non-bool value.
    return bool(wallet_state.get("rebalance_needed")) and inventory_state in SEVERE_INVENTORY_STATES


def _wallet_within_rebalance_tolerance(wallet_state: dict[str, Any], tolerance: float = 0.3) -> bool:
    """Return True when the wallet imbalance is at most *tolerance*.

    Uses ``imbalance_score``, falling back to ``abs(base_ratio - 0.5)``, and
    finally to whether ``inventory_state`` is "balanced".
    """
    imbalance = _safe_float(wallet_state.get("imbalance_score"))
    if imbalance is None:
        base_ratio = _safe_float(wallet_state.get("base_ratio"))
        if base_ratio is not None:
            imbalance = abs(base_ratio - 0.5)
    if imbalance is None:
        return str(wallet_state.get("inventory_state") or "").lower() == "balanced"
    return imbalance <= tolerance


def _decide_for_grid(
    *,
    current_primary: dict[str, Any],
    stance: str,
    inventory_state: str,
    wallet_state: dict[str, Any],
    breakout: dict[str, Any],
    grid_fill: dict[str, Any],
    grid_pressure: dict[str, Any],
    directional_micro_clear: bool,
    severe_imbalance: bool,
    trend: dict[str, Any] | None,
    rebalance: dict[str, Any] | None,
) -> tuple[str, str, str | None, list[str], list[str]]:
    action = "keep_grid"
    mode = "observe"
    target_strategy = current_primary["id"]
    reasons: list[str] = []
    blocks: list[str] = []
    inventory_state = _inventory_state_label(inventory_state)
    # Grid is the base mode. Leave it only for a persistent breakout or when
    # the grid has genuinely lost its ability to recover on its own.
def _decide_for_grid(
    *,
    current_primary: dict[str, Any],
    stance: str,
    inventory_state: str,
    wallet_state: dict[str, Any],
    breakout: dict[str, Any],
    grid_fill: dict[str, Any],
    grid_pressure: dict[str, Any],
    directional_micro_clear: bool,
    severe_imbalance: bool,
    trend: dict[str, Any] | None,
    rebalance: dict[str, Any] | None,
) -> tuple[str, str, str | None, list[str], list[str]]:
    """Decide what to do while a grid strategy is the current primary.

    Starts from ``keep_grid`` / ``observe`` targeting ``current_primary["id"]``
    and walks an ordered ``if``/``elif`` chain; the first matching branch wins,
    so the branch order is part of the behavior.

    Returns:
        ``(action, mode, target_strategy, reasons, blocks)`` where ``action`` is
        one of ``keep_grid``, ``replace_with_trend_follower``,
        ``replace_with_exposure_protector`` or ``suspend_grid``; ``mode`` is
        ``observe``, ``warn`` or ``act``; ``reasons`` and ``blocks`` are the
        human-readable explanations collected along the way.

    Raises:
        KeyError: if ``current_primary`` has no ``"id"`` or ``breakout`` has no
            ``"persistent"`` entry (both are indexed directly).
    """
    action = "keep_grid"
    mode = "observe"
    target_strategy = current_primary["id"]
    reasons: list[str] = []
    blocks: list[str] = []
    # Re-normalize so alias spellings compare equal to the canonical labels below.
    inventory_state = _inventory_state_label(inventory_state)

    # Grid is the base mode. Leave it only for a persistent breakout or when
    # the grid has genuinely lost its ability to recover on its own.
    grid_friendly_stance = stance in {"neutral_rotational", "breakout_watch", "cautious_bullish", "cautious_bearish", "fragile_bullish", "fragile_bearish"}
    grid_can_work = _grid_can_still_work(current_primary, wallet_state, grid_fill)
    grid_stuck_for_recovery = _grid_is_truly_stuck_for_recovery(current_primary, wallet_state, grid_fill)
    persistent_breakout = bool(breakout["persistent"])
    breakout_phase = str(breakout.get("phase") or "none")
    # True implies ``trend`` is non-empty, which is what makes the later
    # ``trend["strategy_id"]`` / ``trend["score"]`` lookups safe.
    trend_handoff_ready = bool(
        trend
        and trend["score"] > 0.45
        and directional_micro_clear
        and grid_pressure.get("levels", 0.0) >= _trend_handoff_level_threshold(breakout)
    )
    fill_fights_breakout = _grid_fill_fights_breakout(grid_fill, breakout)
    switch_tradeoff = _grid_switch_tradeoff(
        current_primary=current_primary,
        wallet_state=wallet_state,
        breakout=breakout,
        grid_fill=grid_fill,
        grid_pressure=grid_pressure,
        directional_micro_clear=directional_micro_clear,
        trend=trend,
    )

    if severe_imbalance and persistent_breakout:
        # Worst case for a grid: skewed wallet and a persistent move. Pick
        # between trend handoff, inventory repair, or suspending the grid.
        reasons.append("grid imbalance now coincides with persistent breakout pressure")
        directional_inventory = _inventory_breakout_is_directionally_compatible(inventory_state, breakout)
        if switch_tradeoff["should_switch"] and trend_handoff_ready and (
            not wallet_state.get("rebalance_needed")
            or directional_inventory
            or not rebalance
            or trend["score"] >= rebalance["score"]
        ):
            action = "replace_with_trend_follower"
            target_strategy = trend["strategy_id"]
            mode = "act"
            if directional_inventory:
                reasons.append("inventory posture can be absorbed by the directional handoff")
            reasons.append(
                f"switch benefit ({switch_tradeoff['switch_benefit']:.2f}) exceeds stay cost ({switch_tradeoff['stay_cost']:.2f})"
            )
        elif wallet_state.get("rebalance_needed") and rebalance and rebalance["score"] > 0.35:
            action = "replace_with_exposure_protector"
            target_strategy = rebalance["strategy_id"]
            mode = "act"
        else:
            # Neither handoff qualifies: warn, keeping the grid as the target.
            action = "suspend_grid"
            mode = "warn"
    elif severe_imbalance and grid_stuck_for_recovery and not persistent_breakout and rebalance and rebalance["score"] > 0.6:
        # No breakout, but the grid cannot repair itself: a stricter rebalancer
        # score (0.6 vs 0.35 above) is required before handing over.
        action = "replace_with_exposure_protector"
        target_strategy = rebalance["strategy_id"]
        mode = "act"
        reasons.append("grid has lost practical recovery capacity, so inventory repair should take over")
    elif persistent_breakout and trend_handoff_ready and inventory_state in {"balanced", "base_heavy", "quote_heavy"}:
        if not switch_tradeoff["should_switch"]:
            # Stay in grid; only record why the handoff was declined.
            reasons.append(
                f"breakout is persistent, but staying in grid still looks cheaper than switching (benefit {switch_tradeoff['switch_benefit']:.2f} vs cost {switch_tradeoff['stay_cost']:.2f})"
            )
            if grid_fill.get("near_fill") and fill_fights_breakout:
                reasons.append("nearby opposing fill is only a warning here, not enough on its own to justify the handoff")
        else:
            action = "replace_with_trend_follower"
            # ``trend`` is always truthy here (see trend_handoff_ready); the
            # fallback keeps the current target only as a defensive default.
            target_strategy = trend["strategy_id"] if trend else target_strategy
            mode = "act"
            if grid_fill.get("near_fill") and fill_fights_breakout:
                reasons.append("confirmed trend should not be delayed by a nearby grid fill that trades against the move")
            elif grid_fill.get("near_fill"):
                reasons.append("confirmed directional pressure is strong enough that nearby grid fills should not delay the trend handoff")
            else:
                reasons.append("grid should yield because directional pressure is confirmed and the trend handoff is ready")
    elif not persistent_breakout and grid_can_work:
        if breakout_phase == "developing":
            reasons.append("breakout pressure is developing, but grid can still work and should not be abandoned yet")
        else:
            reasons.append("grid can still operate and self-heal, so inventory skew alone should not force a rebalance handoff")
    elif persistent_breakout and grid_fill.get("near_fill") and inventory_state in {"balanced", "base_heavy", "quote_heavy"}:
        reasons.append("grid is still close to a working fill, but trend handoff is not ready enough yet")
    elif not grid_friendly_stance and persistent_breakout:
        reasons.append("grid should yield because directional pressure is persistent across scopes")
        if trend_handoff_ready:
            action = "replace_with_trend_follower"
            target_strategy = trend["strategy_id"]
            mode = "act"
        else:
            mode = "warn"
            # Explain which handoff precondition is still missing.
            if grid_pressure.get("levels", 0.0) < _trend_handoff_level_threshold(breakout):
                blocks.append("grid has not yet been eaten by enough levels to justify leaving it")
            else:
                blocks.append("directional pressure is rising but the micro layer is not clear enough for a trend handoff")
    else:
        reasons.append("grid can likely self-heal because breakout pressure is not yet persistent")

    return action, mode, target_strategy, reasons, blocks
def _decide_for_trend(
    *,
    current_primary: dict[str, Any],
    stance: str,
    narrative_payload: dict[str, Any],
    wallet_state: dict[str, Any],
    grid: dict[str, Any] | None,
    rebalance: dict[str, Any] | None = None,
) -> tuple[str, str, str | None, list[str], list[str]]:
    """Decide what to do while a trend follower is the current primary.

    The trend is kept unless its edge has cooled (``_trend_cooling_edge``) or
    the stance is ``neutral_rotational``. In either case the preference order
    is: rebalancer when the wallet needs it and one is available, then grid
    when the wallet is grid-ready, otherwise stay put with a warning or a hold.

    Returns:
        ``(action, mode, target_strategy, reasons, blocks)``.
    """
    stay_target = current_primary["id"]
    reasons: list[str] = []
    blocks: list[str] = []

    # Trend should cool into rebalancing first when the wallet is skewed, then
    # let rebalancer hand back to grid once the inventory is healthy again.
    edge_cooled = _trend_cooling_edge(narrative_payload, wallet_state)

    if not edge_cooled and stance != "neutral_rotational":
        reasons.append("trend strategy still fits the directional narrative")
        return "keep_trend", "observe", stay_target, reasons, blocks

    if wallet_state.get("rebalance_needed") and rebalance:
        next_target = rebalance["strategy_id"]
        if edge_cooled:
            reasons.append("trend has cooled and rebalancing should repair the wallet before grid resumes")
        else:
            reasons.append("trend conditions have cooled and rebalancing should repair the wallet before grid resumes")
        return "replace_with_exposure_protector", "act", next_target, reasons, blocks

    if grid and wallet_state.get("grid_ready"):
        next_target = grid["strategy_id"]
        if edge_cooled:
            reasons.append("trend has cooled and grid can resume because no rebalancer is available")
        else:
            reasons.append("trend conditions have cooled and wallet is grid-ready again")
        return "replace_with_grid", "act", next_target, reasons, blocks

    # No handoff is possible yet; explain why the trend stays in place.
    if edge_cooled:
        blocks.append("edge cooling is visible but the wallet is not yet ready for grid")
        return "keep_trend", "warn", stay_target, reasons, blocks

    if wallet_state.get("rebalance_needed"):
        blocks.append("trend has cooled but rebalancing should be the next hop")
        return "keep_trend", "warn", stay_target, reasons, blocks

    blocks.append("grid candidate not strong enough yet")
    return "hold_trend", "observe", stay_target, reasons, blocks
trend_strength = float(trend["score"]) if trend and isinstance(trend.get("score"), (int, float)) else 0.0 if trend and trend_strength >= 1.5: blocks.append("trend is still strong enough that rebalancer should keep repairing instead of resetting to grid") elif _wallet_within_rebalance_tolerance(wallet_state, 0.3): if grid: action = "replace_with_grid" target_strategy = grid["strategy_id"] mode = "act" reasons.append("wallet is within the 0.3 rebalance tolerance, so grid can resume before perfect balance") else: blocks.append("wallet is within the rebalance tolerance but no grid candidate is available") elif wallet_state.get("grid_ready") and stance == "neutral_rotational": if grid and grid["score"] >= 0.5: action = "replace_with_grid" target_strategy = grid["strategy_id"] mode = "act" reasons.append("rebalance is complete and rotational conditions support grid again") else: blocks.append("wallet is ready but grid fit is still too weak") elif grid and grid["score"] >= 0.5: action = "replace_with_grid" target_strategy = grid["strategy_id"] mode = "act" reasons.append("trend is directional but not yet sustained, so grid can resume first") else: blocks.append("trend candidate is not strong enough yet and grid fit is not ready, so rebalancer should not hand directly back to trend") return action, mode, target_strategy, reasons, blocks def make_decision(*, concern: dict[str, Any], narrative_payload: dict[str, Any], wallet_state: dict[str, Any], strategies: list[dict[str, Any]], history_window: dict[str, Any] | None = None) -> DecisionSnapshot: normalized = [normalize_strategy_snapshot(s) for s in strategies if str(s.get("account_id") or "") == str(concern.get("account_id") or "")] breakout = _grid_breakout_pressure(narrative_payload, history_window=history_window) narrative_for_scoring = {**narrative_payload, "grid_breakout_pressure": breakout} fit_reports = [score_strategy_fit(strategy=s, narrative=narrative_for_scoring, wallet_state=wallet_state) for s in normalized] 
def make_decision(
    *,
    concern: dict[str, Any],
    narrative_payload: dict[str, Any],
    wallet_state: dict[str, Any],
    strategies: list[dict[str, Any]],
    history_window: dict[str, Any] | None = None,
) -> DecisionSnapshot:
    """Build the supervision decision for one concern.

    Strategies belonging to ``concern["account_id"]`` are normalized and scored
    against the narrative (augmented with the grid breakout pressure). The
    decision is then delegated to the handler matching the current primary
    strategy type (grid, trend follower or exposure protector). With no
    recognised primary, the best-ranked strategy is enabled when its score is
    at least 0.55; otherwise the decision is to wait.

    Args:
        concern: Concern record; only ``account_id`` is read here.
        narrative_payload: Narrative state (``stance``, ``scoped_state``,
            ``confidence`` and whatever the scoring helpers consume).
        wallet_state: Wallet snapshot (``inventory_state``, ``rebalance_needed``, ...).
        strategies: Raw strategy snapshots, possibly from several accounts.
        history_window: Optional history passed to the breakout-pressure helper.

    Returns:
        A ``DecisionSnapshot`` whose ``requires_action`` is True exactly when
        the chosen mode is ``"act"``.
    """
    account_id = str(concern.get("account_id") or "")
    normalized = [
        normalize_strategy_snapshot(s)
        for s in strategies
        if str(s.get("account_id") or "") == account_id
    ]
    breakout = _grid_breakout_pressure(narrative_payload, history_window=history_window)
    narrative_for_scoring = {**narrative_payload, "grid_breakout_pressure": breakout}
    fit_reports = [
        score_strategy_fit(strategy=s, narrative=narrative_for_scoring, wallet_state=wallet_state)
        for s in normalized
    ]
    ranked = sorted(fit_reports, key=lambda item: item["score"], reverse=True)
    current_primary = _select_current_primary(normalized)
    best = ranked[0] if ranked else None

    stance = str(narrative_payload.get("stance") or "neutral_rotational")
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))

    # The micro scope decides whether a directional handoff is "clear".
    scoped = narrative_payload.get("scoped_state") if isinstance(narrative_payload.get("scoped_state"), dict) else {}
    micro = scoped.get("micro") if isinstance(scoped.get("micro"), dict) else {}
    micro_impulse = str(micro.get("impulse") or "mixed")
    micro_bias = str(micro.get("trend_bias") or "mixed")
    micro_reversal_risk = str(micro.get("reversal_risk") or "low")
    bullish_micro_clear = micro_impulse == "up" and micro_bias == "bullish" and micro_reversal_risk != "high"
    bearish_micro_clear = micro_impulse == "down" and micro_bias == "bearish" and micro_reversal_risk != "high"
    breakout_direction = _breakout_direction(breakout, stance)
    if breakout_direction == "bullish":
        directional_micro_clear = bullish_micro_clear
    elif breakout_direction == "bearish":
        directional_micro_clear = bearish_micro_clear
    else:
        directional_micro_clear = False

    # Resolve the primary's type once instead of re-checking it at every use.
    primary_type = current_primary["strategy_type"] if current_primary else None
    is_grid_primary = primary_type == "grid_trader"

    # Grid-specific context is only meaningful while a grid is the primary.
    grid_fill = _grid_fill_proximity(current_primary, narrative_payload) if is_grid_primary else {"near_fill": False}
    grid_pressure = (
        _grid_trend_pressure(current_primary, narrative_payload)
        if is_grid_primary
        else {"levels": 0.0, "rounded_levels": 0, "direction": "unknown"}
    )
    severe_imbalance = inventory_state in SEVERE_INVENTORY_STATES

    action = "hold"
    mode = "observe"
    target_strategy = current_primary.get("id") if current_primary else (best.get("strategy_id") if best else None)
    reasons: list[str] = []
    blocks: list[str] = []

    # Best-scoring candidate of each strategy type (ranked is sorted descending).
    trend = next((r for r in ranked if r["strategy_type"] == "trend_follower"), None)
    rebalance = next((r for r in ranked if r["strategy_type"] == "exposure_protector"), None)
    grid = next((r for r in ranked if r["strategy_type"] == "grid_trader"), None)
    switch_tradeoff: dict[str, Any] = {}

    if is_grid_primary:
        action, mode, target_strategy, reasons, blocks = _decide_for_grid(
            current_primary=current_primary,
            stance=stance,
            inventory_state=inventory_state,
            wallet_state=wallet_state,
            breakout=breakout,
            grid_fill=grid_fill,
            grid_pressure=grid_pressure,
            directional_micro_clear=directional_micro_clear,
            severe_imbalance=severe_imbalance,
            trend=trend,
            rebalance=rebalance,
        )
        # Recomputed here only so the tradeoff can be reported in the payload;
        # _decide_for_grid does not return the value it used.
        switch_tradeoff = _grid_switch_tradeoff(
            current_primary=current_primary,
            wallet_state=wallet_state,
            breakout=breakout,
            grid_fill=grid_fill,
            grid_pressure=grid_pressure,
            directional_micro_clear=directional_micro_clear,
            trend=trend,
        )
    elif primary_type == "trend_follower":
        action, mode, target_strategy, reasons, blocks = _decide_for_trend(
            current_primary=current_primary,
            stance=stance,
            narrative_payload=narrative_payload,
            wallet_state=wallet_state,
            grid=grid,
            rebalance=rebalance,
        )
    elif primary_type == "exposure_protector":
        action, mode, target_strategy, reasons, blocks = _decide_for_rebalancer(
            current_primary=current_primary,
            stance=stance,
            wallet_state=wallet_state,
            grid=grid,
            trend=trend,
        )
    elif best and best["score"] >= 0.55:
        action = f"enable_{best['strategy_type']}"
        target_strategy = best["strategy_id"]
        mode = "act"
        reasons.extend(best["reasons"])
    else:
        action = "wait"
        mode = "observe"
        blocks.append("no strategy is yet a strong enough fit")

    reason_summary = reasons[0] if reasons else (blocks[0] if blocks else "strategy posture unchanged")

    # A missing, zero or unparseable narrative confidence falls back to 0.4
    # instead of aborting the whole decision.
    parsed_confidence = _safe_float(narrative_payload.get("confidence") or 0.4)
    confidence = 0.4 if parsed_confidence is None else parsed_confidence
    if action.startswith("replace_with") or action.startswith("enable_"):
        confidence += 0.08
    if wallet_state.get("rebalance_needed") and "grid" in action:
        confidence -= 0.08
    confidence = round(_clamp(confidence, 0.2, 0.95), 3)

    payload = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "wallet_state": wallet_state,
        "narrative_stance": stance,
        "strategy_fit_ranking": ranked,
        "current_primary_strategy": current_primary.get("id") if current_primary else None,
        "argus_decision_context": _argus_decision_context(narrative_payload),
        "history_window": history_window or {},
        "grid_breakout_pressure": breakout,
        "grid_fill_context": grid_fill,
        # Stays {} unless the primary is a grid (see initialisation above).
        "grid_switch_tradeoff": switch_tradeoff,
        "reason_chain": reasons,
        "blocks": blocks,
        "decision_version": 2,
    }
    return DecisionSnapshot(
        mode=mode,
        action=action,
        target_strategy=target_strategy,
        reason_summary=reason_summary,
        confidence=confidence,
        requires_action=mode == "act",
        payload=payload,
    )