| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172 |
- from __future__ import annotations
- """Deterministic strategy-supervision logic for Hermes.
- This is the first decision slice. Hermes is currently acting as a supervisor for
- existing trader strategies, not as a direct trading engine.
- Design intent:
- - prefer one active posture at a time over layered companions
- - detect when grid trading becomes unsafe because market posture or wallet
- balance no longer supports it
- - switch cleanly between directional, range, and rebalancing phases
- """
- import json
- from dataclasses import dataclass
- from datetime import datetime, timezone
- from typing import Any
@dataclass(frozen=True)
class DecisionSnapshot:
    """Immutable record of a single supervision decision.

    The dataclass is frozen, so fields cannot be reassigned after
    construction.
    """

    # NOTE(review): nothing in the visible part of this module constructs this
    # class, so field meanings are inferred from names only - confirm against
    # the code that builds snapshots.
    mode: str
    action: str
    target_strategy: str | None  # the annotation allows None
    reason_summary: str
    confidence: float
    requires_action: bool
    payload: dict[str, Any]
- def _clamp(value: float, lower: float, upper: float) -> float:
- return max(lower, min(upper, value))
- def _safe_float(value: Any) -> float | None:
- try:
- if value is None:
- return None
- return float(value)
- except Exception:
- return None
- def _inventory_state_label(value: Any) -> str:
- state = str(value or "unknown").strip().lower()
- aliases = {
- "critical": "critically_unbalanced",
- "critically_imbalanced": "critically_unbalanced",
- "depleted_base": "depleted_base_side",
- "depleted_quote": "depleted_quote_side",
- "one_sided_base": "depleted_base_side",
- "one_sided_quote": "depleted_quote_side",
- }
- return aliases.get(state, state)
# Canonical labels for the most skewed wallet states.
SEVERE_INVENTORY_STATES = {
    "critically_unbalanced",
    "depleted_base_side",
    "depleted_quote_side",
}
# Labels that count as "rebalance needed": the mild skews plus every severe one.
REBALANCE_INVENTORY_STATES = {"base_heavy", "quote_heavy"} | SEVERE_INVENTORY_STATES
- def _infer_market_pair(concern: dict[str, Any]) -> tuple[str, str]:
- base = str(concern.get("base_currency") or "").strip().upper()
- quote = str(concern.get("quote_currency") or "").strip().upper()
- if base and quote:
- return base, quote
- market = str(concern.get("market_symbol") or "").strip().upper().replace("/", "").replace("-", "")
- for suffix in ("USDT", "USDC", "USD", "EUR", "BTC", "ETH"):
- if market.endswith(suffix) and len(market) > len(suffix):
- inferred_base = market[:-len(suffix)]
- inferred_quote = suffix
- return base or inferred_base, quote or inferred_quote
- return base or market, quote or "USD"
def assess_wallet_state(*, account_info: dict[str, Any], concern: dict[str, Any], price: float | None, strategies: list[dict[str, Any]] | None = None) -> dict[str, Any]:
    """Summarize inventory health for strategy switching.

    The key output is whether the wallet is balanced enough for range/grid
    harvesting, or so skewed that Hermes should prefer trend capture or
    rebalancing before grid is allowed again.

    Args:
        account_info: Mapping whose ``"balances"`` list holds per-asset rows
            (``asset_code``/``asset`` plus ``available`` or ``total``).
        concern: Mapping identifying the market (``base_currency``,
            ``quote_currency``, ``market_symbol``) and ``account_id``.
        price: Base price expressed in quote units; ``None`` or a
            non-positive value means the base side cannot be valued.
        strategies: Optional strategy rows. Open orders of non-off strategies
            on the same account and market are added back as reserved funds.

    Returns:
        A dict with available/reserved/effective amounts, base/quote values
        and ratios, ``imbalance_score``, ``inventory_state``, ``grid_ready``
        and ``rebalance_needed``.
    """
    balances = account_info.get("balances") if isinstance(account_info.get("balances"), list) else []
    base, quote = _infer_market_pair(concern)

    base_available = 0.0
    quote_available = 0.0
    for item in balances:
        if not isinstance(item, dict):
            continue
        asset = str(item.get("asset_code") or item.get("asset") or "").upper()
        amount = _safe_float(item.get("available") if item.get("available") is not None else item.get("total"))
        if amount is None:
            continue
        if asset == base:
            base_available = amount
        elif asset == quote:
            quote_available = amount

    concern_account = str(concern.get("account_id") or "").strip()
    concern_market = str(concern.get("market_symbol") or "").strip().lower()
    reserved_base = 0.0
    reserved_quote = 0.0
    for strategy in strategies or []:
        if not isinstance(strategy, dict):
            continue
        if str(strategy.get("account_id") or "").strip() != concern_account:
            continue
        market_symbol = str(strategy.get("market_symbol") or "").strip().lower()
        # A strategy without a market symbol is not filtered out by market.
        if market_symbol and market_symbol != concern_market:
            continue
        if str(strategy.get("mode") or "off") == "off":
            continue
        state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {}
        orders = state.get("orders") if isinstance(state.get("orders"), list) else []
        for order in orders:
            if not isinstance(order, dict):
                continue
            if str(order.get("status") or "open").lower() not in {"open", "live", "active"}:
                continue
            side = str(order.get("side") or "").lower()
            # NOTE(review): "amount" takes precedence over "amount_remaining";
            # confirm that is intended for partially filled orders.
            amount = _safe_float(order.get("amount") or order.get("amount_remaining")) or 0.0
            order_price = _safe_float(order.get("price")) or price or 0.0
            if side == "sell":
                reserved_base += amount
            elif side == "buy":
                reserved_quote += amount * order_price

    price = price or 0.0
    effective_base = base_available + reserved_base
    effective_quote = quote_available + reserved_quote
    base_value = effective_base * price if price > 0 else 0.0
    quote_value = effective_quote
    total_value = base_value + quote_value

    # Without a usable price, held base cannot be valued. Valuing it at zero
    # would report "depleted_base_side" and request a rebalance on the
    # strength of a missing quote alone, so report "unknown" instead.
    priceable = price > 0 or effective_base <= 0
    comparable = total_value > 0 and priceable
    base_ratio = (base_value / total_value) if comparable else 0.5
    quote_ratio = (quote_value / total_value) if comparable else 0.5
    imbalance = abs(base_ratio - 0.5)

    if not comparable:
        inventory_state = "unknown"
    elif base_ratio <= 0.02:
        inventory_state = "depleted_base_side"
    elif quote_ratio <= 0.02:
        inventory_state = "depleted_quote_side"
    elif imbalance >= 0.35:
        # Also covers either side falling below 8% of total value.
        inventory_state = "critically_unbalanced"
    elif base_ratio > 0.62:
        inventory_state = "base_heavy"
    elif quote_ratio > 0.62:
        inventory_state = "quote_heavy"
    else:
        inventory_state = "balanced"

    grid_ready = inventory_state == "balanced"
    rebalance_needed = inventory_state in REBALANCE_INVENTORY_STATES
    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "base_currency": base,
        "quote_currency": quote,
        "base_available": round(base_available, 8),
        "quote_available": round(quote_available, 8),
        "base_reserved": round(reserved_base, 8),
        "quote_reserved": round(reserved_quote, 8),
        "base_effective": round(effective_base, 8),
        "quote_effective": round(effective_quote, 8),
        "base_value": round(base_value, 4),
        "quote_value": round(quote_value, 4),
        "total_value": round(total_value, 4),
        "base_ratio": round(base_ratio, 4),
        "quote_ratio": round(quote_ratio, 4),
        "imbalance_score": round(imbalance, 4),
        "inventory_state": inventory_state,
        "grid_ready": grid_ready,
        "rebalance_needed": rebalance_needed,
    }
