from __future__ import annotations """Deterministic strategy-supervision logic for Hermes. This is the first decision slice. Hermes is currently acting as a supervisor for existing trader strategies, not as a direct trading engine. Design intent: - prefer one active posture at a time over layered companions - detect when grid trading becomes unsafe because market posture or wallet balance no longer supports it - switch cleanly between directional, range, and rebalancing phases """ import json from dataclasses import dataclass from datetime import datetime, timezone from typing import Any @dataclass(frozen=True) class DecisionSnapshot: mode: str action: str target_strategy: str | None reason_summary: str confidence: float requires_action: bool payload: dict[str, Any] def _clamp(value: float, lower: float, upper: float) -> float: return max(lower, min(upper, value)) def _safe_float(value: Any) -> float | None: try: if value is None: return None return float(value) except Exception: return None def _decision_profile_config(decision_profile: dict[str, Any] | None) -> dict[str, Any]: if not isinstance(decision_profile, dict): return {} config = decision_profile.get("config") if isinstance(config, dict): return config raw = decision_profile.get("config_json") if isinstance(raw, str) and raw.strip(): try: parsed = json.loads(raw) if isinstance(parsed, dict): return parsed except Exception: return {} return {} def _inventory_state_label(value: Any) -> str: state = str(value or "unknown").strip().lower() aliases = { "critical": "critically_unbalanced", "critically_imbalanced": "critically_unbalanced", "depleted_base": "depleted_base_side", "depleted_quote": "depleted_quote_side", "one_sided_base": "depleted_base_side", "one_sided_quote": "depleted_quote_side", } return aliases.get(state, state) def _timeframe_direction(feature: dict[str, Any] | None) -> str: if not isinstance(feature, dict): return "mixed" trend = feature.get("trend") if isinstance(feature.get("trend"), dict) else {} momentum = feature.get("momentum") if isinstance(feature.get("momentum"), dict) else {} alignment = str(trend.get("alignment") or "") if alignment in {"fully_bullish", "bullish_pullback"}: return "bullish" if alignment in {"fully_bearish", "bearish_pullback"}: return "bearish" bias_score = _safe_float(trend.get("bias_score")) if bias_score is not None: if bias_score >= 0.55: return "bullish" if bias_score <= -0.55: return "bearish" impulse = str(momentum.get("impulse") or "") if impulse == "up": return "bullish" if impulse == "down": return "bearish" return "mixed" def _short_term_trend_dislocated(narrative_payload: dict[str, Any]) -> bool: features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {} short_dirs = [_timeframe_direction(features.get(tf)) for tf in ("1m", "5m")] higher_dirs = [_timeframe_direction(features.get(tf)) for tf in ("15m", "1h", "4h", "1d")] short_clean = [d for d in short_dirs if d in {"bullish", "bearish"}] higher_clean = [d for d in higher_dirs if d in {"bullish", "bearish"}] if not short_clean: return bool(higher_clean) and len(set(higher_clean)) == 1 if any(d == "mixed" for d in short_dirs): return bool(higher_clean) and len(set(higher_clean)) == 1 short_direction = short_clean[0] if len(set(short_clean)) == 1 else "mixed" if short_direction == "mixed": return True if not higher_clean: return False higher_direction = higher_clean[0] if len(set(higher_clean)) == 1 else "mixed" return higher_direction in {"bullish", "bearish"} and short_direction != higher_direction def _short_term_trend_manifest_score(narrative_payload: dict[str, Any], direction: str) -> float: features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {} if direction not in {"bullish", "bearish"}: return 0.0 total = 0.0 seen = 0 for timeframe in ("1m", "5m"): feature = features.get(timeframe) if isinstance(features.get(timeframe), dict) else None if not feature: continue seen += 1 trend = feature.get("trend") if isinstance(feature.get("trend"), dict) else {} if not trend: local = 0.68 else: alignment = str(trend.get("alignment") or "") strength = _safe_float(trend.get("strength")) or 0.0 bias_score = abs(_safe_float(trend.get("bias_score")) or 0.0) local = 0.0 if direction == "bullish": if alignment == "fully_bullish": local = 1.0 elif alignment == "bullish_pullback": local = 0.62 else: if alignment == "fully_bearish": local = 1.0 elif alignment == "bearish_pullback": local = 0.62 if local == 0.0: short_direction = _timeframe_direction(feature) if short_direction == direction: local = 0.34 elif short_direction == "mixed": local = 0.12 if strength >= 0.55: local += 0.16 elif strength <= 0.2: local -= 0.08 if bias_score >= 0.75: local += 0.08 elif bias_score <= 0.2: local -= 0.04 total += _clamp(local, 0.0, 1.0) if not seen: return 0.0 return round(total / seen, 4) SEVERE_INVENTORY_STATES = {"critically_unbalanced", "depleted_base_side", "depleted_quote_side"} REBALANCE_INVENTORY_STATES = {"base_heavy", "quote_heavy", *SEVERE_INVENTORY_STATES} def _infer_market_pair(concern: dict[str, Any]) -> tuple[str, str]: base = str(concern.get("base_currency") or "").strip().upper() quote = str(concern.get("quote_currency") or "").strip().upper() if base and quote: return base, quote market = str(concern.get("market_symbol") or "").strip().upper().replace("/", "").replace("-", "") for suffix in ("USDT", "USDC", "USD", "EUR", "BTC", "ETH"): if market.endswith(suffix) and len(market) > len(suffix): inferred_base = market[:-len(suffix)] inferred_quote = suffix return base or inferred_base, quote or inferred_quote return base or market, quote or "USD" def assess_wallet_state(*, account_info: dict[str, Any], concern: dict[str, Any], price: float | None, strategies: list[dict[str, Any]] | None = None) -> dict[str, Any]: """Summarize inventory health for strategy switching. The key output is whether the wallet is balanced enough for range/grid harvesting, or so skewed that Hermes should prefer trend capture or rebalancing before grid is allowed again. """ balances = account_info.get("balances") if isinstance(account_info.get("balances"), list) else [] base, quote = _infer_market_pair(concern) base_available = 0.0 quote_available = 0.0 for item in balances: if not isinstance(item, dict): continue asset = str(item.get("asset_code") or item.get("asset") or "").upper() amount = _safe_float(item.get("available") if item.get("available") is not None else item.get("total")) if amount is None: continue if asset == base: base_available = amount elif asset == quote: quote_available = amount reserved_base = 0.0 reserved_quote = 0.0 for strategy in strategies or []: if not isinstance(strategy, dict): continue if str(strategy.get("account_id") or "").strip() != str(concern.get("account_id") or "").strip(): continue market_symbol = str(strategy.get("market_symbol") or "").strip().lower() if market_symbol and market_symbol != str(concern.get("market_symbol") or "").strip().lower(): continue if str(strategy.get("mode") or "off") == "off": continue state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {} orders = state.get("orders") if isinstance(state.get("orders"), list) else [] for order in orders: if not isinstance(order, dict): continue if str(order.get("status") or "open").lower() not in {"open", "live", "active"}: continue side = str(order.get("side") or "").lower() amount = _safe_float(order.get("amount") or order.get("amount_remaining")) or 0.0 order_price = _safe_float(order.get("price")) or price or 0.0 if side == "sell": reserved_base += amount elif side == "buy": reserved_quote += amount * order_price price = price or 0.0 effective_base = base_available + reserved_base effective_quote = quote_available + reserved_quote base_value = effective_base * price if price > 0 else 0.0 quote_value = effective_quote total_value = base_value + quote_value base_ratio = (base_value / total_value) if total_value > 0 else 0.5 quote_ratio = (quote_value / total_value) if total_value > 0 else 0.5 imbalance = abs(base_ratio - 0.5) if total_value <= 0: inventory_state = "unknown" elif base_ratio <= 0.02: inventory_state = "depleted_base_side" elif quote_ratio <= 0.02: inventory_state = "depleted_quote_side" elif base_ratio < 0.08: inventory_state = "critically_unbalanced" elif quote_ratio < 0.08: inventory_state = "critically_unbalanced" elif imbalance >= 0.35: inventory_state = "critically_unbalanced" elif base_ratio > 0.62: inventory_state = "base_heavy" elif quote_ratio > 0.62: inventory_state = "quote_heavy" else: inventory_state = "balanced" grid_ready = inventory_state == "balanced" rebalance_needed = inventory_state in REBALANCE_INVENTORY_STATES return { "generated_at": datetime.now(timezone.utc).isoformat(), "base_currency": base, "quote_currency": quote, "base_available": round(base_available, 8), "quote_available": round(quote_available, 8), "base_reserved": round(reserved_base, 8), "quote_reserved": round(reserved_quote, 8), "base_effective": round(effective_base, 8), "quote_effective": round(effective_quote, 8), "base_value": round(base_value, 4), "quote_value": round(quote_value, 4), "total_value": round(total_value, 4), "base_ratio": round(base_ratio, 4), "quote_ratio": round(quote_ratio, 4), "imbalance_score": round(imbalance, 4), "inventory_state": inventory_state, "grid_ready": grid_ready, "rebalance_needed": rebalance_needed, } def normalize_strategy_snapshot(strategy: dict[str, Any]) -> dict[str, Any]: strategy_type = str(strategy.get("strategy_type") or "unknown") mode = str(strategy.get("mode") or "off") state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {} config = strategy.get("config") if isinstance(strategy.get("config"), dict) else {} report = strategy.get("report") if isinstance(strategy.get("report"), dict) else {} report_fit = report.get("fit") if isinstance(report.get("fit"), dict) else {} report_supervision = report.get("supervision") if isinstance(report.get("supervision"), dict) else {} report_state = report.get("state") if isinstance(report.get("state"), dict) else {} # Stable minimum contract used by Hermes while the trader-side strategy # metadata evolves. These values can later be sourced directly from richer # reports, but the decision layer keeps a normalized shape from day one. defaults = { "grid_trader": { "role": "primary", "inventory_behavior": "balanced", "requires_rebalance_before_start": False, "requires_rebalance_before_stop": False, "safe_when_unbalanced": False, "can_run_with": ["exposure_protector"], }, "trend_follower": { "role": "primary", "inventory_behavior": "accumulative_long", "requires_rebalance_before_start": False, "requires_rebalance_before_stop": False, "safe_when_unbalanced": True, "can_run_with": [], "trade_side": "both", }, "exposure_protector": { "role": "rebalancing", "inventory_behavior": "rebalancing", "requires_rebalance_before_start": False, "requires_rebalance_before_stop": False, "safe_when_unbalanced": True, "can_run_with": [], "rebalance_tolerance": 0.3, }, } contract = defaults.get(strategy_type, { "role": "primary", "inventory_behavior": "unknown", "requires_rebalance_before_start": False, "requires_rebalance_before_stop": False, "safe_when_unbalanced": True, "can_run_with": [], }) contract = {**contract, **report_fit} return { "id": strategy.get("id"), "strategy_type": strategy_type, "mode": mode, "enabled": mode != "off", "status": strategy.get("status") or ("running" if mode != "off" else "stopped"), "market_symbol": strategy.get("market_symbol"), "account_id": strategy.get("account_id"), "open_order_count": int(state.get("open_order_count") or report_state.get("open_order_count") or strategy.get("open_order_count") or 0), "last_action": state.get("last_action") or report_state.get("last_action") or strategy.get("last_side"), "last_error": state.get("last_error") or report_state.get("last_error") or "", "contract": contract, "trade_side": str(config.get("trade_side") or contract.get("trade_side") or "both"), "supervision": report_supervision, "config": config, "state": {**report_state, **state}, } def _argus_decision_context(narrative_payload: dict[str, Any]) -> dict[str, Any]: argus = narrative_payload.get("argus_context") if isinstance(narrative_payload.get("argus_context"), dict) else {} regime = str(argus.get("regime") or argus.get("snapshot_regime") or "").strip() confidence_raw = argus.get("regime_confidence") if argus.get("regime_confidence") is not None else argus.get("snapshot_confidence") confidence = float(confidence_raw) if isinstance(confidence_raw, (int, float)) else 0.0 components = argus.get("regime_components") if isinstance(argus.get("regime_components"), dict) else {} if not components: components = argus.get("snapshot_components") if isinstance(argus.get("snapshot_components"), dict) else {} compression = float(components.get("compression") or 0.0) compression_active = regime == "compression" and confidence >= 0.55 and compression >= 0.65 return { "regime": regime, "confidence": round(confidence, 4), "components": components, "compression": round(compression, 4), "compression_active": compression_active, } def _parse_timestamp(value: Any) -> datetime | None: text = str(value or "").strip() if not text: return None if text.endswith("Z"): text = text[:-1] + "+00:00" try: parsed = datetime.fromisoformat(text) except Exception: return None if parsed.tzinfo is None: return parsed.replace(tzinfo=timezone.utc) return parsed.astimezone(timezone.utc) def _recent_switch_cooldown_active(history_window: dict[str, Any] | None, concern_id: str, cooldown_seconds: int) -> tuple[bool, float | None, str | None]: if cooldown_seconds <= 0: return False, None, None rows = history_window.get("recent_decisions") if isinstance(history_window, dict) and isinstance(history_window.get("recent_decisions"), list) else [] now = datetime.now(timezone.utc) for row in rows: if not isinstance(row, dict): continue if concern_id and str(row.get("concern_id") or "") != concern_id: continue mode = str(row.get("mode") or "").lower() action = str(row.get("action") or "") target = str(row.get("target_strategy") or "") if mode != "act" or not action or not target: continue created = _parse_timestamp(row.get("created_at")) if not created: continue elapsed = (now - created).total_seconds() if elapsed < cooldown_seconds: return True, round(max(cooldown_seconds - elapsed, 0.0), 1), action return False, None, action return False, None, None def score_strategy_fit(*, strategy: dict[str, Any], narrative: dict[str, Any], wallet_state: dict[str, Any]) -> dict[str, Any]: stance = str(narrative.get("stance") or "neutral_rotational") opportunity_map = narrative.get("opportunity_map") if isinstance(narrative.get("opportunity_map"), dict) else {} breakout_pressure = narrative.get("grid_breakout_pressure") if isinstance(narrative.get("grid_breakout_pressure"), dict) else {} breakout_phase = str(breakout_pressure.get("phase") or "none") continuation = float(opportunity_map.get("continuation") or 0.0) mean_reversion = float(opportunity_map.get("mean_reversion") or 0.0) reversal = float(opportunity_map.get("reversal") or 0.0) wait = float(opportunity_map.get("wait") or 0.0) inventory_state = _inventory_state_label(wallet_state.get("inventory_state")) argus_context = _argus_decision_context(narrative) strategy_type = strategy["strategy_type"] supervision = strategy.get("supervision") if isinstance(strategy.get("supervision"), dict) else {} inventory_pressure = str(supervision.get("inventory_pressure") or "") capacity_available = bool(supervision.get("capacity_available")) side_capacity = supervision.get("side_capacity") if isinstance(supervision.get("side_capacity"), dict) else {} score = 0.0 reasons: list[str] = [] blocks: list[str] = [] if strategy_type == "grid_trader": score += mean_reversion * 1.8 if stance in {"neutral_rotational", "breakout_watch"}: score += 0.45 reasons.append("narrative still supports rotational structure") if continuation >= 0.45: score -= 0.8 blocks.append("continuation pressure is too strong for safe grid harvesting") if inventory_state != "balanced": score -= 1.0 blocks.append(f"wallet is not grid-ready: {inventory_state}") else: reasons.append("wallet is balanced enough for two-sided harvesting") if not capacity_available: score -= 0.25 blocks.append("grid report shows one-sided capacity") if side_capacity and not (bool(side_capacity.get("buy", True)) and bool(side_capacity.get("sell", True))): score -= 0.25 blocks.append("grid side capacity is asymmetric") if argus_context["compression_active"]: score += 0.2 reasons.append("Argus compression supports staying selective with grid") elif strategy_type == "trend_follower": score += continuation * 1.9 trade_side = _strategy_trade_side(strategy) narrative_direction = _narrative_direction(narrative) if stance in {"constructive_bullish", "cautious_bullish", "constructive_bearish", "cautious_bearish"}: score += 0.5 reasons.append("narrative supports directional continuation") if trade_side == "buy": if narrative_direction == "bullish": score += 0.6 reasons.append("buy-side trend instance matches bullish direction") elif narrative_direction == "bearish": score -= 0.9 blocks.append("buy-side trend instance conflicts with bearish direction") elif trade_side == "sell": if narrative_direction == "bearish": score += 0.6 reasons.append("sell-side trend instance matches bearish direction") elif narrative_direction == "bullish": score -= 0.9 blocks.append("sell-side trend instance conflicts with bullish direction") if breakout_phase == "confirmed": score += 0.45 reasons.append("confirmed breakout pressure supports directional continuation") elif breakout_phase == "developing": score += 0.2 reasons.append("breakout pressure is developing in trend's favor") if wait >= 0.45 and breakout_phase != "confirmed": score -= 0.35 blocks.append("market still has too much wait/uncertainty for trend commitment") if inventory_state in SEVERE_INVENTORY_STATES: score -= 0.25 blocks.append("wallet may be too skewed for clean directional scaling") if inventory_pressure in {"base_heavy", "quote_heavy"}: score -= 0.1 blocks.append("trend report shows rising inventory pressure") if not capacity_available: score -= 0.1 blocks.append("trend strength is below its own capacity threshold") if trade_side == "both" and narrative_direction in {"bullish", "bearish"}: score += 0.15 reasons.append("generic trend instance can follow either side") if argus_context["compression_active"] and breakout_phase != "confirmed": score -= 0.15 blocks.append("Argus compression says the broader tape is still range-like") elif strategy_type == "exposure_protector": score += reversal * 0.4 + wait * 0.5 if wallet_state.get("rebalance_needed"): score += 1.1 reasons.append("wallet imbalance calls for rebalancing protection") if inventory_state in SEVERE_INVENTORY_STATES: score += 0.45 reasons.append("inventory drift is high enough to justify defensive action") if stance in {"constructive_bullish", "constructive_bearish"} and continuation > 0.65: score -= 0.2 if inventory_pressure in {"critical", "elevated"}: score += 0.25 reasons.append("protector reports active inventory pressure") if strategy.get("last_error"): score -= 0.25 blocks.append("strategy recently reported an error") if bool(supervision.get("degraded")): score -= 0.15 blocks.append("strategy self-reports degraded supervision state") return { "strategy_id": strategy.get("id"), "strategy_type": strategy_type, "score": round(score, 4), "reasons": reasons, "blocks": blocks, "enabled": strategy.get("enabled", False), } def _breakout_phase_from_score(score: float) -> str: if score >= 3.45: return "confirmed" if score >= 2.45: return "developing" if score >= 1.4: return "probing" return "none" def _local_breakout_snapshot(narrative_payload: dict[str, Any]) -> dict[str, Any]: scoped = narrative_payload.get("scoped_state") if isinstance(narrative_payload.get("scoped_state"), dict) else {} cross = narrative_payload.get("cross_scope_summary") if isinstance(narrative_payload.get("cross_scope_summary"), dict) else {} micro = scoped.get("micro") if isinstance(scoped.get("micro"), dict) else {} meso = scoped.get("meso") if isinstance(scoped.get("meso"), dict) else {} macro = scoped.get("macro") if isinstance(scoped.get("macro"), dict) else {} micro_impulse = str(micro.get("impulse") or "mixed") micro_bias = str(micro.get("trend_bias") or "mixed") meso_structure = str(meso.get("structure") or "rotation") meso_bias = str(meso.get("momentum_bias") or "neutral") macro_bias = str(macro.get("bias") or "mixed") alignment = str(cross.get("alignment") or "partial_alignment") friction = str(cross.get("friction") or "medium") micro_directional = micro_impulse in {"up", "down"} and micro_bias in {"bullish", "bearish"} meso_directional = meso_structure == "trend_continuation" and meso_bias in {"bullish", "bearish"} macro_supportive = macro_bias in {"bullish", "bearish"} score = 0.0 if micro_directional: score += 1.0 if meso_directional: score += 1.1 if macro_supportive: score += 0.55 if alignment == "micro_meso_macro_aligned": score += 0.8 elif alignment == "partial_alignment": score += 0.35 if friction == "low": score += 0.45 elif friction == "medium": score += 0.15 return { "score": round(score, 4), "phase": _breakout_phase_from_score(score), "micro_impulse": micro_impulse, "micro_bias": micro_bias, "meso_structure": meso_structure, "meso_bias": meso_bias, "macro_bias": macro_bias, "alignment": alignment, "friction": friction, } def _breakout_memory(narrative_payload: dict[str, Any], history_window: dict[str, Any] | None, current_breakout: dict[str, Any]) -> dict[str, Any]: recent_states = history_window.get("recent_states") if isinstance(history_window, dict) and isinstance(history_window.get("recent_states"), list) else [] window_seconds = int(history_window.get("window_seconds") or 0) if isinstance(history_window, dict) else 0 current_ts = _parse_timestamp(narrative_payload.get("generated_at")) or datetime.now(timezone.utc) current_direction = str(current_breakout.get("meso_bias") or "neutral") directional = current_direction in {"bullish", "bearish"} and current_breakout.get("meso_structure") == "trend_continuation" if not directional: return {"window_seconds": window_seconds, "samples_considered": 0, "qualifying_samples": 0, "same_direction_seconds": 0, "promoted_to_confirmed": False} qualifying_samples = 0 oldest_match: datetime | None = None for row in recent_states: if not isinstance(row, dict): continue try: payload = json.loads(row.get("payload_json") or "{}") except Exception: continue snapshot = _local_breakout_snapshot(payload) sample_ts = _parse_timestamp(row.get("created_at") or payload.get("generated_at")) if sample_ts is None: continue if snapshot.get("phase") not in {"developing", "confirmed"}: continue if str(snapshot.get("meso_bias") or "neutral") != current_direction: continue if str(snapshot.get("macro_bias") or "mixed") != str(current_breakout.get("macro_bias") or "mixed"): continue qualifying_samples += 1 if oldest_match is None: oldest_match = sample_ts same_direction_seconds = int((current_ts - oldest_match).total_seconds()) if oldest_match else 0 promoted = current_breakout.get("phase") == "developing" and qualifying_samples >= 2 and same_direction_seconds >= min(window_seconds, 8 * 60) return { "window_seconds": window_seconds, "samples_considered": len(recent_states), "qualifying_samples": qualifying_samples, "same_direction_seconds": max(0, same_direction_seconds), "promoted_to_confirmed": promoted, } def _grid_breakout_pressure(narrative_payload: dict[str, Any], history_window: dict[str, Any] | None = None) -> dict[str, Any]: argus_context = _argus_decision_context(narrative_payload) breakout = _local_breakout_snapshot(narrative_payload) memory = _breakout_memory(narrative_payload, history_window, breakout) phase = str(breakout.get("phase") or "none") if memory["promoted_to_confirmed"]: phase = "confirmed" persistent = phase == "confirmed" return { "persistent": persistent, "phase": phase, "score": breakout["score"], "micro_impulse": breakout["micro_impulse"], "micro_bias": breakout["micro_bias"], "meso_structure": breakout["meso_structure"], "meso_bias": breakout["meso_bias"], "macro_bias": breakout["macro_bias"], "alignment": breakout["alignment"], "friction": breakout["friction"], "time_window_memory": memory, "argus_regime": argus_context["regime"], "argus_confidence": argus_context["confidence"], "argus_compression_active": argus_context["compression_active"], } def _select_current_primary(strategies: list[dict[str, Any]]) -> dict[str, Any] | None: primaries = [s for s in strategies if s["strategy_type"] in {"grid_trader", "trend_follower", "exposure_protector"} and s.get("mode") != "off"] if not primaries: return None active = next((s for s in primaries if s.get("mode") == "active"), None) if active: return active return primaries[0] def _inventory_breakout_is_directionally_compatible(inventory_state: str, breakout: dict[str, Any]) -> bool: inventory_state = _inventory_state_label(inventory_state) macro_bias = str(breakout.get("macro_bias") or "mixed") meso_bias = str(breakout.get("meso_bias") or "neutral") bullish = macro_bias == "bullish" and meso_bias == "bullish" bearish = macro_bias == "bearish" and meso_bias == "bearish" if bullish and inventory_state in {"depleted_base_side", "quote_heavy"}: return True if bearish and inventory_state in {"depleted_quote_side", "base_heavy"}: return True return False def _trend_cooling_edge(narrative_payload: dict[str, Any], wallet_state: dict[str, Any], profile_config: dict[str, Any] | None = None) -> bool: profile_config = profile_config if isinstance(profile_config, dict) else {} scoped = narrative_payload.get("scoped_state") if isinstance(narrative_payload.get("scoped_state"), dict) else {} short_term_dislocated = _short_term_trend_dislocated(narrative_payload) micro = scoped.get("micro") if isinstance(scoped.get("micro"), dict) else {} meso = scoped.get("meso") if isinstance(scoped.get("meso"), dict) else {} micro_impulse = str(micro.get("impulse") or "mixed") micro_bias = str(micro.get("trend_bias") or "mixed") micro_location = str(micro.get("location") or "unknown") micro_reversal_risk = str(micro.get("reversal_risk") or "low") meso_bias = str(meso.get("momentum_bias") or "neutral") meso_structure = str(meso.get("structure") or "rotation") inventory_state = _inventory_state_label(wallet_state.get("inventory_state")) early_reversal_warning = micro_reversal_risk in {"medium", "high"} short_term_warning = short_term_dislocated and meso_structure == "trend_continuation" micro_weight = _safe_float(profile_config.get("micro_trend_weight")) if micro_weight is None: micro_weight = 0.8 meso_weight = _safe_float(profile_config.get("meso_trend_weight")) if meso_weight is None: meso_weight = 1.0 cooling_threshold = _safe_float(profile_config.get("trend_cooling_threshold")) if cooling_threshold is None: cooling_threshold = 0.45 bullish_inventory_pressure = inventory_state in {"base_heavy", "critically_unbalanced", "depleted_quote_side"} bearish_inventory_pressure = inventory_state in {"quote_heavy", "critically_unbalanced", "depleted_base_side"} bullish_cooling_score = 0.0 if meso_structure == "trend_continuation" and meso_bias == "bullish": bullish_cooling_score += 0.15 * meso_weight if micro_impulse == "mixed": bullish_cooling_score += 0.15 * micro_weight if early_reversal_warning: bullish_cooling_score += 0.25 * micro_weight if short_term_warning: bullish_cooling_score += 0.32 * micro_weight if micro_bias == "bearish": bullish_cooling_score += 0.15 * micro_weight if micro_location in {"near_upper_band", "upper_half", "centered"}: bullish_cooling_score += 0.15 * micro_weight bearish_cooling_score = 0.0 if meso_structure == "trend_continuation" and meso_bias == "bearish": bearish_cooling_score += 0.15 * meso_weight if micro_impulse == "mixed": bearish_cooling_score += 0.15 * micro_weight if early_reversal_warning: bearish_cooling_score += 0.25 * micro_weight if short_term_warning: bearish_cooling_score += 0.32 * micro_weight if micro_bias == "bullish": bearish_cooling_score += 0.15 * micro_weight if micro_location in {"near_lower_band", "lower_half", "centered"}: bearish_cooling_score += 0.15 * micro_weight bullish_cooling = ( meso_structure == "trend_continuation" and meso_bias == "bullish" and (micro_impulse == "mixed" or early_reversal_warning or short_term_warning) and micro_bias in {"mixed", "bearish", "bullish"} and (short_term_warning or micro_location in {"near_upper_band", "upper_half", "centered"}) and bullish_cooling_score >= cooling_threshold ) bearish_cooling = ( meso_structure == "trend_continuation" and meso_bias == "bearish" and (micro_impulse == "mixed" or early_reversal_warning or short_term_warning) and micro_bias in {"mixed", "bullish", "bearish"} and (short_term_warning or micro_location in {"near_lower_band", "lower_half", "centered"}) and bearish_cooling_score >= cooling_threshold ) return bullish_cooling or bearish_cooling def _grid_fill_proximity(strategy: dict[str, Any], narrative_payload: dict[str, Any]) -> dict[str, Any]: state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {} orders = state.get("orders") if isinstance(state.get("orders"), list) else [] features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {} micro_raw = features.get("1m", {}).get("raw", {}) if isinstance(features.get("1m"), dict) else {} current_price = _safe_float(micro_raw.get("price") or state.get("last_price") or state.get("center_price")) atr_percent = _safe_float(micro_raw.get("atr_percent")) or 0.0 if not current_price or current_price <= 0: return {"near_fill": False} sell_prices: list[float] = [] buy_prices: list[float] = [] for order in orders: if not isinstance(order, dict): continue if str(order.get("status") or "open").lower() not in {"open", "live", "active"}: continue price = _safe_float(order.get("price")) if price is None or price <= 0: continue side = str(order.get("side") or "").lower() if side == "sell" and price >= current_price: sell_prices.append(price) elif side == "buy" and price <= current_price: buy_prices.append(price) next_sell = min(sell_prices) if sell_prices else None next_buy = max(buy_prices) if buy_prices else None next_sell_distance_pct = (((next_sell - current_price) / current_price) * 100.0) if next_sell else None next_buy_distance_pct = (((current_price - next_buy) / current_price) * 100.0) if next_buy else None threshold_pct = max(0.25, atr_percent * 1.5) near_sell_fill = bool( next_sell_distance_pct is not None and next_sell_distance_pct >= 0 and next_sell_distance_pct <= threshold_pct and next_buy is not None ) near_buy_fill = bool( next_buy_distance_pct is not None and next_buy_distance_pct >= 0 and next_buy_distance_pct <= threshold_pct and next_sell is not None ) near_fill_side: str | None = None if near_sell_fill and near_buy_fill: near_fill_side = "sell" if (next_sell_distance_pct or 0.0) <= (next_buy_distance_pct or 0.0) else "buy" elif near_sell_fill: near_fill_side = "sell" elif near_buy_fill: near_fill_side = "buy" return { "near_fill": bool(near_sell_fill or near_buy_fill), "near_fill_side": near_fill_side, "near_sell_fill": near_sell_fill, "near_buy_fill": near_buy_fill, "current_price": current_price, "next_sell": next_sell, "next_buy": next_buy, "next_sell_distance_pct": round(next_sell_distance_pct, 4) if next_sell_distance_pct is not None else None, "next_buy_distance_pct": round(next_buy_distance_pct, 4) if next_buy_distance_pct is not None else None, "threshold_pct": round(threshold_pct, 4), } def _grid_fill_fights_breakout(grid_fill: dict[str, Any], breakout: dict[str, Any]) -> bool: """Whether a nearby grid fill is trading against the breakout move. Current product requirement: grid proximity-to-fills should not block or trigger a handoff. We only care about overall regime/tradeoff (fees vs staying), not which side happens to fill. """ return False def _recent_1m_price_trace(history_window: dict[str, Any] | None) -> list[tuple[datetime, float]]: recent_states = history_window.get("recent_states") if isinstance(history_window, dict) and isinstance(history_window.get("recent_states"), list) else [] trace: list[tuple[datetime, float]] = [] for row in recent_states: if not isinstance(row, dict): continue try: payload = json.loads(row.get("payload_json") or "{}") except Exception: continue features = payload.get("features_by_timeframe") if isinstance(payload.get("features_by_timeframe"), dict) else {} micro = features.get("1m") if isinstance(features.get("1m"), dict) else {} raw = micro.get("raw") if isinstance(micro.get("raw"), dict) else {} price = _safe_float(raw.get("price")) if price is None: continue timestamp = _parse_timestamp(row.get("created_at") or payload.get("generated_at")) if timestamp is None: continue trace.append((timestamp, price)) trace.sort(key=lambda item: item[0]) return trace def _breakout_direction(breakout: dict[str, Any], stance: str | None = None) -> str | None: meso_bias = str(breakout.get("meso_bias") or "") micro_bias = str(breakout.get("micro_bias") or "") if meso_bias in {"bullish", "bearish"}: return meso_bias if micro_bias in {"bullish", "bearish"}: return micro_bias stance_text = str(stance or "") if "bullish" in stance_text: return "bullish" if "bearish" in stance_text: return "bearish" return None def _narrative_direction(narrative: dict[str, Any]) -> str | None: stance = str(narrative.get("stance") or "") breakout = narrative.get("grid_breakout_pressure") if isinstance(narrative.get("grid_breakout_pressure"), dict) else {} direction = _breakout_direction(breakout, stance) if direction: return direction if stance in {"constructive_bullish", "cautious_bullish", "fragile_bullish"}: return "bullish" if stance in {"constructive_bearish", "cautious_bearish", "fragile_bearish"}: return "bearish" return None def _direction_label_from_score(score: float, bullish_threshold: float = 0.18) -> str: if score >= bullish_threshold: return "bullish" if score <= -bullish_threshold: return "bearish" return "mixed" def _extract_decision_signals(*, narrative_payload: dict[str, Any], wallet_state: dict[str, Any], grid_strategy: dict[str, Any] | None = None, breakout: dict[str, Any] | None = None, history_window: dict[str, Any] | None = None, decision_profile: dict[str, Any] | None = None, ) -> dict[str, Any]: scoped = narrative_payload.get("scoped_state") if isinstance(narrative_payload.get("scoped_state"), dict) else {} cross = narrative_payload.get("cross_scope_summary") if isinstance(narrative_payload.get("cross_scope_summary"), dict) else {} features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {} opportunity_map = narrative_payload.get("opportunity_map") if isinstance(narrative_payload.get("opportunity_map"), dict) else {} embedded = narrative_payload.get("decision_inputs") if isinstance(narrative_payload.get("decision_inputs"), dict) else {} micro = scoped.get("micro") if isinstance(scoped.get("micro"), dict) else {} meso = scoped.get("meso") if isinstance(scoped.get("meso"), dict) else {} macro = scoped.get("macro") if isinstance(scoped.get("macro"), dict) else {} micro_features = features.get("1m") if isinstance(features.get("1m"), dict) else {} micro_vol = micro_features.get("volatility") if isinstance(micro_features.get("volatility"), dict) else {} micro_raw = micro_features.get("raw") if isinstance(micro_features.get("raw"), dict) else {} recent_prices = _recent_1m_price_trace(history_window) continuation = float(opportunity_map.get("continuation") or 0.0) alignment = str(cross.get("alignment") or "partial_alignment") friction = str(cross.get("friction") or "medium") micro_impulse = str(micro.get("impulse") or "mixed") micro_bias = str(micro.get("trend_bias") or "mixed") micro_location = str(micro.get("location") or embedded.get("micro_location") or "unknown") micro_reversal_risk = str(micro.get("reversal_risk") or "low") meso_structure = str(meso.get("structure") or "rotation") meso_bias = str(meso.get("momentum_bias") or "neutral") macro_bias = str(macro.get("bias") or "mixed") profile_config = _decision_profile_config(decision_profile) short_term_trend_min_score = _safe_float(profile_config.get("short_term_trend_min_score")) if short_term_trend_min_score is None: short_term_trend_min_score = _safe_float(profile_config.get("short_term_confirmation_min")) if short_term_trend_min_score is None: short_term_trend_min_score = 0.32 breakout_persistence_min = _safe_float(profile_config.get("breakout_persistence_min")) if breakout_persistence_min is None: breakout_persistence_min = 0.65 trend_hold_threshold = _safe_float(profile_config.get("trend_hold_threshold")) if trend_hold_threshold is None: trend_hold_threshold = 0.56 grid_release_threshold = _safe_float(profile_config.get("grid_release_threshold")) if grid_release_threshold is None: grid_release_threshold = 0.35 structural_direction = str(embedded.get("structural_direction") or "") if structural_direction not in {"bullish", "bearish"}: structural_direction = meso_bias if meso_bias in {"bullish", "bearish"} else macro_bias if macro_bias in {"bullish", "bearish"} else "mixed" structural_strength = _safe_float(embedded.get("structural_trend_strength")) if structural_strength is None: structural_strength = 0.0 if meso_structure == "trend_continuation" and meso_bias in {"bullish", "bearish"}: structural_strength += 0.45 elif meso_structure in {"bullish_pullback", "bearish_pullback"} and meso_bias in {"bullish", "bearish"}: structural_strength += 0.25 if macro_bias in {"bullish", "bearish"} and macro_bias == structural_direction: structural_strength += 0.25 if alignment == "micro_meso_macro_aligned": structural_strength += 0.2 elif alignment == "partial_alignment": structural_strength += 0.1 if friction == "high": structural_strength -= 0.18 structural_strength = round(_clamp(structural_strength, 0.0, 1.0), 4) tactical_direction = str(embedded.get("tactical_direction") or "") if tactical_direction not in {"bullish", "bearish", "mixed"}: micro_score = 0.0 if micro_impulse == "up": micro_score += 0.35 elif micro_impulse == "down": micro_score -= 0.35 if micro_bias == "bullish": micro_score += 0.45 elif micro_bias == "bearish": micro_score -= 0.45 tactical_direction = _direction_label_from_score(micro_score) tactical_strength = _safe_float(embedded.get("tactical_trend_strength")) if tactical_strength is None: tactical_strength = 0.0 if micro_impulse in {"up", "down"} and micro_bias in {"bullish", "bearish"}: tactical_strength += 0.45 elif micro_impulse in {"up", "down"}: tactical_strength += 0.2 if micro_location in {"near_upper_band", "near_lower_band"}: tactical_strength += 0.1 if micro_reversal_risk == "medium": tactical_strength -= 0.12 elif micro_reversal_risk == "high": tactical_strength -= 0.25 tactical_strength = round(_clamp(tactical_strength, 0.0, 1.0), 4) tactical_range_quality = _safe_float(embedded.get("tactical_range_quality")) if tactical_range_quality is None: tactical_range_quality = 0.0 if micro_impulse == "mixed": tactical_range_quality += 0.35 if micro_bias == "mixed": tactical_range_quality += 0.2 if micro_location in {"centered", "lower_half", "upper_half"}: tactical_range_quality += 0.18 if friction == "high": tactical_range_quality += 0.08 if micro_reversal_risk == "high": tactical_range_quality -= 0.08 tactical_range_quality = round(_clamp(tactical_range_quality, 0.0, 1.0), 4) tactical_easing = bool(embedded.get("tactical_easing")) if not tactical_easing: tactical_easing = bool( meso_structure == "trend_continuation" and meso_bias in {"bullish", "bearish"} and ( micro_impulse == "mixed" or micro_bias == "mixed" or micro_reversal_risk in {"medium", "high"} or micro_location == "centered" ) ) breakout = breakout or {} breakout_phase = str(breakout.get("phase") or "none") breakout_persistence = 1.0 if bool(breakout.get("persistent")) else 0.65 if breakout_phase == "developing" else 0.35 if breakout_phase == "probing" else 0.0 grid_step_pct = None if grid_strategy: state = grid_strategy.get("state") if isinstance(grid_strategy.get("state"), dict) else {} config = grid_strategy.get("config") if isinstance(grid_strategy.get("config"), dict) else {} grid_step_pct = _safe_float(config.get("grid_step_pct") or state.get("grid_step_pct") or state.get("recenter_pct_live")) atr_percent = _safe_float(embedded.get("micro_atr_percent")) if atr_percent is None: atr_percent = _safe_float(micro_raw.get("atr_percent")) band_width_pct = _safe_float(embedded.get("micro_bollinger_width_pct")) if band_width_pct is None: band_width_pct = _safe_float(micro_vol.get("bollinger_width_pct")) noise_pct = max(band_width_pct or 0.0, (atr_percent or 0.0) * 2.0) pullback_to_grid_ratio = None if grid_step_pct and grid_step_pct > 0: pullback_to_grid_ratio = noise_pct / max(grid_step_pct * 100.0, 0.0001) recent_move_pct = 0.0 recent_move_window_minutes = 0 recent_move_direction = "mixed" if recent_prices: current_price = _safe_float(micro_raw.get("price")) or recent_prices[-1][1] first_price = recent_prices[0][1] if first_price > 0: recent_move_pct = ((current_price - first_price) / first_price) * 100.0 recent_move_window_minutes = max(0, int((recent_prices[-1][0] - recent_prices[0][0]).total_seconds() / 60.0)) if recent_move_pct > 0: recent_move_direction = "bullish" elif recent_move_pct < 0: recent_move_direction = "bearish" rapid_directional_pressure = bool( recent_move_direction in {"bullish", "bearish"} and abs(recent_move_pct) >= max(0.8, (atr_percent or 0.0) * 2.5) and recent_move_window_minutes >= 10 and structural_direction == recent_move_direction and tactical_direction == recent_move_direction and macro_bias == recent_move_direction ) if breakout and isinstance(breakout, dict): rapid_directional_pressure = bool( rapid_directional_pressure or ( breakout.get("persistent") and str(breakout.get("macro_bias") or "") == recent_move_direction and str(breakout.get("meso_bias") or "") == recent_move_direction and str(breakout.get("micro_bias") or "") == recent_move_direction and abs(recent_move_pct) >= max(0.6, (atr_percent or 0.0) * 1.8) ) ) rapid_downside_pressure = bool(rapid_directional_pressure and recent_move_direction == "bearish") short_term_trend_score = _short_term_trend_manifest_score(narrative_payload, structural_direction) harvestability_score = tactical_range_quality * 0.45 if pullback_to_grid_ratio is not None: harvestability_score += min(pullback_to_grid_ratio, 2.0) * 0.22 elif atr_percent is not None: harvestability_score += min((atr_percent or 0.0) / 0.5, 1.0) * 0.18 if tactical_easing: harvestability_score += 0.18 if micro_location in {"centered", "lower_half", "upper_half"}: harvestability_score += 0.08 if breakout_persistence >= 1.0 and not tactical_easing and tactical_strength >= 0.5: harvestability_score -= 0.3 harvestability_score = round(_clamp(harvestability_score, 0.0, 1.0), 4) inventory_state = _inventory_state_label(wallet_state.get("inventory_state")) within_rebalance_tolerance = _wallet_within_rebalance_tolerance(wallet_state, 0.3) if wallet_state.get("grid_ready"): wallet_grid_usability = 1.0 elif within_rebalance_tolerance: wallet_grid_usability = 0.78 elif inventory_state in {"base_heavy", "quote_heavy"}: wallet_grid_usability = 0.42 elif inventory_state in SEVERE_INVENTORY_STATES: wallet_grid_usability = 0.12 else: wallet_grid_usability = 0.3 if scoped: trend_hold_strength = ( structural_strength * 0.34 + tactical_strength * 0.24 + breakout_persistence * 0.14 + min(short_term_trend_score, 1.0) * 0.10 + continuation * 0.18 ) else: trend_hold_strength = continuation * 0.9 + breakout_persistence * 0.1 if tactical_easing: trend_hold_strength -= 0.18 if tactical_direction not in {"mixed", structural_direction}: trend_hold_strength -= 0.16 if short_term_trend_score < short_term_trend_min_score: trend_hold_strength -= min(short_term_trend_min_score - short_term_trend_score, 0.25) trend_hold_strength = round(_clamp(trend_hold_strength, 0.0, 1.0), 4) trend_following_pressure = bool( ( structural_direction in {"bullish", "bearish"} and tactical_direction == structural_direction and breakout_persistence >= breakout_persistence_min and trend_hold_strength >= trend_hold_threshold ) or ( not scoped and continuation >= 0.7 and not tactical_easing ) ) grid_harvestable_now = bool( harvestability_score >= 0.48 and wallet_grid_usability >= 0.35 ) rebalancer_release_ready = bool( within_rebalance_tolerance and ( ( harvestability_score >= 0.35 and (tactical_easing or breakout_persistence < 1.0 or tactical_range_quality >= 0.35) ) or (wallet_state.get("grid_ready") and breakout_persistence < 1.0) or (tactical_range_quality >= grid_release_threshold and breakout_persistence < 0.75) ) ) return { "structural_direction": structural_direction, "structural_trend_strength": structural_strength, "tactical_direction": tactical_direction, "tactical_trend_strength": tactical_strength, "tactical_range_quality": tactical_range_quality, "tactical_easing": tactical_easing, "breakout_persistence_score": round(breakout_persistence, 4), "micro_location": micro_location, "micro_atr_percent": atr_percent, "micro_bollinger_width_pct": band_width_pct, "grid_step_pct": round(grid_step_pct, 6) if grid_step_pct is not None else None, "pullback_to_grid_ratio": round(pullback_to_grid_ratio, 4) if pullback_to_grid_ratio is not None else None, "grid_harvestability_score": harvestability_score, "wallet_grid_usability": round(wallet_grid_usability, 4), "within_rebalance_tolerance": within_rebalance_tolerance, "rebalance_tolerance": 0.3, "trend_following_pressure": trend_following_pressure, "trend_hold_strength": trend_hold_strength, "trend_hold_threshold": round(trend_hold_threshold, 4), "rapid_directional_pressure": rapid_directional_pressure, "rapid_downside_pressure": rapid_downside_pressure, "recent_move_pct": round(recent_move_pct, 4), "recent_move_window_minutes": recent_move_window_minutes, "short_term_trend_min_score": round(short_term_trend_min_score, 4), "breakout_persistence_min": round(breakout_persistence_min, 4), "grid_release_threshold": round(grid_release_threshold, 4), "short_term_trend_score": short_term_trend_score, "grid_harvestable_now": grid_harvestable_now, "rebalancer_release_ready": rebalancer_release_ready, } def _strategy_trade_side(strategy: dict[str, Any]) -> str: config = strategy.get("config") if isinstance(strategy.get("config"), dict) else {} state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {} side = str(config.get("trade_side") or state.get("trade_side") or strategy.get("trade_side") or "both").strip().lower() return side if side in {"buy", "sell", "both"} else "both" def _trend_handoff_level_threshold(breakout: dict[str, Any]) -> float: memory = breakout.get("time_window_memory") if isinstance(breakout.get("time_window_memory"), dict) else {} if bool(memory.get("promoted_to_confirmed")): return 2.0 return 2.75 def _grid_switch_tradeoff(*, current_primary: dict[str, Any], wallet_state: dict[str, Any], breakout: dict[str, Any], grid_fill: dict[str, Any], grid_pressure: dict[str, Any], directional_micro_clear: bool, decision_signals: dict[str, Any], trend: dict[str, Any] | None, ) -> dict[str, Any]: inventory_state = _inventory_state_label(wallet_state.get("inventory_state")) supervision = current_primary.get("supervision") if isinstance(current_primary.get("supervision"), dict) else {} open_order_count = int(current_primary.get("open_order_count") or 0) if not open_order_count: state = current_primary.get("state") if isinstance(current_primary.get("state"), dict) else {} open_order_count = int(state.get("open_order_count") or len(state.get("orders") or []) or 0) adverse_side = str(supervision.get("adverse_side") or "unknown") adverse_count = int(supervision.get("adverse_side_open_order_count") or 0) adverse_notional = float(supervision.get("adverse_side_open_order_notional_quote") or 0.0) adverse_distance = _safe_float(supervision.get("adverse_side_nearest_distance_pct")) base_order_notional = 1.0 config = current_primary.get("config") if isinstance(current_primary.get("config"), dict) else {} for candidate in (config.get("order_notional_quote"), config.get("max_order_notional_quote")): candidate_value = _safe_float(candidate) if candidate_value and candidate_value > base_order_notional: base_order_notional = candidate_value trend_score = float(trend.get("score") or 0.0) if trend else 0.0 structural_strength = float(decision_signals.get("structural_trend_strength") or 0.0) tactical_strength = float(decision_signals.get("tactical_trend_strength") or 0.0) harvestability_score = float(decision_signals.get("grid_harvestability_score") or 0.0) breakout_score = float(breakout.get("score") or 0.0) short_term_trend_score = float(decision_signals.get("short_term_trend_score") or 0.0) levels = float(grid_pressure.get("levels") or 0.0) near_fill = bool(grid_fill.get("near_fill")) fill_fights = _grid_fill_fights_breakout(grid_fill, breakout) persistent = bool(breakout.get("persistent")) trend_ready = bool(decision_signals.get("trend_following_pressure")) and directional_micro_clear stay_cost = 0.0 switch_benefit = 0.0 if persistent: switch_benefit += 0.28 if trend_ready: switch_benefit += 0.34 # Requirement: ignore nearby fill timing/side when estimating the stay-vs-switch tradeoff. if levels >= _trend_handoff_level_threshold(breakout): switch_benefit += 0.18 switch_benefit += structural_strength * 0.26 switch_benefit += tactical_strength * 0.16 switch_benefit += min(trend_score, 2.0) * 0.04 switch_benefit += min(breakout_score, 5.0) * 0.04 if short_term_trend_score < 0.68: short_term_gap = 0.68 - short_term_trend_score switch_benefit -= short_term_gap * 1.15 stay_cost += short_term_gap * 0.42 if adverse_side in {"buy", "sell"} and adverse_count > 0: adverse_notional_ratio = adverse_notional / max(base_order_notional, 1.0) switch_benefit += min(adverse_count, 8) * 0.02 if adverse_distance is not None and adverse_distance <= 1.25: switch_benefit += 0.08 stay_cost += min(adverse_notional_ratio, 4.0) * 0.07 else: adverse_notional_ratio = 0.0 if inventory_state == "balanced": stay_cost += 0.06 elif inventory_state in {"base_heavy", "quote_heavy"}: stay_cost += 0.16 elif inventory_state in SEVERE_INVENTORY_STATES: stay_cost += 0.28 else: stay_cost += 0.1 stay_cost += min(levels, 6.0) * 0.06 stay_cost += min(open_order_count, 8) * 0.025 # Requirement: ignore nearby fill timing/side when estimating the stay-vs-switch tradeoff. if not persistent: stay_cost += 0.12 if adverse_notional_ratio >= 1.0: stay_cost += 0.08 stay_cost += harvestability_score * 0.18 margin = round(switch_benefit - stay_cost, 4) should_switch = persistent and trend_ready and margin > 0.0 return { "trend_score": round(trend_score, 4), "structural_trend_strength": round(structural_strength, 4), "tactical_trend_strength": round(tactical_strength, 4), "grid_harvestability_score": round(harvestability_score, 4), "short_term_trend_score": round(short_term_trend_score, 4), "breakout_score": round(breakout_score, 4), "switch_benefit": round(switch_benefit, 4), "stay_cost": round(stay_cost, 4), "margin": margin, "should_switch": should_switch, "trend_ready": trend_ready, "persistent": persistent, "levels": round(levels, 4), "open_order_count": open_order_count, "near_fill": near_fill, "fill_fights": fill_fights, "adverse_side": adverse_side, "adverse_side_open_order_count": adverse_count, "adverse_side_open_order_notional_quote": round(adverse_notional, 4), "adverse_side_nearest_distance_pct": round(adverse_distance, 4) if adverse_distance is not None else None, "inventory_state": inventory_state, } def _grid_trend_pressure(strategy: dict[str, Any], narrative_payload: dict[str, Any]) -> dict[str, Any]: state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {} config = strategy.get("config") if isinstance(strategy.get("config"), dict) else {} features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {} micro_raw = features.get("1m", {}).get("raw", {}) if isinstance(features.get("1m"), dict) else {} current_price = _safe_float(micro_raw.get("price") or state.get("last_price") or state.get("center_price")) center_price = _safe_float(state.get("center_price") or state.get("last_price")) step_pct = _safe_float(config.get("grid_step_pct") or state.get("grid_step_pct") or state.get("recenter_pct_live")) or 0.0 if not current_price or not center_price or current_price <= 0 or center_price <= 0 or step_pct <= 0: return {"levels": 0.0, "rounded_levels": 0, "direction": "unknown", "current_price": current_price, "center_price": center_price, "step_pct": step_pct} distance_pct = abs(current_price - center_price) / center_price levels = distance_pct / step_pct direction = "bullish" if current_price > center_price else "bearish" if current_price < center_price else "flat" return { "levels": round(levels, 4), "rounded_levels": int(levels), "direction": direction, "current_price": current_price, "center_price": center_price, "step_pct": step_pct, "distance_pct": round(distance_pct, 4), } def _grid_can_still_work(strategy: dict[str, Any], wallet_state: dict[str, Any], grid_fill: dict[str, Any]) -> bool: supervision = strategy.get("supervision") if isinstance(strategy.get("supervision"), dict) else {} side_capacity = supervision.get("side_capacity") if isinstance(supervision.get("side_capacity"), dict) else {} buy_capacity = bool(side_capacity.get("buy", False)) sell_capacity = bool(side_capacity.get("sell", False)) open_order_count = int(strategy.get("open_order_count") or 0) degraded = bool(supervision.get("degraded")) inventory_state = _inventory_state_label(wallet_state.get("inventory_state")) if degraded: return False if buy_capacity or sell_capacity: return True if open_order_count > 0: return True if grid_fill.get("near_fill"): return True return inventory_state not in SEVERE_INVENTORY_STATES def _grid_is_truly_stuck_for_recovery(strategy: dict[str, Any], wallet_state: dict[str, Any], grid_fill: dict[str, Any]) -> bool: if _grid_can_still_work(strategy, wallet_state, grid_fill): return False inventory_state = _inventory_state_label(wallet_state.get("inventory_state")) return wallet_state.get("rebalance_needed") and inventory_state in SEVERE_INVENTORY_STATES def _wallet_within_rebalance_tolerance(wallet_state: dict[str, Any], tolerance: float = 0.3) -> bool: imbalance = _safe_float(wallet_state.get("imbalance_score")) if imbalance is None: base_ratio = _safe_float(wallet_state.get("base_ratio")) if base_ratio is not None: imbalance = abs(base_ratio - 0.5) if imbalance is None: return str(wallet_state.get("inventory_state") or "").lower() == "balanced" return imbalance <= tolerance def _decide_for_grid(*, current_primary: dict[str, Any], stance: str, inventory_state: str, wallet_state: dict[str, Any], breakout: dict[str, Any], grid_fill: dict[str, Any], grid_pressure: dict[str, Any], directional_micro_clear: bool, severe_imbalance: bool, decision_signals: dict[str, Any], trend: dict[str, Any] | None, rebalance: dict[str, Any] | None, ) -> tuple[str, str, str | None, list[str], list[str]]: action = "keep_grid" mode = "observe" target_strategy = current_primary["id"] reasons: list[str] = [] blocks: list[str] = [] inventory_state = _inventory_state_label(inventory_state) # Grid is the base mode. Leave it only for a persistent breakout or when # the grid has genuinely lost its ability to recover on its own. grid_friendly_stance = stance in {"neutral_rotational", "breakout_watch", "cautious_bullish", "cautious_bearish", "fragile_bullish", "fragile_bearish"} grid_can_work = _grid_can_still_work(current_primary, wallet_state, grid_fill) grid_stuck_for_recovery = _grid_is_truly_stuck_for_recovery(current_primary, wallet_state, grid_fill) persistent_breakout = bool(breakout["persistent"]) breakout_phase = str(breakout.get("phase") or "none") breakout_direction = _breakout_direction(breakout, stance) trend_handoff_ready = bool( trend and bool(decision_signals.get("trend_following_pressure")) and grid_pressure.get("levels", 0.0) >= _trend_handoff_level_threshold(breakout) ) fill_fights_breakout = _grid_fill_fights_breakout(grid_fill, breakout) switch_tradeoff = _grid_switch_tradeoff( current_primary=current_primary, wallet_state=wallet_state, breakout=breakout, grid_fill=grid_fill, grid_pressure=grid_pressure, directional_micro_clear=directional_micro_clear, decision_signals=decision_signals, trend=trend, ) rapid_directional = bool(decision_signals.get("rapid_directional_pressure")) directional_pressure = breakout_direction if breakout_direction in {"bullish", "bearish"} else "mixed" all_scopes_aligned = ( directional_pressure in {"bullish", "bearish"} and str(decision_signals.get("structural_direction") or "") == directional_pressure and str(decision_signals.get("tactical_direction") or "") == directional_pressure and str(grid_pressure.get("direction") or "") == directional_pressure ) repair_inventory_match = bool( (directional_pressure == "bullish" and inventory_state in {"quote_heavy", "critically_unbalanced"}) or (directional_pressure == "bearish" and inventory_state in {"base_heavy", "critically_unbalanced"}) ) urgent_rebalance_exit = bool( rebalance and wallet_state.get("rebalance_needed") and rapid_directional and all_scopes_aligned and repair_inventory_match ) if urgent_rebalance_exit: action = "replace_with_exposure_protector" target_strategy = rebalance["strategy_id"] mode = "act" reasons.append("wallet is skewed and the directional move is accelerating, so exposure repair should happen before the trend handoff") reasons.append( f"recent 1m history moved {decision_signals.get('recent_move_pct', 0.0):.2f}% over about {decision_signals.get('recent_move_window_minutes', 0)} minutes" ) return action, mode, target_strategy, reasons, blocks urgent_trend_exit = bool( trend and persistent_breakout and bool(decision_signals.get("trend_following_pressure")) and all_scopes_aligned and ( rapid_directional or grid_fill.get("near_fill") or inventory_state in SEVERE_INVENTORY_STATES ) ) if urgent_trend_exit: action = "replace_with_trend_follower" target_strategy = trend["strategy_id"] if trend else target_strategy mode = "act" reasons.append("all scopes line up and the tape is moving fast, so grid should yield early") if rapid_directional: reasons.append( f"recent 1m history moved {decision_signals.get('recent_move_pct', 0.0):.2f}% over about {decision_signals.get('recent_move_window_minutes', 0)} minutes" ) if grid_pressure.get("levels", 0.0) < _trend_handoff_level_threshold(breakout): reasons.append("handoff is happening early, before the normal level threshold, because directional acceleration is sharp") if grid_fill.get("near_fill"): reasons.append("grid fill pressure is already near the market") return action, mode, target_strategy, reasons, blocks if severe_imbalance and persistent_breakout: reasons.append("grid imbalance now coincides with persistent breakout pressure") directional_inventory = _inventory_breakout_is_directionally_compatible(inventory_state, breakout) if switch_tradeoff["should_switch"] and trend_handoff_ready and ( not wallet_state.get("rebalance_needed") or directional_inventory or not rebalance or trend["score"] >= rebalance["score"] ): action = "replace_with_trend_follower" target_strategy = trend["strategy_id"] mode = "act" if switch_tradeoff.get("adverse_side_open_order_count", 0) > 0: reasons.append( f"{switch_tradeoff.get('adverse_side')} ladder is exposed near market" ) if directional_inventory: reasons.append("inventory posture can be absorbed by the directional handoff") reasons.append( f"switch benefit ({switch_tradeoff['switch_benefit']:.2f}) exceeds stay cost ({switch_tradeoff['stay_cost']:.2f})" ) elif wallet_state.get("rebalance_needed") and rebalance and rebalance["score"] > 0.35: action = "replace_with_exposure_protector" target_strategy = rebalance["strategy_id"] mode = "act" else: action = "suspend_grid" mode = "warn" elif severe_imbalance and grid_stuck_for_recovery and not persistent_breakout and rebalance and rebalance["score"] > 0.6: action = "replace_with_exposure_protector" target_strategy = rebalance["strategy_id"] mode = "act" reasons.append("grid has lost practical recovery capacity, so inventory repair should take over") elif persistent_breakout and trend_handoff_ready and inventory_state in {"balanced", "base_heavy", "quote_heavy"}: if not switch_tradeoff["should_switch"]: reasons.append( f"breakout is persistent, but staying in grid still looks cheaper than switching (benefit {switch_tradeoff['switch_benefit']:.2f} vs cost {switch_tradeoff['stay_cost']:.2f})" ) if switch_tradeoff.get("adverse_side_open_order_count", 0) > 0: reasons.append( f"{switch_tradeoff.get('adverse_side')} ladder exposure is not yet costly enough to justify the handoff" ) if grid_fill.get("near_fill") and fill_fights_breakout: reasons.append("nearby opposing fill is only a warning here, not enough on its own to justify the handoff") else: action = "replace_with_trend_follower" target_strategy = trend["strategy_id"] if trend else target_strategy mode = "act" if grid_fill.get("near_fill") and fill_fights_breakout: reasons.append("confirmed trend should not be delayed by a nearby grid fill that trades against the move") elif grid_fill.get("near_fill"): reasons.append("confirmed directional pressure is strong enough that nearby grid fills should not delay the trend handoff") else: reasons.append("grid should yield because directional pressure is confirmed and the trend handoff is ready") elif not persistent_breakout and grid_can_work: if breakout_phase == "developing": reasons.append("breakout pressure is developing, but grid can still work and should not be abandoned yet") else: reasons.append("grid can still operate and self-heal, so inventory skew alone should not force a rebalance handoff") if decision_signals.get("grid_harvestable_now"): reasons.append("tactical range quality still looks harvestable for the grid") elif persistent_breakout and grid_fill.get("near_fill") and inventory_state in {"balanced", "base_heavy", "quote_heavy"}: reasons.append("grid is still close to a working fill, but trend handoff is not ready enough yet") elif not grid_friendly_stance and persistent_breakout: reasons.append("grid should yield because directional pressure is persistent across scopes") if trend_handoff_ready: action = "replace_with_trend_follower" target_strategy = trend["strategy_id"] mode = "act" else: mode = "warn" if grid_pressure.get("levels", 0.0) < _trend_handoff_level_threshold(breakout): blocks.append("grid has not yet been eaten by enough levels to justify leaving it") else: blocks.append("directional pressure is rising but the micro layer is not clear enough for a trend handoff") else: reasons.append("grid can likely self-heal because breakout pressure is not yet persistent") return action, mode, target_strategy, reasons, blocks def _decide_for_trend(*, current_primary: dict[str, Any], stance: str, narrative_payload: dict[str, Any], wallet_state: dict[str, Any], grid: dict[str, Any] | None, rebalance: dict[str, Any] | None = None, profile_config: dict[str, Any] | None = None, decision_signals: dict[str, Any] | None = None, ) -> tuple[str, str, str | None, list[str], list[str]]: action = "keep_trend" mode = "observe" target_strategy = current_primary["id"] reasons: list[str] = [] blocks: list[str] = [] decision_signals = decision_signals if isinstance(decision_signals, dict) else {} trend_pressure = bool(decision_signals.get("trend_following_pressure")) trend_hold_strength = float(decision_signals.get("trend_hold_strength") or 0.0) trend_hold_threshold = float(decision_signals.get("trend_hold_threshold") or 0.56) grid_harvestable_now = bool(decision_signals.get("grid_harvestable_now")) # Trend should cool into rebalancing first when the wallet is skewed, then # let rebalancer hand back to grid once the inventory is healthy again. cooling = _trend_cooling_edge(narrative_payload, wallet_state, profile_config) if cooling: if wallet_state.get("rebalance_needed") and rebalance: action = "replace_with_exposure_protector" target_strategy = rebalance["strategy_id"] mode = "act" reasons.append("trend has cooled enough that directional mode no longer justifies staying active") elif grid and wallet_state.get("grid_ready"): action = "replace_with_grid" target_strategy = grid["strategy_id"] mode = "act" reasons.append("trend has cooled and the tape looks suitable for grid again") else: mode = "warn" blocks.append("trend is easing, but neither grid nor rebalancer is ready for a clean handoff") elif not trend_pressure: if grid and wallet_state.get("grid_ready") and grid_harvestable_now: action = "replace_with_grid" target_strategy = grid["strategy_id"] mode = "act" reasons.append(f"trend hold strength {trend_hold_strength:.2f} fell below threshold {trend_hold_threshold:.2f}, so grid can resume") elif wallet_state.get("rebalance_needed") and rebalance: action = "replace_with_exposure_protector" target_strategy = rebalance["strategy_id"] mode = "act" reasons.append(f"trend hold strength {trend_hold_strength:.2f} fell below threshold {trend_hold_threshold:.2f}, so directional mode should yield") else: action = "hold_trend" mode = "warn" blocks.append(f"trend hold strength {trend_hold_strength:.2f} is below threshold {trend_hold_threshold:.2f}, but no clean handoff is available yet") elif stance == "neutral_rotational": if wallet_state.get("rebalance_needed") and rebalance: action = "replace_with_exposure_protector" target_strategy = rebalance["strategy_id"] mode = "act" reasons.append("trend conditions have cooled and rebalancing should repair the wallet before grid resumes") elif grid and wallet_state.get("grid_ready"): action = "replace_with_grid" target_strategy = grid["strategy_id"] mode = "act" reasons.append("trend conditions have cooled and wallet is grid-ready again") elif wallet_state.get("rebalance_needed"): mode = "warn" blocks.append("trend has cooled but rebalancing should be the next hop") else: action = "hold_trend" blocks.append("grid candidate not strong enough yet") else: reasons.append(f"trend hold strength {trend_hold_strength:.2f} still clears threshold {trend_hold_threshold:.2f}") return action, mode, target_strategy, reasons, blocks def _decide_for_rebalancer(*, current_primary: dict[str, Any], stance: str, wallet_state: dict[str, Any], grid: dict[str, Any] | None, decision_signals: dict[str, Any], trend: dict[str, Any] | None = None, decision_profile: dict[str, Any] | None = None, ) -> tuple[str, str, str | None, list[str], list[str]]: action = "keep_rebalancer" mode = "observe" target_strategy = current_primary["id"] reasons: list[str] = [] blocks: list[str] = [] # Rebalancing is a repair phase. Once the wallet is usable again, Hermes # should prefer handing back to grid, not directly to trend. trend_strength = float(trend["score"]) if trend and isinstance(trend.get("score"), (int, float)) else 0.0 within_tolerance = bool(decision_signals.get("within_rebalance_tolerance")) release_ready = bool(decision_signals.get("rebalancer_release_ready")) trend_pressure = bool(decision_signals.get("trend_following_pressure")) grid_harvestable_now = bool(decision_signals.get("grid_harvestable_now")) profile_config = _decision_profile_config(decision_profile) force_grid_when_balanced = bool(profile_config.get("force_grid_when_balanced", True)) hold_rebalancer_until_cooldown = bool(profile_config.get("hold_rebalancer_until_cooldown", False)) if wallet_state.get("grid_ready") and grid and force_grid_when_balanced and not hold_rebalancer_until_cooldown: action = "replace_with_grid" target_strategy = grid["strategy_id"] mode = "act" reasons.append("wallet is rebalanced, so grid should resume first and let the tape prove itself again") elif trend_pressure and not release_ready: blocks.append("trend is still strong enough that rebalancer should keep repairing instead of resetting to grid") elif release_ready: if grid: action = "replace_with_grid" target_strategy = grid["strategy_id"] mode = "act" reasons.append("wallet is usable enough and micro conditions are easing, so grid can resume harvesting") else: blocks.append("wallet is within the rebalance tolerance but no grid candidate is available") elif within_tolerance and not grid_harvestable_now: blocks.append("wallet is close enough, but the local tape is still not harvestable enough for grid release") elif wallet_state.get("grid_ready") and stance == "neutral_rotational": if grid and grid["score"] >= 0.5: action = "replace_with_grid" target_strategy = grid["strategy_id"] mode = "act" reasons.append("rebalance is complete and rotational conditions support grid again") else: blocks.append("wallet is ready but grid fit is still too weak") elif grid and grid_harvestable_now: action = "replace_with_grid" target_strategy = grid["strategy_id"] mode = "act" reasons.append("local price action looks harvestable enough that grid can resume before perfect balance") else: blocks.append("trend candidate is not strong enough yet and grid fit is not ready, so rebalancer should not hand directly back to trend") return action, mode, target_strategy, reasons, blocks def make_decision(*, concern: dict[str, Any], narrative_payload: dict[str, Any], wallet_state: dict[str, Any], strategies: list[dict[str, Any]], history_window: dict[str, Any] | None = None, decision_profile: dict[str, Any] | None = None) -> DecisionSnapshot: concern_account_id = str(concern.get("account_id") or "") concern_market_symbol = str(concern.get("market_symbol") or "").strip().lower() normalized = [ normalize_strategy_snapshot(s) for s in strategies if str(s.get("account_id") or "") == concern_account_id and ( not concern_market_symbol or not str(s.get("market_symbol") or "").strip() or str(s.get("market_symbol") or "").strip().lower() == concern_market_symbol ) ] breakout = _grid_breakout_pressure(narrative_payload, history_window=history_window) narrative_for_scoring = {**narrative_payload, "grid_breakout_pressure": breakout} fit_reports = [score_strategy_fit(strategy=s, narrative=narrative_for_scoring, wallet_state=wallet_state) for s in normalized] ranked = sorted(fit_reports, key=lambda item: item["score"], reverse=True) current_primary = _select_current_primary(normalized) best = ranked[0] if ranked else None stance = str(narrative_payload.get("stance") or "neutral_rotational") inventory_state = _inventory_state_label(wallet_state.get("inventory_state")) scoped = narrative_payload.get("scoped_state") if isinstance(narrative_payload.get("scoped_state"), dict) else {} micro = scoped.get("micro") if isinstance(scoped.get("micro"), dict) else {} micro_impulse = str(micro.get("impulse") or "mixed") micro_bias = str(micro.get("trend_bias") or "mixed") micro_reversal_risk = str(micro.get("reversal_risk") or "low") bullish_micro_clear = micro_impulse == "up" and micro_bias == "bullish" and micro_reversal_risk != "high" bearish_micro_clear = micro_impulse == "down" and micro_bias == "bearish" and micro_reversal_risk != "high" breakout_direction = _breakout_direction(breakout, stance) directional_micro_clear = bullish_micro_clear if breakout_direction == "bullish" else bearish_micro_clear if breakout_direction == "bearish" else False grid_fill = _grid_fill_proximity(current_primary, narrative_payload) if current_primary and current_primary["strategy_type"] == "grid_trader" else {"near_fill": False} grid_pressure = _grid_trend_pressure(current_primary, narrative_payload) if current_primary and current_primary["strategy_type"] == "grid_trader" else {"levels": 0.0, "rounded_levels": 0, "direction": "unknown"} severe_imbalance = inventory_state in SEVERE_INVENTORY_STATES action = "hold" mode = "observe" target_strategy = current_primary.get("id") if current_primary else (best.get("strategy_id") if best else None) reasons: list[str] = [] blocks: list[str] = [] trend = next((r for r in ranked if r["strategy_type"] == "trend_follower"), None) rebalance = next((r for r in ranked if r["strategy_type"] == "exposure_protector"), None) grid = next((r for r in ranked if r["strategy_type"] == "grid_trader"), None) grid_strategy = next((s for s in normalized if s["strategy_type"] == "grid_trader"), None) decision_signals = _extract_decision_signals( narrative_payload=narrative_payload, wallet_state=wallet_state, grid_strategy=grid_strategy, breakout=breakout, history_window=history_window, decision_profile=decision_profile, ) switch_tradeoff: dict[str, Any] = {} if current_primary and current_primary["strategy_type"] == "grid_trader": action, mode, target_strategy, reasons, blocks = _decide_for_grid( current_primary=current_primary, stance=stance, inventory_state=inventory_state, wallet_state=wallet_state, breakout=breakout, grid_fill=grid_fill, grid_pressure=grid_pressure, directional_micro_clear=directional_micro_clear, severe_imbalance=severe_imbalance, decision_signals=decision_signals, trend=trend, rebalance=rebalance, ) switch_tradeoff = _grid_switch_tradeoff( current_primary=current_primary, wallet_state=wallet_state, breakout=breakout, grid_fill=grid_fill, grid_pressure=grid_pressure, directional_micro_clear=directional_micro_clear, decision_signals=decision_signals, trend=trend, ) elif current_primary and current_primary["strategy_type"] == "trend_follower": action, mode, target_strategy, reasons, blocks = _decide_for_trend( current_primary=current_primary, stance=stance, narrative_payload=narrative_payload, wallet_state=wallet_state, grid=grid, rebalance=rebalance, profile_config=_decision_profile_config(decision_profile), decision_signals=decision_signals, ) elif current_primary and current_primary["strategy_type"] == "exposure_protector": action, mode, target_strategy, reasons, blocks = _decide_for_rebalancer( current_primary=current_primary, stance=stance, wallet_state=wallet_state, grid=grid, decision_signals=decision_signals, trend=trend, decision_profile=decision_profile, ) else: if best and best["score"] >= 0.55: action = f"enable_{best['strategy_type']}" target_strategy = best["strategy_id"] mode = "act" reasons.extend(best["reasons"]) else: action = "wait" mode = "observe" blocks.append("no strategy is yet a strong enough fit") profile_config = _decision_profile_config(decision_profile) switch_cost_penalty = _safe_float(profile_config.get("switch_cost_penalty")) if switch_cost_penalty is None: switch_cost_penalty = 1.0 action_cooldown_seconds = int(_safe_float(profile_config.get("action_cooldown_seconds")) or 0) current_score = float(next((r["score"] for r in ranked if current_primary and r["strategy_id"] == current_primary.get("id")), 0.0)) target_score = float(next((r["score"] for r in ranked if r["strategy_id"] == target_strategy), current_score)) switch_edge = round(target_score - current_score, 4) required_switch_edge = round(max(switch_cost_penalty - 1.0, 0.0) * 0.08, 4) cooldown_active, cooldown_remaining, cooldown_action = _recent_switch_cooldown_active(history_window, str(concern.get("id") or ""), action_cooldown_seconds) if mode == "act" and current_primary and target_strategy and target_strategy != current_primary.get("id"): if required_switch_edge > 0 and switch_edge < required_switch_edge: mode = "observe" action = f"keep_{current_primary['strategy_type'].replace('_trader', '').replace('_follower', '').replace('exposure_protector', 'rebalancer')}" target_strategy = current_primary.get("id") reasons = [] blocks = [f"switch edge {switch_edge:.2f} is below required friction {required_switch_edge:.2f}"] elif cooldown_active: mode = "observe" action = f"keep_{current_primary['strategy_type'].replace('_trader', '').replace('_follower', '').replace('exposure_protector', 'rebalancer')}" target_strategy = current_primary.get("id") reasons = [] blocks = [f"switch cooldown active for {cooldown_remaining:.0f}s after {cooldown_action or 'recent switch'}"] reason_summary = reasons[0] if reasons else (blocks[0] if blocks else "strategy posture unchanged") confidence = float(narrative_payload.get("confidence") or 0.4) if action.startswith("replace_with") or action.startswith("enable_"): confidence += 0.08 if wallet_state.get("rebalance_needed") and "grid" in action: confidence -= 0.08 confidence = round(_clamp(confidence, 0.2, 0.95), 3) payload = { "generated_at": datetime.now(timezone.utc).isoformat(), "wallet_state": wallet_state, "narrative_stance": stance, "strategy_fit_ranking": ranked, "current_primary_strategy": current_primary.get("id") if current_primary else None, "argus_decision_context": _argus_decision_context(narrative_payload), "history_window": history_window or {}, "grid_breakout_pressure": breakout, "grid_fill_context": grid_fill, "grid_switch_tradeoff": switch_tradeoff if current_primary and current_primary["strategy_type"] == "grid_trader" else {}, "decision_audit": decision_signals, "switch_friction": { "switch_cost_penalty": round(switch_cost_penalty, 4), "switch_edge": switch_edge, "required_switch_edge": required_switch_edge, "action_cooldown_seconds": action_cooldown_seconds, "cooldown_active": cooldown_active, "cooldown_remaining_seconds": cooldown_remaining, }, "decision_profile": { "id": decision_profile.get("id") if isinstance(decision_profile, dict) else None, "name": decision_profile.get("name") if isinstance(decision_profile, dict) else None, "status": decision_profile.get("status") if isinstance(decision_profile, dict) else None, "config": _decision_profile_config(decision_profile), } if decision_profile else None, "reason_chain": reasons, "blocks": blocks, "decision_version": 3, } return DecisionSnapshot( mode=mode, action=action, target_strategy=target_strategy, reason_summary=reason_summary, confidence=confidence, requires_action=mode == "act", payload=payload, )