def normalize_strategy_snapshot(strategy: dict[str, Any]) -> dict[str, Any]:
    """Project a raw strategy row onto the normalized shape Hermes scores against.

    Nested ``state``, ``config`` and ``report`` sections that are missing or
    not dicts are treated as empty. The ``contract`` starts from per-type
    defaults (or a permissive fallback for unknown types) and is overlaid
    with ``report["fit"]``.
    """

    def section(container: dict[str, Any], key: str) -> dict[str, Any]:
        found = container.get(key)
        return found if isinstance(found, dict) else {}

    strategy_type = str(strategy.get("strategy_type") or "unknown")
    mode = str(strategy.get("mode") or "off")
    is_enabled = mode != "off"

    state = section(strategy, "state")
    config = section(strategy, "config")
    report = section(strategy, "report")
    report_fit = section(report, "fit")
    report_supervision = section(report, "supervision")
    report_state = section(report, "state")

    # Minimum contract per known strategy type; richer trader-side reports can
    # override any of these through report["fit"].
    no_rebalance_gates = {
        "requires_rebalance_before_start": False,
        "requires_rebalance_before_stop": False,
    }
    known_contracts = {
        "grid_trader": {
            "role": "primary",
            "inventory_behavior": "balanced",
            **no_rebalance_gates,
            "safe_when_unbalanced": False,
            "can_run_with": ["exposure_protector"],
        },
        "trend_follower": {
            "role": "primary",
            "inventory_behavior": "accumulative_long",
            **no_rebalance_gates,
            "safe_when_unbalanced": True,
            "can_run_with": [],
            "trade_side": "both",
        },
        "exposure_protector": {
            "role": "rebalancing",
            "inventory_behavior": "rebalancing",
            **no_rebalance_gates,
            "safe_when_unbalanced": True,
            "can_run_with": [],
        },
    }
    if strategy_type in known_contracts:
        baseline = known_contracts[strategy_type]
    else:
        baseline = {
            "role": "primary",
            "inventory_behavior": "unknown",
            **no_rebalance_gates,
            "safe_when_unbalanced": True,
            "can_run_with": [],
        }
    contract = dict(baseline)
    contract.update(report_fit)

    # Live state wins over report state on key collisions.
    merged_state = dict(report_state)
    merged_state.update(state)

    order_count = (
        state.get("open_order_count")
        or report_state.get("open_order_count")
        or strategy.get("open_order_count")
        or 0
    )
    fallback_status = "running" if is_enabled else "stopped"

    return {
        "id": strategy.get("id"),
        "strategy_type": strategy_type,
        "mode": mode,
        "enabled": is_enabled,
        "status": strategy.get("status") or fallback_status,
        "market_symbol": strategy.get("market_symbol"),
        "account_id": strategy.get("account_id"),
        "open_order_count": int(order_count),
        "last_action": state.get("last_action") or report_state.get("last_action") or strategy.get("last_side"),
        "last_error": state.get("last_error") or report_state.get("last_error") or "",
        "contract": contract,
        "trade_side": str(config.get("trade_side") or contract.get("trade_side") or "both"),
        "supervision": report_supervision,
        "config": config,
        "state": merged_state,
    }
- def _argus_decision_context(narrative_payload: dict[str, Any]) -> dict[str, Any]:
- argus = narrative_payload.get("argus_context") if isinstance(narrative_payload.get("argus_context"), dict) else {}
- regime = str(argus.get("regime") or argus.get("snapshot_regime") or "").strip()
- confidence_raw = argus.get("regime_confidence") if argus.get("regime_confidence") is not None else argus.get("snapshot_confidence")
- confidence = float(confidence_raw) if isinstance(confidence_raw, (int, float)) else 0.0
- components = argus.get("regime_components") if isinstance(argus.get("regime_components"), dict) else {}
- if not components:
- components = argus.get("snapshot_components") if isinstance(argus.get("snapshot_components"), dict) else {}
- compression = float(components.get("compression") or 0.0)
- compression_active = regime == "compression" and confidence >= 0.55 and compression >= 0.65
- return {
- "regime": regime,
- "confidence": round(confidence, 4),
- "components": components,
- "compression": round(compression, 4),
- "compression_active": compression_active,
- }
- def _parse_timestamp(value: Any) -> datetime | None:
- text = str(value or "").strip()
- if not text:
- return None
- if text.endswith("Z"):
- text = text[:-1] + "+00:00"
- try:
- parsed = datetime.fromisoformat(text)
- except Exception:
- return None
- if parsed.tzinfo is None:
- return parsed.replace(tzinfo=timezone.utc)
- return parsed.astimezone(timezone.utc)
def score_strategy_fit(*, strategy: dict[str, Any], narrative: dict[str, Any], wallet_state: dict[str, Any]) -> dict[str, Any]:
    """Score how well one normalized strategy fits the narrative and wallet.

    Args:
        strategy: Normalized strategy snapshot; ``strategy_type`` is required,
            and ``supervision``, ``last_error``, ``id`` and ``enabled`` are
            read when present.
        narrative: Narrative payload providing ``stance``, ``opportunity_map``,
            ``grid_breakout_pressure`` and (optionally) ``argus_context``.
        wallet_state: Wallet summary providing ``inventory_state`` and
            ``rebalance_needed``.

    Returns:
        A dict with ``strategy_id``, ``strategy_type``, the rounded ``score``,
        the supporting ``reasons``, the opposing ``blocks`` and ``enabled``.
        Strategy types other than grid_trader, trend_follower and
        exposure_protector only receive the generic error/degraded
        adjustments.
    """
    stance = str(narrative.get("stance") or "neutral_rotational")
    opportunity_map = narrative.get("opportunity_map") if isinstance(narrative.get("opportunity_map"), dict) else {}
    breakout_pressure = narrative.get("grid_breakout_pressure") if isinstance(narrative.get("grid_breakout_pressure"), dict) else {}
    breakout_phase = str(breakout_pressure.get("phase") or "none")
    continuation = float(opportunity_map.get("continuation") or 0.0)
    mean_reversion = float(opportunity_map.get("mean_reversion") or 0.0)
    reversal = float(opportunity_map.get("reversal") or 0.0)
    wait = float(opportunity_map.get("wait") or 0.0)
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
    argus_context = _argus_decision_context(narrative)
    strategy_type = strategy["strategy_type"]
    supervision = strategy.get("supervision") if isinstance(strategy.get("supervision"), dict) else {}
    inventory_pressure = str(supervision.get("inventory_pressure") or "")
    # Only an explicit falsy report value counts as "no capacity". A missing
    # value means the strategy has not reported, which must not be scored (or
    # described in `blocks`) as a reported capacity problem. This matches how
    # side_capacity below defaults a missing side to True.
    capacity_reported = supervision.get("capacity_available")
    capacity_available = True if capacity_reported is None else bool(capacity_reported)
    side_capacity = supervision.get("side_capacity") if isinstance(supervision.get("side_capacity"), dict) else {}

    score = 0.0
    reasons: list[str] = []
    blocks: list[str] = []

    if strategy_type == "grid_trader":
        score += mean_reversion * 1.8
        if stance in {"neutral_rotational", "breakout_watch"}:
            score += 0.45
            reasons.append("narrative still supports rotational structure")
        if continuation >= 0.45:
            score -= 0.8
            blocks.append("continuation pressure is too strong for safe grid harvesting")
        if inventory_state != "balanced":
            score -= 1.0
            blocks.append(f"wallet is not grid-ready: {inventory_state}")
        else:
            reasons.append("wallet is balanced enough for two-sided harvesting")
        if not capacity_available:
            score -= 0.25
            blocks.append("grid report shows one-sided capacity")
        if side_capacity and not (bool(side_capacity.get("buy", True)) and bool(side_capacity.get("sell", True))):
            score -= 0.25
            blocks.append("grid side capacity is asymmetric")
        if argus_context["compression_active"]:
            score += 0.2
            reasons.append("Argus compression supports staying selective with grid")
    elif strategy_type == "trend_follower":
        score += continuation * 1.9
        trade_side = _strategy_trade_side(strategy)
        narrative_direction = _narrative_direction(narrative)
        if stance in {"constructive_bullish", "cautious_bullish", "constructive_bearish", "cautious_bearish"}:
            score += 0.5
            reasons.append("narrative supports directional continuation")
        # One-sided instances are rewarded when aligned with the narrative
        # direction and penalised more heavily when opposed to it.
        if trade_side == "buy":
            if narrative_direction == "bullish":
                score += 0.6
                reasons.append("buy-side trend instance matches bullish direction")
            elif narrative_direction == "bearish":
                score -= 0.9
                blocks.append("buy-side trend instance conflicts with bearish direction")
        elif trade_side == "sell":
            if narrative_direction == "bearish":
                score += 0.6
                reasons.append("sell-side trend instance matches bearish direction")
            elif narrative_direction == "bullish":
                score -= 0.9
                blocks.append("sell-side trend instance conflicts with bullish direction")
        if breakout_phase == "confirmed":
            score += 0.45
            reasons.append("confirmed breakout pressure supports directional continuation")
        elif breakout_phase == "developing":
            score += 0.2
            reasons.append("breakout pressure is developing in trend's favor")
        if wait >= 0.45 and breakout_phase != "confirmed":
            score -= 0.35
            blocks.append("market still has too much wait/uncertainty for trend commitment")
        if inventory_state in SEVERE_INVENTORY_STATES:
            score -= 0.25
            blocks.append("wallet may be too skewed for clean directional scaling")
        if inventory_pressure in {"base_heavy", "quote_heavy"}:
            score -= 0.1
            blocks.append("trend report shows rising inventory pressure")
        if not capacity_available:
            score -= 0.1
            blocks.append("trend strength is below its own capacity threshold")
        if trade_side == "both" and narrative_direction in {"bullish", "bearish"}:
            score += 0.15
            reasons.append("generic trend instance can follow either side")
        if argus_context["compression_active"] and breakout_phase != "confirmed":
            score -= 0.15
            blocks.append("Argus compression says the broader tape is still range-like")
    elif strategy_type == "exposure_protector":
        score += reversal * 0.4 + wait * 0.5
        if wallet_state.get("rebalance_needed"):
            score += 1.1
            reasons.append("wallet imbalance calls for rebalancing protection")
        if inventory_state in SEVERE_INVENTORY_STATES:
            score += 0.45
            reasons.append("inventory drift is high enough to justify defensive action")
        if stance in {"constructive_bullish", "constructive_bearish"} and continuation > 0.65:
            score -= 0.2
        if inventory_pressure in {"critical", "elevated"}:
            score += 0.25
            reasons.append("protector reports active inventory pressure")

    # Health adjustments apply to every strategy type.
    if strategy.get("last_error"):
        score -= 0.25
        blocks.append("strategy recently reported an error")
    if bool(supervision.get("degraded")):
        score -= 0.15
        blocks.append("strategy self-reports degraded supervision state")

    return {
        "strategy_id": strategy.get("id"),
        "strategy_type": strategy_type,
        "score": round(score, 4),
        "reasons": reasons,
        "blocks": blocks,
        "enabled": strategy.get("enabled", False),
    }
- def _breakout_phase_from_score(score: float) -> str:
- if score >= 3.45:
- return "confirmed"
- if score >= 2.45:
- return "developing"
- if score >= 1.4:
- return "probing"
- return "none"
def _local_breakout_snapshot(narrative_payload: dict[str, Any]) -> dict[str, Any]:
    """Score the directional pressure visible in one narrative payload.

    Reads the micro/meso/macro entries of ``scoped_state`` plus
    ``cross_scope_summary`` and adds a fixed weight for each directional
    signal that is present. Missing sections fall back to neutral defaults.

    Returns:
        A dict with the rounded ``score``, the derived ``phase`` and the
        normalized inputs that produced them.
    """

    def section(container: dict[str, Any], key: str) -> dict[str, Any]:
        found = container.get(key)
        return found if isinstance(found, dict) else {}

    scoped = section(narrative_payload, "scoped_state")
    cross = section(narrative_payload, "cross_scope_summary")
    micro = section(scoped, "micro")
    meso = section(scoped, "meso")
    macro = section(scoped, "macro")

    micro_impulse = str(micro.get("impulse") or "mixed")
    micro_bias = str(micro.get("trend_bias") or "mixed")
    meso_structure = str(meso.get("structure") or "rotation")
    meso_bias = str(meso.get("momentum_bias") or "neutral")
    macro_bias = str(macro.get("bias") or "mixed")
    alignment = str(cross.get("alignment") or "partial_alignment")
    friction = str(cross.get("friction") or "medium")

    directional = {"bullish", "bearish"}
    # (signal present, weight) pairs, applied in this order.
    contributions = (
        (micro_impulse in {"up", "down"} and micro_bias in directional, 1.0),
        (meso_structure == "trend_continuation" and meso_bias in directional, 1.1),
        (macro_bias in directional, 0.55),
        (alignment == "micro_meso_macro_aligned", 0.8),
        (alignment == "partial_alignment", 0.35),
        (friction == "low", 0.45),
        (friction == "medium", 0.15),
    )
    score = 0.0
    for applies, weight in contributions:
        if applies:
            score += weight

    return {
        "score": round(score, 4),
        "phase": _breakout_phase_from_score(score),
        "micro_impulse": micro_impulse,
        "micro_bias": micro_bias,
        "meso_structure": meso_structure,
        "meso_bias": meso_bias,
        "macro_bias": macro_bias,
        "alignment": alignment,
        "friction": friction,
    }
def _breakout_memory(narrative_payload: dict[str, Any], history_window: dict[str, Any] | None, current_breakout: dict[str, Any]) -> dict[str, Any]:
    """Measure how long recent history has agreed with the current breakout.

    A stored sample qualifies when its own breakout phase is ``developing`` or
    ``confirmed`` and its meso and macro biases equal the current ones.

    Args:
        narrative_payload: Current narrative; ``generated_at`` anchors "now"
            (the wall clock is used when it is missing or unparseable).
        history_window: Optional dict with ``recent_states`` (rows carrying
            ``payload_json`` and ``created_at``) and ``window_seconds``.
        current_breakout: The breakout snapshot for the current narrative.

    Returns:
        A dict with ``window_seconds``, ``samples_considered``,
        ``qualifying_samples``, ``same_direction_seconds`` and
        ``promoted_to_confirmed``. Promotion requires a current ``developing``
        phase, at least two qualifying samples, and an agreement span of at
        least ``min(window_seconds, 480)`` seconds.
    """
    window = history_window if isinstance(history_window, dict) else {}
    recent_states = window.get("recent_states") if isinstance(window.get("recent_states"), list) else []
    window_seconds = int(window.get("window_seconds") or 0)
    current_ts = _parse_timestamp(narrative_payload.get("generated_at")) or datetime.now(timezone.utc)
    current_direction = str(current_breakout.get("meso_bias") or "neutral")
    current_macro = str(current_breakout.get("macro_bias") or "mixed")

    directional = current_direction in {"bullish", "bearish"} and current_breakout.get("meso_structure") == "trend_continuation"
    if not directional:
        return {"window_seconds": window_seconds, "samples_considered": 0, "qualifying_samples": 0, "same_direction_seconds": 0, "promoted_to_confirmed": False}

    qualifying_samples = 0
    oldest_match: datetime | None = None
    for row in recent_states:
        if not isinstance(row, dict):
            continue
        try:
            payload = json.loads(row.get("payload_json") or "{}")
        except Exception:
            # Best-effort: an undecodable history row is skipped.
            continue
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (list, number, ...) has no
            # narrative fields to read.
            continue
        snapshot = _local_breakout_snapshot(payload)
        sample_ts = _parse_timestamp(row.get("created_at") or payload.get("generated_at"))
        if sample_ts is None:
            continue
        if snapshot.get("phase") not in {"developing", "confirmed"}:
            continue
        if str(snapshot.get("meso_bias") or "neutral") != current_direction:
            continue
        if str(snapshot.get("macro_bias") or "mixed") != current_macro:
            continue
        qualifying_samples += 1
        # Track the earliest timestamp explicitly so the result does not
        # depend on whether recent_states is sorted oldest- or newest-first.
        if oldest_match is None or sample_ts < oldest_match:
            oldest_match = sample_ts

    same_direction_seconds = int((current_ts - oldest_match).total_seconds()) if oldest_match else 0
    promoted = current_breakout.get("phase") == "developing" and qualifying_samples >= 2 and same_direction_seconds >= min(window_seconds, 8 * 60)
    return {
        "window_seconds": window_seconds,
        "samples_considered": len(recent_states),
        "qualifying_samples": qualifying_samples,
        "same_direction_seconds": max(0, same_direction_seconds),
        "promoted_to_confirmed": promoted,
    }
def _grid_breakout_pressure(narrative_payload: dict[str, Any], history_window: dict[str, Any] | None = None) -> dict[str, Any]:
    """Combine the local breakout snapshot, its history memory and Argus context.

    The phase comes from the local snapshot unless the time-window memory
    promotes it to ``"confirmed"``. ``persistent`` is true exactly when the
    final phase is ``"confirmed"``.
    """
    argus_context = _argus_decision_context(narrative_payload)
    breakout = _local_breakout_snapshot(narrative_payload)
    memory = _breakout_memory(narrative_payload, history_window, breakout)

    if memory["promoted_to_confirmed"]:
        phase = "confirmed"
    else:
        phase = str(breakout.get("phase") or "none")

    pressure: dict[str, Any] = {"persistent": phase == "confirmed", "phase": phase}
    # Fields copied unchanged from the local snapshot.
    for key in (
        "score",
        "micro_impulse",
        "micro_bias",
        "meso_structure",
        "meso_bias",
        "macro_bias",
        "alignment",
        "friction",
    ):
        pressure[key] = breakout[key]
    pressure["time_window_memory"] = memory
    pressure["argus_regime"] = argus_context["regime"]
    pressure["argus_confidence"] = argus_context["confidence"]
    pressure["argus_compression_active"] = argus_context["compression_active"]
    return pressure
- def _select_current_primary(strategies: list[dict[str, Any]]) -> dict[str, Any] | None:
- primaries = [s for s in strategies if s["strategy_type"] in {"grid_trader", "trend_follower", "exposure_protector"} and s.get("mode") != "off"]
- if not primaries:
- return None
- active = next((s for s in primaries if s.get("mode") == "active"), None)
- if active:
- return active
- return primaries[0]
def _inventory_breakout_is_directionally_compatible(inventory_state: str, breakout: dict[str, Any]) -> bool:
    """Report whether the wallet skew pairs with the breakout direction.

    The breakout only counts as directional when macro and meso bias agree.
    A bullish breakout is compatible with ``depleted_base_side`` or
    ``quote_heavy``; a bearish one with ``depleted_quote_side`` or
    ``base_heavy``. Everything else is incompatible.
    """
    label = _inventory_state_label(inventory_state)
    biases = (
        str(breakout.get("macro_bias") or "mixed"),
        str(breakout.get("meso_bias") or "neutral"),
    )
    if biases == ("bullish", "bullish"):
        return label in {"depleted_base_side", "quote_heavy"}
    if biases == ("bearish", "bearish"):
        return label in {"depleted_quote_side", "base_heavy"}
    return False
def _trend_cooling_edge(narrative_payload: dict[str, Any], wallet_state: dict[str, Any]) -> bool:
    """Detect a meso trend that is still intact while the micro scope is stalling.

    Returns ``True`` only when the wallet reports ``rebalance_needed``, the
    meso structure is ``trend_continuation``, the micro impulse is mixed or
    its reversal risk is medium/high, and the inventory skew and micro
    location match the meso direction (base-heavy in the upper part of the
    range for a bullish trend, quote-heavy in the lower part for a bearish
    one; ``critically_unbalanced`` and ``centered`` fit both).
    """
    if not wallet_state.get("rebalance_needed"):
        return False

    scoped = narrative_payload.get("scoped_state")
    if not isinstance(scoped, dict):
        scoped = {}
    micro = scoped.get("micro")
    if not isinstance(micro, dict):
        micro = {}
    meso = scoped.get("meso")
    if not isinstance(meso, dict):
        meso = {}

    micro_impulse = str(micro.get("impulse") or "mixed")
    micro_bias = str(micro.get("trend_bias") or "mixed")
    micro_location = str(micro.get("location") or "unknown")
    micro_reversal_risk = str(micro.get("reversal_risk") or "low")
    meso_bias = str(meso.get("momentum_bias") or "neutral")
    meso_structure = str(meso.get("structure") or "rotation")
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))

    # Preconditions shared by the bullish and bearish variants.
    if meso_structure != "trend_continuation":
        return False
    if micro_impulse != "mixed" and micro_reversal_risk not in {"medium", "high"}:
        return False
    if micro_bias not in {"mixed", "bearish", "bullish"}:
        return False

    if meso_bias == "bullish":
        return (
            inventory_state in {"base_heavy", "critically_unbalanced"}
            and micro_location in {"near_upper_band", "upper_half", "centered"}
        )
    if meso_bias == "bearish":
        return (
            inventory_state in {"quote_heavy", "critically_unbalanced"}
            and micro_location in {"near_lower_band", "lower_half", "centered"}
        )
    return False
def _grid_fill_proximity(strategy: dict[str, Any], narrative_payload: dict[str, Any]) -> dict[str, Any]:
    """Report whether the grid's nearest resting order is close to being filled.

    The current price comes from the 1m raw features, falling back to the
    strategy state's ``last_price`` and then ``center_price``. A side counts as
    "near" when its closest open order lies within ``max(0.25, 1.5 * ATR%)``
    percent of the current price and the opposite side also has an open order.

    Returns:
        ``{"near_fill": False}`` when no positive current price is available;
        otherwise a dict with the near-fill flags, the closer side, the
        nearest order prices, their percentage distances and the threshold.
    """
    state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {}
    orders = state.get("orders") if isinstance(state.get("orders"), list) else []
    features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {}
    # Guard both levels: a present-but-null "raw" entry must not crash.
    one_minute = features.get("1m") if isinstance(features.get("1m"), dict) else {}
    micro_raw = one_minute.get("raw") if isinstance(one_minute.get("raw"), dict) else {}

    current_price = _safe_float(micro_raw.get("price") or state.get("last_price") or state.get("center_price"))
    atr_percent = _safe_float(micro_raw.get("atr_percent")) or 0.0
    if not current_price or current_price <= 0:
        return {"near_fill": False}

    sell_prices: list[float] = []
    buy_prices: list[float] = []
    for order in orders:
        if not isinstance(order, dict):
            continue
        if str(order.get("status") or "open").lower() not in {"open", "live", "active"}:
            continue
        price = _safe_float(order.get("price"))
        if price is None or price <= 0:
            continue
        side = str(order.get("side") or "").lower()
        # Only sells at/above and buys at/below the current price are counted.
        if side == "sell" and price >= current_price:
            sell_prices.append(price)
        elif side == "buy" and price <= current_price:
            buy_prices.append(price)

    next_sell = min(sell_prices) if sell_prices else None
    next_buy = max(buy_prices) if buy_prices else None
    next_sell_distance_pct = (((next_sell - current_price) / current_price) * 100.0) if next_sell else None
    next_buy_distance_pct = (((current_price - next_buy) / current_price) * 100.0) if next_buy else None
    threshold_pct = max(0.25, atr_percent * 1.5)

    near_sell_fill = bool(
        next_sell_distance_pct is not None
        and next_sell_distance_pct >= 0
        and next_sell_distance_pct <= threshold_pct
        and next_buy is not None
    )
    near_buy_fill = bool(
        next_buy_distance_pct is not None
        and next_buy_distance_pct >= 0
        and next_buy_distance_pct <= threshold_pct
        and next_sell is not None
    )

    near_fill_side: str | None = None
    if near_sell_fill and near_buy_fill:
        # Both sides are close: report the closer one, ties going to "sell".
        near_fill_side = "sell" if (next_sell_distance_pct or 0.0) <= (next_buy_distance_pct or 0.0) else "buy"
    elif near_sell_fill:
        near_fill_side = "sell"
    elif near_buy_fill:
        near_fill_side = "buy"

    return {
        "near_fill": bool(near_sell_fill or near_buy_fill),
        "near_fill_side": near_fill_side,
        "near_sell_fill": near_sell_fill,
        "near_buy_fill": near_buy_fill,
        "current_price": current_price,
        "next_sell": next_sell,
        "next_buy": next_buy,
        "next_sell_distance_pct": round(next_sell_distance_pct, 4) if next_sell_distance_pct is not None else None,
        "next_buy_distance_pct": round(next_buy_distance_pct, 4) if next_buy_distance_pct is not None else None,
        "threshold_pct": round(threshold_pct, 4),
    }
- def _grid_fill_fights_breakout(grid_fill: dict[str, Any], breakout: dict[str, Any]) -> bool:
- side = str(grid_fill.get("near_fill_side") or "")
- bias = str(breakout.get("meso_bias") or breakout.get("micro_bias") or "")
- if bias == "bullish":
- return side == "sell"
- if bias == "bearish":
- return side == "buy"
- return False
- def _breakout_direction(breakout: dict[str, Any], stance: str | None = None) -> str | None:
- meso_bias = str(breakout.get("meso_bias") or "")
- micro_bias = str(breakout.get("micro_bias") or "")
- if meso_bias in {"bullish", "bearish"}:
- return meso_bias
- if micro_bias in {"bullish", "bearish"}:
- return micro_bias
- stance_text = str(stance or "")
- if "bullish" in stance_text:
- return "bullish"
- if "bearish" in stance_text:
- return "bearish"
- return None
def _narrative_direction(narrative: dict[str, Any]) -> str | None:
    """Resolve the narrative's overall direction, or ``None`` if it has none.

    The breakout-pressure section (together with the stance text) is consulted
    first through ``_breakout_direction``; an exact match on the known
    directional stance names is kept as a fallback.
    """
    stance = str(narrative.get("stance") or "")
    pressure = narrative.get("grid_breakout_pressure")
    if not isinstance(pressure, dict):
        pressure = {}

    resolved = _breakout_direction(pressure, stance)
    if resolved:
        return resolved

    stance_directions = {
        "constructive_bullish": "bullish",
        "cautious_bullish": "bullish",
        "fragile_bullish": "bullish",
        "constructive_bearish": "bearish",
        "cautious_bearish": "bearish",
        "fragile_bearish": "bearish",
    }
    return stance_directions.get(stance)
- def _strategy_trade_side(strategy: dict[str, Any]) -> str:
- config = strategy.get("config") if isinstance(strategy.get("config"), dict) else {}
- state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {}
- side = str(config.get("trade_side") or state.get("trade_side") or strategy.get("trade_side") or "both").strip().lower()
- return side if side in {"buy", "sell", "both"} else "both"
- def _trend_handoff_level_threshold(breakout: dict[str, Any]) -> float:
- memory = breakout.get("time_window_memory") if isinstance(breakout.get("time_window_memory"), dict) else {}
- if bool(memory.get("promoted_to_confirmed")):
- return 2.0
- return 2.75
def _grid_switch_tradeoff(
    *,
    current_primary: dict[str, Any],
    wallet_state: dict[str, Any],
    breakout: dict[str, Any],
    grid_fill: dict[str, Any],
    grid_pressure: dict[str, Any],
    directional_micro_clear: bool,
    trend: dict[str, Any] | None,
) -> dict[str, Any]:
    """Score the benefit of leaving the grid against the cost of staying in it.

    Returns a dict with the rounded scores (``switch_benefit``, ``stay_cost``,
    ``margin``), the inputs that fed them, and ``should_switch``, which is
    True only when the breakout is persistent, the trend is ready and the
    margin is strictly positive.
    """
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))

    # Prefer the top-level order count; fall back to the strategy state.
    open_order_count = int(current_primary.get("open_order_count") or 0)
    if open_order_count == 0:
        raw_state = current_primary.get("state")
        grid_state = raw_state if isinstance(raw_state, dict) else {}
        open_order_count = int(grid_state.get("open_order_count") or len(grid_state.get("orders") or []) or 0)

    trend_score = float(trend.get("score") or 0.0) if trend else 0.0
    breakout_score = float(breakout.get("score") or 0.0)
    levels = float(grid_pressure.get("levels") or 0.0)
    near_fill = bool(grid_fill.get("near_fill"))
    fill_fights = _grid_fill_fights_breakout(grid_fill, breakout)
    persistent = bool(breakout.get("persistent"))
    trend_ready = trend_score > 0.45 and directional_micro_clear

    # Benefit side: fixed bonuses first, then the capped score contributions.
    benefit_bonuses = (
        (persistent, 0.28),
        (trend_ready, 0.34),
        (fill_fights, 0.12),
        (levels >= _trend_handoff_level_threshold(breakout), 0.18),
    )
    switch_benefit = 0.0
    for applies, weight in benefit_bonuses:
        if applies:
            switch_benefit += weight
    switch_benefit += min(trend_score, 2.5) * 0.18
    switch_benefit += min(breakout_score, 5.0) * 0.04

    # Cost side: start from an inventory-dependent base.
    if inventory_state == "balanced":
        stay_cost = 0.06
    elif inventory_state in {"base_heavy", "quote_heavy"}:
        stay_cost = 0.16
    elif inventory_state in SEVERE_INVENTORY_STATES:
        stay_cost = 0.28
    else:
        stay_cost = 0.1
    stay_cost += min(levels, 6.0) * 0.06
    stay_cost += min(open_order_count, 8) * 0.025
    cost_bonuses = (
        (near_fill, 0.06),
        (fill_fights, 0.18),
        (not persistent, 0.12),
    )
    for applies, weight in cost_bonuses:
        if applies:
            stay_cost += weight

    margin = round(switch_benefit - stay_cost, 4)
    should_switch = persistent and trend_ready and margin > 0.0

    return {
        "trend_score": round(trend_score, 4),
        "breakout_score": round(breakout_score, 4),
        "switch_benefit": round(switch_benefit, 4),
        "stay_cost": round(stay_cost, 4),
        "margin": margin,
        "should_switch": should_switch,
        "trend_ready": trend_ready,
        "persistent": persistent,
        "levels": round(levels, 4),
        "open_order_count": open_order_count,
        "near_fill": near_fill,
        "fill_fights": fill_fights,
        "inventory_state": inventory_state,
    }
def _grid_trend_pressure(strategy: dict[str, Any], narrative_payload: dict[str, Any]) -> dict[str, Any]:
    """Measure how many grid steps the current price sits from the grid center.

    The price comes from the 1m raw features, falling back to the strategy
    state. When price, center or step size is missing or non-positive, a
    zero-level result with direction ``"unknown"`` is returned.
    """
    raw_state = strategy.get("state")
    state = raw_state if isinstance(raw_state, dict) else {}
    raw_config = strategy.get("config")
    config = raw_config if isinstance(raw_config, dict) else {}
    raw_features = narrative_payload.get("features_by_timeframe")
    features = raw_features if isinstance(raw_features, dict) else {}
    one_minute = features.get("1m")
    micro_raw = one_minute.get("raw", {}) if isinstance(one_minute, dict) else {}

    current_price = _safe_float(micro_raw.get("price") or state.get("last_price") or state.get("center_price"))
    center_price = _safe_float(state.get("center_price") or state.get("last_price"))
    step_pct = _safe_float(config.get("grid_step_pct") or state.get("grid_step_pct") or state.get("recenter_pct_live")) or 0.0

    inputs_unusable = (
        not current_price
        or not center_price
        or current_price <= 0
        or center_price <= 0
        or step_pct <= 0
    )
    if inputs_unusable:
        return {
            "levels": 0.0,
            "rounded_levels": 0,
            "direction": "unknown",
            "current_price": current_price,
            "center_price": center_price,
            "step_pct": step_pct,
        }

    distance_pct = abs(current_price - center_price) / center_price
    levels = distance_pct / step_pct
    if current_price > center_price:
        direction = "bullish"
    elif current_price < center_price:
        direction = "bearish"
    else:
        direction = "flat"

    return {
        "levels": round(levels, 4),
        "rounded_levels": int(levels),
        "direction": direction,
        "current_price": current_price,
        "center_price": center_price,
        "step_pct": step_pct,
        "distance_pct": round(distance_pct, 4),
    }
def _grid_can_still_work(strategy: dict[str, Any], wallet_state: dict[str, Any], grid_fill: dict[str, Any]) -> bool:
    """Return True when the grid still looks able to operate on its own.

    A degraded supervision flag always yields False. Otherwise any side
    capacity, any open order or a nearby fill yields True; failing those,
    the grid can work only if the inventory state is not severe.
    """
    raw_supervision = strategy.get("supervision")
    supervision = raw_supervision if isinstance(raw_supervision, dict) else {}
    raw_capacity = supervision.get("side_capacity")
    side_capacity = raw_capacity if isinstance(raw_capacity, dict) else {}

    has_side_capacity = bool(side_capacity.get("buy", False)) or bool(side_capacity.get("sell", False))
    open_order_count = int(strategy.get("open_order_count") or 0)
    is_degraded = bool(supervision.get("degraded"))
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))

    if is_degraded:
        return False
    if has_side_capacity or open_order_count > 0 or grid_fill.get("near_fill"):
        return True
    return inventory_state not in SEVERE_INVENTORY_STATES
def _grid_is_truly_stuck_for_recovery(strategy: dict[str, Any], wallet_state: dict[str, Any], grid_fill: dict[str, Any]) -> bool:
    """Return True when the grid cannot work and the wallet needs repair.

    The grid counts as stuck only when ``_grid_can_still_work`` is False,
    the wallet reports ``rebalance_needed`` and the inventory state is one
    of ``SEVERE_INVENTORY_STATES``.
    """
    if _grid_can_still_work(strategy, wallet_state, grid_fill):
        return False
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
    # Coerce the flag: a bare `x and y` would hand back the raw (possibly
    # None) ``rebalance_needed`` value instead of the annotated bool.
    return bool(wallet_state.get("rebalance_needed")) and inventory_state in SEVERE_INVENTORY_STATES
def _wallet_within_rebalance_tolerance(wallet_state: dict[str, Any], tolerance: float = 0.3) -> bool:
    """Return True when the wallet imbalance is at or below ``tolerance``.

    Uses ``imbalance_score`` when it parses as a float, otherwise the
    distance of ``base_ratio`` from 0.5. With neither available, the wallet
    is in tolerance only when its ``inventory_state`` is "balanced".
    """
    imbalance = _safe_float(wallet_state.get("imbalance_score"))
    if imbalance is not None:
        return imbalance <= tolerance

    base_ratio = _safe_float(wallet_state.get("base_ratio"))
    if base_ratio is not None:
        return abs(base_ratio - 0.5) <= tolerance

    inventory_label = str(wallet_state.get("inventory_state") or "").lower()
    return inventory_label == "balanced"
def _decide_for_grid(
    *,
    current_primary: dict[str, Any],
    stance: str,
    inventory_state: str,
    wallet_state: dict[str, Any],
    breakout: dict[str, Any],
    grid_fill: dict[str, Any],
    grid_pressure: dict[str, Any],
    directional_micro_clear: bool,
    severe_imbalance: bool,
    trend: dict[str, Any] | None,
    rebalance: dict[str, Any] | None,
) -> tuple[str, str, str | None, list[str], list[str]]:
    """Decide what to do while a grid strategy is the current primary.

    Returns ``(action, mode, target_strategy, reasons, blocks)``. The action
    is one of ``keep_grid``, ``replace_with_trend_follower``,
    ``replace_with_exposure_protector`` or ``suspend_grid``; the mode is
    ``observe``, ``act`` or ``warn``. ``target_strategy`` defaults to the
    current primary's id and is replaced by the chosen candidate's
    ``strategy_id`` on a handoff.
    """
    action = "keep_grid"
    mode = "observe"
    target_strategy = current_primary["id"]
    reasons: list[str] = []
    blocks: list[str] = []
    inventory_state = _inventory_state_label(inventory_state)
    # Grid is the base mode. Leave it only for a persistent breakout or when
    # the grid has genuinely lost its ability to recover on its own.
    grid_friendly_stance = stance in {"neutral_rotational", "breakout_watch", "cautious_bullish", "cautious_bearish", "fragile_bullish", "fragile_bearish"}
    grid_can_work = _grid_can_still_work(current_primary, wallet_state, grid_fill)
    grid_stuck_for_recovery = _grid_is_truly_stuck_for_recovery(current_primary, wallet_state, grid_fill)
    # Read with .get so a breakout dict without the key is treated as
    # "not persistent", matching how _grid_switch_tradeoff reads it.
    persistent_breakout = bool(breakout.get("persistent"))
    breakout_phase = str(breakout.get("phase") or "none")
    handoff_level_threshold = _trend_handoff_level_threshold(breakout)
    trend_handoff_ready = bool(
        trend
        and trend["score"] > 0.45
        and directional_micro_clear
        and grid_pressure.get("levels", 0.0) >= handoff_level_threshold
    )
    fill_fights_breakout = _grid_fill_fights_breakout(grid_fill, breakout)
    switch_tradeoff = _grid_switch_tradeoff(
        current_primary=current_primary,
        wallet_state=wallet_state,
        breakout=breakout,
        grid_fill=grid_fill,
        grid_pressure=grid_pressure,
        directional_micro_clear=directional_micro_clear,
        trend=trend,
    )
    if severe_imbalance and persistent_breakout:
        reasons.append("grid imbalance now coincides with persistent breakout pressure")
        directional_inventory = _inventory_breakout_is_directionally_compatible(inventory_state, breakout)
        # trend is non-None here whenever trend_handoff_ready is True.
        if switch_tradeoff["should_switch"] and trend_handoff_ready and (
            not wallet_state.get("rebalance_needed")
            or directional_inventory
            or not rebalance
            or trend["score"] >= rebalance["score"]
        ):
            action = "replace_with_trend_follower"
            target_strategy = trend["strategy_id"]
            mode = "act"
            if directional_inventory:
                reasons.append("inventory posture can be absorbed by the directional handoff")
            reasons.append(
                f"switch benefit ({switch_tradeoff['switch_benefit']:.2f}) exceeds stay cost ({switch_tradeoff['stay_cost']:.2f})"
            )
        elif wallet_state.get("rebalance_needed") and rebalance and rebalance["score"] > 0.35:
            action = "replace_with_exposure_protector"
            target_strategy = rebalance["strategy_id"]
            mode = "act"
        else:
            action = "suspend_grid"
            mode = "warn"
    elif severe_imbalance and grid_stuck_for_recovery and not persistent_breakout and rebalance and rebalance["score"] > 0.6:
        action = "replace_with_exposure_protector"
        target_strategy = rebalance["strategy_id"]
        mode = "act"
        reasons.append("grid has lost practical recovery capacity, so inventory repair should take over")
    elif persistent_breakout and trend_handoff_ready and inventory_state in {"balanced", "base_heavy", "quote_heavy"}:
        if not switch_tradeoff["should_switch"]:
            reasons.append(
                f"breakout is persistent, but staying in grid still looks cheaper than switching (benefit {switch_tradeoff['switch_benefit']:.2f} vs cost {switch_tradeoff['stay_cost']:.2f})"
            )
            if grid_fill.get("near_fill") and fill_fights_breakout:
                reasons.append("nearby opposing fill is only a warning here, not enough on its own to justify the handoff")
        else:
            action = "replace_with_trend_follower"
            target_strategy = trend["strategy_id"] if trend else target_strategy
            mode = "act"
            if grid_fill.get("near_fill") and fill_fights_breakout:
                reasons.append("confirmed trend should not be delayed by a nearby grid fill that trades against the move")
            elif grid_fill.get("near_fill"):
                reasons.append("confirmed directional pressure is strong enough that nearby grid fills should not delay the trend handoff")
            else:
                reasons.append("grid should yield because directional pressure is confirmed and the trend handoff is ready")
    elif not persistent_breakout and grid_can_work:
        if breakout_phase == "developing":
            reasons.append("breakout pressure is developing, but grid can still work and should not be abandoned yet")
        else:
            reasons.append("grid can still operate and self-heal, so inventory skew alone should not force a rebalance handoff")
    elif persistent_breakout and grid_fill.get("near_fill") and inventory_state in {"balanced", "base_heavy", "quote_heavy"}:
        reasons.append("grid is still close to a working fill, but trend handoff is not ready enough yet")
    elif not grid_friendly_stance and persistent_breakout:
        reasons.append("grid should yield because directional pressure is persistent across scopes")
        if trend_handoff_ready:
            action = "replace_with_trend_follower"
            target_strategy = trend["strategy_id"]
            mode = "act"
        else:
            mode = "warn"
            if grid_pressure.get("levels", 0.0) < handoff_level_threshold:
                blocks.append("grid has not yet been eaten by enough levels to justify leaving it")
            else:
                blocks.append("directional pressure is rising but the micro layer is not clear enough for a trend handoff")
    else:
        reasons.append("grid can likely self-heal because breakout pressure is not yet persistent")
    return action, mode, target_strategy, reasons, blocks
def _decide_for_trend(
    *,
    current_primary: dict[str, Any],
    stance: str,
    narrative_payload: dict[str, Any],
    wallet_state: dict[str, Any],
    grid: dict[str, Any] | None,
    rebalance: dict[str, Any] | None = None,
) -> tuple[str, str, str | None, list[str], list[str]]:
    """Decide what to do while a trend strategy is the current primary.

    Returns ``(action, mode, target_strategy, reasons, blocks)``. The trend
    is kept unless a cooling edge is detected or the stance is
    ``neutral_rotational``; in those cases the rebalancer is preferred when
    the wallet needs it, then the grid when the wallet is grid-ready.
    """
    action = "keep_trend"
    mode = "observe"
    target_strategy = current_primary["id"]
    reasons: list[str] = []
    blocks: list[str] = []

    # Trend should cool into rebalancing first when the wallet is skewed, then
    # let rebalancer hand back to grid once the inventory is healthy again.
    cooling = _trend_cooling_edge(narrative_payload, wallet_state)

    if not cooling and stance != "neutral_rotational":
        reasons.append("trend strategy still fits the directional narrative")
        return action, mode, target_strategy, reasons, blocks

    rebalance_needed = wallet_state.get("rebalance_needed")
    if rebalance_needed and rebalance:
        action = "replace_with_exposure_protector"
        target_strategy = rebalance["strategy_id"]
        mode = "act"
        if cooling:
            reasons.append("trend has cooled and rebalancing should repair the wallet before grid resumes")
        else:
            reasons.append("trend conditions have cooled and rebalancing should repair the wallet before grid resumes")
    elif grid and wallet_state.get("grid_ready"):
        action = "replace_with_grid"
        target_strategy = grid["strategy_id"]
        mode = "act"
        if cooling:
            reasons.append("trend has cooled and grid can resume because no rebalancer is available")
        else:
            reasons.append("trend conditions have cooled and wallet is grid-ready again")
    elif cooling:
        mode = "warn"
        blocks.append("edge cooling is visible but the wallet is not yet ready for grid")
    elif rebalance_needed:
        mode = "warn"
        blocks.append("trend has cooled but rebalancing should be the next hop")
    else:
        action = "hold_trend"
        blocks.append("grid candidate not strong enough yet")
    return action, mode, target_strategy, reasons, blocks
def _decide_for_rebalancer(
    *,
    current_primary: dict[str, Any],
    stance: str,
    wallet_state: dict[str, Any],
    grid: dict[str, Any] | None,
    trend: dict[str, Any] | None = None,
) -> tuple[str, str, str | None, list[str], list[str]]:
    """Decide what to do while the rebalancer is the current primary.

    Returns ``(action, mode, target_strategy, reasons, blocks)``. The only
    handoff produced here is ``replace_with_grid``; otherwise the rebalancer
    is kept and the reason is recorded in ``blocks``.
    """
    action = "keep_rebalancer"
    mode = "observe"
    target_strategy = current_primary["id"]
    reasons: list[str] = []
    blocks: list[str] = []

    # Rebalancing is a repair phase. Once the wallet is usable again, Hermes
    # should prefer handing back to grid, not directly to trend.
    raw_score = trend.get("score") if trend else None
    trend_strength = float(raw_score) if isinstance(raw_score, (int, float)) else 0.0

    # Set to the reason text when a branch decides to hand back to grid.
    grid_handoff_reason: str | None = None
    if trend and trend_strength >= 1.5:
        blocks.append("trend is still strong enough that rebalancer should keep repairing instead of resetting to grid")
    elif _wallet_within_rebalance_tolerance(wallet_state, 0.3):
        if grid:
            grid_handoff_reason = "wallet is within the 0.3 rebalance tolerance, so grid can resume before perfect balance"
        else:
            blocks.append("wallet is within the rebalance tolerance but no grid candidate is available")
    elif wallet_state.get("grid_ready") and stance == "neutral_rotational":
        if grid and grid["score"] >= 0.5:
            grid_handoff_reason = "rebalance is complete and rotational conditions support grid again"
        else:
            blocks.append("wallet is ready but grid fit is still too weak")
    elif grid and grid["score"] >= 0.5:
        grid_handoff_reason = "trend is directional but not yet sustained, so grid can resume first"
    else:
        blocks.append("trend candidate is not strong enough yet and grid fit is not ready, so rebalancer should not hand directly back to trend")

    if grid_handoff_reason is not None:
        action = "replace_with_grid"
        target_strategy = grid["strategy_id"]
        mode = "act"
        reasons.append(grid_handoff_reason)
    return action, mode, target_strategy, reasons, blocks
def make_decision(
    *,
    concern: dict[str, Any],
    narrative_payload: dict[str, Any],
    wallet_state: dict[str, Any],
    strategies: list[dict[str, Any]],
    history_window: dict[str, Any] | None = None,
) -> DecisionSnapshot:
    """Build a ``DecisionSnapshot`` for the concern's account.

    Strategies belonging to the concern's ``account_id`` are normalized and
    scored against the narrative and wallet. The decision is then delegated
    to the handler matching the current primary strategy's type (grid, trend
    or exposure protector); with no matching primary, the best-ranked
    strategy is enabled when its score is at least 0.55, otherwise the
    decision is to wait.
    """
    account_id = str(concern.get("account_id") or "")
    normalized = [
        normalize_strategy_snapshot(candidate)
        for candidate in strategies
        if str(candidate.get("account_id") or "") == account_id
    ]
    breakout = _grid_breakout_pressure(narrative_payload, history_window=history_window)
    narrative_for_scoring = {**narrative_payload, "grid_breakout_pressure": breakout}
    fit_reports = [
        score_strategy_fit(strategy=candidate, narrative=narrative_for_scoring, wallet_state=wallet_state)
        for candidate in normalized
    ]
    ranked = sorted(fit_reports, key=lambda report: report["score"], reverse=True)
    current_primary = _select_current_primary(normalized)
    best = ranked[0] if ranked else None

    stance = str(narrative_payload.get("stance") or "neutral_rotational")
    inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))

    # Micro-scope readings gate whether a directional handoff is "clear".
    raw_scoped = narrative_payload.get("scoped_state")
    scoped = raw_scoped if isinstance(raw_scoped, dict) else {}
    raw_micro = scoped.get("micro")
    micro = raw_micro if isinstance(raw_micro, dict) else {}
    micro_impulse = str(micro.get("impulse") or "mixed")
    micro_bias = str(micro.get("trend_bias") or "mixed")
    micro_reversal_risk = str(micro.get("reversal_risk") or "low")
    reversal_risk_ok = micro_reversal_risk != "high"
    bullish_micro_clear = micro_impulse == "up" and micro_bias == "bullish" and reversal_risk_ok
    bearish_micro_clear = micro_impulse == "down" and micro_bias == "bearish" and reversal_risk_ok

    breakout_direction = _breakout_direction(breakout, stance)
    if breakout_direction == "bullish":
        directional_micro_clear = bullish_micro_clear
    elif breakout_direction == "bearish":
        directional_micro_clear = bearish_micro_clear
    else:
        directional_micro_clear = False

    primary_type = current_primary["strategy_type"] if current_primary else None
    primary_is_grid = primary_type == "grid_trader"
    if primary_is_grid:
        grid_fill = _grid_fill_proximity(current_primary, narrative_payload)
        grid_pressure = _grid_trend_pressure(current_primary, narrative_payload)
    else:
        grid_fill = {"near_fill": False}
        grid_pressure = {"levels": 0.0, "rounded_levels": 0, "direction": "unknown"}
    severe_imbalance = inventory_state in SEVERE_INVENTORY_STATES

    action = "hold"
    mode = "observe"
    if current_primary:
        target_strategy = current_primary.get("id")
    else:
        target_strategy = best.get("strategy_id") if best else None
    reasons: list[str] = []
    blocks: list[str] = []

    # Highest-ranked candidate of each strategy type, if any.
    trend = next((r for r in ranked if r["strategy_type"] == "trend_follower"), None)
    rebalance = next((r for r in ranked if r["strategy_type"] == "exposure_protector"), None)
    grid = next((r for r in ranked if r["strategy_type"] == "grid_trader"), None)
    switch_tradeoff: dict[str, Any] = {}

    if primary_is_grid:
        # Arguments shared by the grid decision and the tradeoff report.
        grid_inputs = {
            "current_primary": current_primary,
            "wallet_state": wallet_state,
            "breakout": breakout,
            "grid_fill": grid_fill,
            "grid_pressure": grid_pressure,
            "directional_micro_clear": directional_micro_clear,
            "trend": trend,
        }
        action, mode, target_strategy, reasons, blocks = _decide_for_grid(
            stance=stance,
            inventory_state=inventory_state,
            severe_imbalance=severe_imbalance,
            rebalance=rebalance,
            **grid_inputs,
        )
        switch_tradeoff = _grid_switch_tradeoff(**grid_inputs)
    elif primary_type == "trend_follower":
        action, mode, target_strategy, reasons, blocks = _decide_for_trend(
            current_primary=current_primary,
            stance=stance,
            narrative_payload=narrative_payload,
            wallet_state=wallet_state,
            grid=grid,
            rebalance=rebalance,
        )
    elif primary_type == "exposure_protector":
        action, mode, target_strategy, reasons, blocks = _decide_for_rebalancer(
            current_primary=current_primary,
            stance=stance,
            wallet_state=wallet_state,
            grid=grid,
            trend=trend,
        )
    elif best and best["score"] >= 0.55:
        action = f"enable_{best['strategy_type']}"
        target_strategy = best["strategy_id"]
        mode = "act"
        reasons.extend(best["reasons"])
    else:
        action = "wait"
        mode = "observe"
        blocks.append("no strategy is yet a strong enough fit")

    if reasons:
        reason_summary = reasons[0]
    elif blocks:
        reason_summary = blocks[0]
    else:
        reason_summary = "strategy posture unchanged"

    # Nudge confidence up for an active switch and down when a grid action
    # is chosen while the wallet still needs rebalancing, then clamp.
    confidence = float(narrative_payload.get("confidence") or 0.4)
    if action.startswith(("replace_with", "enable_")):
        confidence += 0.08
    if wallet_state.get("rebalance_needed") and "grid" in action:
        confidence -= 0.08
    confidence = round(_clamp(confidence, 0.2, 0.95), 3)

    payload = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "wallet_state": wallet_state,
        "narrative_stance": stance,
        "strategy_fit_ranking": ranked,
        "current_primary_strategy": current_primary.get("id") if current_primary else None,
        "argus_decision_context": _argus_decision_context(narrative_payload),
        "history_window": history_window or {},
        "grid_breakout_pressure": breakout,
        "grid_fill_context": grid_fill,
        "grid_switch_tradeoff": switch_tradeoff if primary_is_grid else {},
        "reason_chain": reasons,
        "blocks": blocks,
        "decision_version": 2,
    }
    return DecisionSnapshot(
        mode=mode,
        action=action,
        target_strategy=target_strategy,
        reason_summary=reason_summary,
        confidence=confidence,
        requires_action=mode == "act",
        payload=payload,
    )
|