| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893 |
- from __future__ import annotations
- """Deterministic strategy-supervision logic for Hermes.
- This is the first decision slice. Hermes is currently acting as a supervisor for
- existing trader strategies, not as a direct trading engine.
- Design intent:
- - prefer one active posture at a time over layered companions
- - detect when grid trading becomes unsafe because market posture or wallet
- balance no longer supports it
- - switch cleanly between directional, range, and rebalancing phases
- """
- import json
- from dataclasses import dataclass
- from datetime import datetime, timezone
- from typing import Any
- @dataclass(frozen=True)
- class DecisionSnapshot:
- mode: str
- action: str
- target_strategy: str | None
- reason_summary: str
- confidence: float
- requires_action: bool
- payload: dict[str, Any]
- def _clamp(value: float, lower: float, upper: float) -> float:
- return max(lower, min(upper, value))
- def _safe_float(value: Any) -> float | None:
- try:
- if value is None:
- return None
- return float(value)
- except Exception:
- return None
- def _decision_profile_config(decision_profile: dict[str, Any] | None) -> dict[str, Any]:
- if not isinstance(decision_profile, dict):
- return {}
- config = decision_profile.get("config")
- if isinstance(config, dict):
- return config
- raw = decision_profile.get("config_json")
- if isinstance(raw, str) and raw.strip():
- try:
- parsed = json.loads(raw)
- if isinstance(parsed, dict):
- return parsed
- except Exception:
- return {}
- return {}
- def _inventory_state_label(value: Any) -> str:
- state = str(value or "unknown").strip().lower()
- aliases = {
- "critical": "critically_unbalanced",
- "critically_imbalanced": "critically_unbalanced",
- "depleted_base": "depleted_base_side",
- "depleted_quote": "depleted_quote_side",
- "one_sided_base": "depleted_base_side",
- "one_sided_quote": "depleted_quote_side",
- }
- return aliases.get(state, state)
- def _timeframe_direction(feature: dict[str, Any] | None) -> str:
- if not isinstance(feature, dict):
- return "mixed"
- trend = feature.get("trend") if isinstance(feature.get("trend"), dict) else {}
- momentum = feature.get("momentum") if isinstance(feature.get("momentum"), dict) else {}
- alignment = str(trend.get("alignment") or "")
- if alignment in {"fully_bullish", "bullish_pullback"}:
- return "bullish"
- if alignment in {"fully_bearish", "bearish_pullback"}:
- return "bearish"
- bias_score = _safe_float(trend.get("bias_score"))
- if bias_score is not None:
- if bias_score >= 0.55:
- return "bullish"
- if bias_score <= -0.55:
- return "bearish"
- impulse = str(momentum.get("impulse") or "")
- if impulse == "up":
- return "bullish"
- if impulse == "down":
- return "bearish"
- return "mixed"
- def _short_term_trend_dislocated(narrative_payload: dict[str, Any]) -> bool:
- features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {}
- short_dirs = [_timeframe_direction(features.get(tf)) for tf in ("1m", "5m")]
- higher_dirs = [_timeframe_direction(features.get(tf)) for tf in ("15m", "1h", "4h", "1d")]
- short_clean = [d for d in short_dirs if d in {"bullish", "bearish"}]
- higher_clean = [d for d in higher_dirs if d in {"bullish", "bearish"}]
- if not short_clean:
- return bool(higher_clean) and len(set(higher_clean)) == 1
- if any(d == "mixed" for d in short_dirs):
- return bool(higher_clean) and len(set(higher_clean)) == 1
- short_direction = short_clean[0] if len(set(short_clean)) == 1 else "mixed"
- if short_direction == "mixed":
- return True
- if not higher_clean:
- return False
- higher_direction = higher_clean[0] if len(set(higher_clean)) == 1 else "mixed"
- return higher_direction in {"bullish", "bearish"} and short_direction != higher_direction
- def _short_term_trend_manifest_score(narrative_payload: dict[str, Any], direction: str) -> float:
- features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {}
- if direction not in {"bullish", "bearish"}:
- return 0.0
- total = 0.0
- seen = 0
- for timeframe in ("1m", "5m"):
- feature = features.get(timeframe) if isinstance(features.get(timeframe), dict) else None
- if not feature:
- continue
- seen += 1
- trend = feature.get("trend") if isinstance(feature.get("trend"), dict) else {}
- if not trend:
- local = 0.68
- else:
- alignment = str(trend.get("alignment") or "")
- strength = _safe_float(trend.get("strength")) or 0.0
- bias_score = abs(_safe_float(trend.get("bias_score")) or 0.0)
- local = 0.0
- if direction == "bullish":
- if alignment == "fully_bullish":
- local = 1.0
- elif alignment == "bullish_pullback":
- local = 0.62
- else:
- if alignment == "fully_bearish":
- local = 1.0
- elif alignment == "bearish_pullback":
- local = 0.62
- if local == 0.0:
- short_direction = _timeframe_direction(feature)
- if short_direction == direction:
- local = 0.34
- elif short_direction == "mixed":
- local = 0.12
- if strength >= 0.55:
- local += 0.16
- elif strength <= 0.2:
- local -= 0.08
- if bias_score >= 0.75:
- local += 0.08
- elif bias_score <= 0.2:
- local -= 0.04
- total += _clamp(local, 0.0, 1.0)
- if not seen:
- return 0.0
- return round(total / seen, 4)
- SEVERE_INVENTORY_STATES = {"critically_unbalanced", "depleted_base_side", "depleted_quote_side"}
- REBALANCE_INVENTORY_STATES = {"base_heavy", "quote_heavy", *SEVERE_INVENTORY_STATES}
- def _infer_market_pair(concern: dict[str, Any]) -> tuple[str, str]:
- base = str(concern.get("base_currency") or "").strip().upper()
- quote = str(concern.get("quote_currency") or "").strip().upper()
- if base and quote:
- return base, quote
- market = str(concern.get("market_symbol") or "").strip().upper().replace("/", "").replace("-", "")
- for suffix in ("USDT", "USDC", "USD", "EUR", "BTC", "ETH"):
- if market.endswith(suffix) and len(market) > len(suffix):
- inferred_base = market[:-len(suffix)]
- inferred_quote = suffix
- return base or inferred_base, quote or inferred_quote
- return base or market, quote or "USD"
- def assess_wallet_state(*, account_info: dict[str, Any], concern: dict[str, Any], price: float | None, strategies: list[dict[str, Any]] | None = None) -> dict[str, Any]:
- """Summarize inventory health for strategy switching.
- The key output is whether the wallet is balanced enough for range/grid
- harvesting, or so skewed that Hermes should prefer trend capture or
- rebalancing before grid is allowed again.
- """
- balances = account_info.get("balances") if isinstance(account_info.get("balances"), list) else []
- base, quote = _infer_market_pair(concern)
- base_available = 0.0
- quote_available = 0.0
- for item in balances:
- if not isinstance(item, dict):
- continue
- asset = str(item.get("asset_code") or item.get("asset") or "").upper()
- amount = _safe_float(item.get("available") if item.get("available") is not None else item.get("total"))
- if amount is None:
- continue
- if asset == base:
- base_available = amount
- elif asset == quote:
- quote_available = amount
- reserved_base = 0.0
- reserved_quote = 0.0
- for strategy in strategies or []:
- if not isinstance(strategy, dict):
- continue
- if str(strategy.get("account_id") or "").strip() != str(concern.get("account_id") or "").strip():
- continue
- market_symbol = str(strategy.get("market_symbol") or "").strip().lower()
- if market_symbol and market_symbol != str(concern.get("market_symbol") or "").strip().lower():
- continue
- if str(strategy.get("mode") or "off") == "off":
- continue
- state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {}
- orders = state.get("orders") if isinstance(state.get("orders"), list) else []
- for order in orders:
- if not isinstance(order, dict):
- continue
- if str(order.get("status") or "open").lower() not in {"open", "live", "active"}:
- continue
- side = str(order.get("side") or "").lower()
- amount = _safe_float(order.get("amount") or order.get("amount_remaining")) or 0.0
- order_price = _safe_float(order.get("price")) or price or 0.0
- if side == "sell":
- reserved_base += amount
- elif side == "buy":
- reserved_quote += amount * order_price
- price = price or 0.0
- effective_base = base_available + reserved_base
- effective_quote = quote_available + reserved_quote
- base_value = effective_base * price if price > 0 else 0.0
- quote_value = effective_quote
- total_value = base_value + quote_value
- base_ratio = (base_value / total_value) if total_value > 0 else 0.5
- quote_ratio = (quote_value / total_value) if total_value > 0 else 0.5
- imbalance = abs(base_ratio - 0.5)
- if total_value <= 0:
- inventory_state = "unknown"
- elif base_ratio <= 0.02:
- inventory_state = "depleted_base_side"
- elif quote_ratio <= 0.02:
- inventory_state = "depleted_quote_side"
- elif base_ratio < 0.08:
- inventory_state = "critically_unbalanced"
- elif quote_ratio < 0.08:
- inventory_state = "critically_unbalanced"
- elif imbalance >= 0.35:
- inventory_state = "critically_unbalanced"
- elif base_ratio > 0.62:
- inventory_state = "base_heavy"
- elif quote_ratio > 0.62:
- inventory_state = "quote_heavy"
- else:
- inventory_state = "balanced"
- grid_ready = inventory_state == "balanced"
- rebalance_needed = inventory_state in REBALANCE_INVENTORY_STATES
- return {
- "generated_at": datetime.now(timezone.utc).isoformat(),
- "base_currency": base,
- "quote_currency": quote,
- "base_available": round(base_available, 8),
- "quote_available": round(quote_available, 8),
- "base_reserved": round(reserved_base, 8),
- "quote_reserved": round(reserved_quote, 8),
- "base_effective": round(effective_base, 8),
- "quote_effective": round(effective_quote, 8),
- "base_value": round(base_value, 4),
- "quote_value": round(quote_value, 4),
- "total_value": round(total_value, 4),
- "base_ratio": round(base_ratio, 4),
- "quote_ratio": round(quote_ratio, 4),
- "imbalance_score": round(imbalance, 4),
- "inventory_state": inventory_state,
- "grid_ready": grid_ready,
- "rebalance_needed": rebalance_needed,
- }
- def normalize_strategy_snapshot(strategy: dict[str, Any]) -> dict[str, Any]:
- strategy_type = str(strategy.get("strategy_type") or "unknown")
- mode = str(strategy.get("mode") or "off")
- state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {}
- config = strategy.get("config") if isinstance(strategy.get("config"), dict) else {}
- report = strategy.get("report") if isinstance(strategy.get("report"), dict) else {}
- report_fit = report.get("fit") if isinstance(report.get("fit"), dict) else {}
- report_supervision = report.get("supervision") if isinstance(report.get("supervision"), dict) else {}
- report_state = report.get("state") if isinstance(report.get("state"), dict) else {}
- # Stable minimum contract used by Hermes while the trader-side strategy
- # metadata evolves. These values can later be sourced directly from richer
- # reports, but the decision layer keeps a normalized shape from day one.
- defaults = {
- "grid_trader": {
- "role": "primary",
- "inventory_behavior": "balanced",
- "requires_rebalance_before_start": False,
- "requires_rebalance_before_stop": False,
- "safe_when_unbalanced": False,
- "can_run_with": ["exposure_protector"],
- },
- "trend_follower": {
- "role": "primary",
- "inventory_behavior": "accumulative_long",
- "requires_rebalance_before_start": False,
- "requires_rebalance_before_stop": False,
- "safe_when_unbalanced": True,
- "can_run_with": [],
- "trade_side": "both",
- },
- "exposure_protector": {
- "role": "rebalancing",
- "inventory_behavior": "rebalancing",
- "requires_rebalance_before_start": False,
- "requires_rebalance_before_stop": False,
- "safe_when_unbalanced": True,
- "can_run_with": [],
- "rebalance_tolerance": 0.3,
- },
- }
- contract = defaults.get(strategy_type, {
- "role": "primary",
- "inventory_behavior": "unknown",
- "requires_rebalance_before_start": False,
- "requires_rebalance_before_stop": False,
- "safe_when_unbalanced": True,
- "can_run_with": [],
- })
- contract = {**contract, **report_fit}
- return {
- "id": strategy.get("id"),
- "strategy_type": strategy_type,
- "mode": mode,
- "enabled": mode != "off",
- "status": strategy.get("status") or ("running" if mode != "off" else "stopped"),
- "market_symbol": strategy.get("market_symbol"),
- "account_id": strategy.get("account_id"),
- "open_order_count": int(state.get("open_order_count") or report_state.get("open_order_count") or strategy.get("open_order_count") or 0),
- "last_action": state.get("last_action") or report_state.get("last_action") or strategy.get("last_side"),
- "last_error": state.get("last_error") or report_state.get("last_error") or "",
- "contract": contract,
- "trade_side": str(config.get("trade_side") or contract.get("trade_side") or "both"),
- "supervision": report_supervision,
- "config": config,
- "state": {**report_state, **state},
- }
- def _argus_decision_context(narrative_payload: dict[str, Any]) -> dict[str, Any]:
- argus = narrative_payload.get("argus_context") if isinstance(narrative_payload.get("argus_context"), dict) else {}
- regime = str(argus.get("regime") or argus.get("snapshot_regime") or "").strip()
- confidence_raw = argus.get("regime_confidence") if argus.get("regime_confidence") is not None else argus.get("snapshot_confidence")
- confidence = float(confidence_raw) if isinstance(confidence_raw, (int, float)) else 0.0
- components = argus.get("regime_components") if isinstance(argus.get("regime_components"), dict) else {}
- if not components:
- components = argus.get("snapshot_components") if isinstance(argus.get("snapshot_components"), dict) else {}
- compression = float(components.get("compression") or 0.0)
- compression_active = regime == "compression" and confidence >= 0.55 and compression >= 0.65
- return {
- "regime": regime,
- "confidence": round(confidence, 4),
- "components": components,
- "compression": round(compression, 4),
- "compression_active": compression_active,
- }
- def _parse_timestamp(value: Any) -> datetime | None:
- text = str(value or "").strip()
- if not text:
- return None
- if text.endswith("Z"):
- text = text[:-1] + "+00:00"
- try:
- parsed = datetime.fromisoformat(text)
- except Exception:
- return None
- if parsed.tzinfo is None:
- return parsed.replace(tzinfo=timezone.utc)
- return parsed.astimezone(timezone.utc)
- def _recent_switch_cooldown_active(history_window: dict[str, Any] | None, concern_id: str, cooldown_seconds: int) -> tuple[bool, float | None, str | None]:
- if cooldown_seconds <= 0:
- return False, None, None
- rows = history_window.get("recent_decisions") if isinstance(history_window, dict) and isinstance(history_window.get("recent_decisions"), list) else []
- now = datetime.now(timezone.utc)
- for row in rows:
- if not isinstance(row, dict):
- continue
- if concern_id and str(row.get("concern_id") or "") != concern_id:
- continue
- mode = str(row.get("mode") or "").lower()
- action = str(row.get("action") or "")
- target = str(row.get("target_strategy") or "")
- if mode != "act" or not action or not target:
- continue
- created = _parse_timestamp(row.get("created_at"))
- if not created:
- continue
- elapsed = (now - created).total_seconds()
- if elapsed < cooldown_seconds:
- return True, round(max(cooldown_seconds - elapsed, 0.0), 1), action
- return False, None, action
- return False, None, None
- def score_strategy_fit(*, strategy: dict[str, Any], narrative: dict[str, Any], wallet_state: dict[str, Any]) -> dict[str, Any]:
- stance = str(narrative.get("stance") or "neutral_rotational")
- opportunity_map = narrative.get("opportunity_map") if isinstance(narrative.get("opportunity_map"), dict) else {}
- breakout_pressure = narrative.get("grid_breakout_pressure") if isinstance(narrative.get("grid_breakout_pressure"), dict) else {}
- breakout_phase = str(breakout_pressure.get("phase") or "none")
- continuation = float(opportunity_map.get("continuation") or 0.0)
- mean_reversion = float(opportunity_map.get("mean_reversion") or 0.0)
- reversal = float(opportunity_map.get("reversal") or 0.0)
- wait = float(opportunity_map.get("wait") or 0.0)
- inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
- argus_context = _argus_decision_context(narrative)
- strategy_type = strategy["strategy_type"]
- supervision = strategy.get("supervision") if isinstance(strategy.get("supervision"), dict) else {}
- inventory_pressure = str(supervision.get("inventory_pressure") or "")
- capacity_available = bool(supervision.get("capacity_available"))
- side_capacity = supervision.get("side_capacity") if isinstance(supervision.get("side_capacity"), dict) else {}
- score = 0.0
- reasons: list[str] = []
- blocks: list[str] = []
- if strategy_type == "grid_trader":
- score += mean_reversion * 1.8
- if stance in {"neutral_rotational", "breakout_watch"}:
- score += 0.45
- reasons.append("narrative still supports rotational structure")
- if continuation >= 0.45:
- score -= 0.8
- blocks.append("continuation pressure is too strong for safe grid harvesting")
- if inventory_state != "balanced":
- score -= 1.0
- blocks.append(f"wallet is not grid-ready: {inventory_state}")
- else:
- reasons.append("wallet is balanced enough for two-sided harvesting")
- if not capacity_available:
- score -= 0.25
- blocks.append("grid report shows one-sided capacity")
- if side_capacity and not (bool(side_capacity.get("buy", True)) and bool(side_capacity.get("sell", True))):
- score -= 0.25
- blocks.append("grid side capacity is asymmetric")
- if argus_context["compression_active"]:
- score += 0.2
- reasons.append("Argus compression supports staying selective with grid")
- elif strategy_type == "trend_follower":
- score += continuation * 1.9
- trade_side = _strategy_trade_side(strategy)
- narrative_direction = _narrative_direction(narrative)
- if stance in {"constructive_bullish", "cautious_bullish", "constructive_bearish", "cautious_bearish"}:
- score += 0.5
- reasons.append("narrative supports directional continuation")
- if trade_side == "buy":
- if narrative_direction == "bullish":
- score += 0.6
- reasons.append("buy-side trend instance matches bullish direction")
- elif narrative_direction == "bearish":
- score -= 0.9
- blocks.append("buy-side trend instance conflicts with bearish direction")
- elif trade_side == "sell":
- if narrative_direction == "bearish":
- score += 0.6
- reasons.append("sell-side trend instance matches bearish direction")
- elif narrative_direction == "bullish":
- score -= 0.9
- blocks.append("sell-side trend instance conflicts with bullish direction")
- if breakout_phase == "confirmed":
- score += 0.45
- reasons.append("confirmed breakout pressure supports directional continuation")
- elif breakout_phase == "developing":
- score += 0.2
- reasons.append("breakout pressure is developing in trend's favor")
- if wait >= 0.45 and breakout_phase != "confirmed":
- score -= 0.35
- blocks.append("market still has too much wait/uncertainty for trend commitment")
- if inventory_state in SEVERE_INVENTORY_STATES:
- score -= 0.25
- blocks.append("wallet may be too skewed for clean directional scaling")
- if inventory_pressure in {"base_heavy", "quote_heavy"}:
- score -= 0.1
- blocks.append("trend report shows rising inventory pressure")
- if not capacity_available:
- score -= 0.1
- blocks.append("trend strength is below its own capacity threshold")
- if trade_side == "both" and narrative_direction in {"bullish", "bearish"}:
- score += 0.15
- reasons.append("generic trend instance can follow either side")
- if argus_context["compression_active"] and breakout_phase != "confirmed":
- score -= 0.15
- blocks.append("Argus compression says the broader tape is still range-like")
- elif strategy_type == "exposure_protector":
- score += reversal * 0.4 + wait * 0.5
- if wallet_state.get("rebalance_needed"):
- score += 1.1
- reasons.append("wallet imbalance calls for rebalancing protection")
- if inventory_state in SEVERE_INVENTORY_STATES:
- score += 0.45
- reasons.append("inventory drift is high enough to justify defensive action")
- if stance in {"constructive_bullish", "constructive_bearish"} and continuation > 0.65:
- score -= 0.2
- if inventory_pressure in {"critical", "elevated"}:
- score += 0.25
- reasons.append("protector reports active inventory pressure")
- if strategy.get("last_error"):
- score -= 0.25
- blocks.append("strategy recently reported an error")
- if bool(supervision.get("degraded")):
- score -= 0.15
- blocks.append("strategy self-reports degraded supervision state")
- return {
- "strategy_id": strategy.get("id"),
- "strategy_type": strategy_type,
- "score": round(score, 4),
- "reasons": reasons,
- "blocks": blocks,
- "enabled": strategy.get("enabled", False),
- }
- def _breakout_phase_from_score(score: float) -> str:
- if score >= 3.45:
- return "confirmed"
- if score >= 2.45:
- return "developing"
- if score >= 1.4:
- return "probing"
- return "none"
- def _local_breakout_snapshot(narrative_payload: dict[str, Any]) -> dict[str, Any]:
- scoped = narrative_payload.get("scoped_state") if isinstance(narrative_payload.get("scoped_state"), dict) else {}
- cross = narrative_payload.get("cross_scope_summary") if isinstance(narrative_payload.get("cross_scope_summary"), dict) else {}
- micro = scoped.get("micro") if isinstance(scoped.get("micro"), dict) else {}
- meso = scoped.get("meso") if isinstance(scoped.get("meso"), dict) else {}
- macro = scoped.get("macro") if isinstance(scoped.get("macro"), dict) else {}
- micro_impulse = str(micro.get("impulse") or "mixed")
- micro_bias = str(micro.get("trend_bias") or "mixed")
- meso_structure = str(meso.get("structure") or "rotation")
- meso_bias = str(meso.get("momentum_bias") or "neutral")
- macro_bias = str(macro.get("bias") or "mixed")
- alignment = str(cross.get("alignment") or "partial_alignment")
- friction = str(cross.get("friction") or "medium")
- micro_directional = micro_impulse in {"up", "down"} and micro_bias in {"bullish", "bearish"}
- meso_directional = meso_structure == "trend_continuation" and meso_bias in {"bullish", "bearish"}
- macro_supportive = macro_bias in {"bullish", "bearish"}
- score = 0.0
- if micro_directional:
- score += 1.0
- if meso_directional:
- score += 1.1
- if macro_supportive:
- score += 0.55
- if alignment == "micro_meso_macro_aligned":
- score += 0.8
- elif alignment == "partial_alignment":
- score += 0.35
- if friction == "low":
- score += 0.45
- elif friction == "medium":
- score += 0.15
- return {
- "score": round(score, 4),
- "phase": _breakout_phase_from_score(score),
- "micro_impulse": micro_impulse,
- "micro_bias": micro_bias,
- "meso_structure": meso_structure,
- "meso_bias": meso_bias,
- "macro_bias": macro_bias,
- "alignment": alignment,
- "friction": friction,
- }
- def _breakout_memory(narrative_payload: dict[str, Any], history_window: dict[str, Any] | None, current_breakout: dict[str, Any]) -> dict[str, Any]:
- recent_states = history_window.get("recent_states") if isinstance(history_window, dict) and isinstance(history_window.get("recent_states"), list) else []
- window_seconds = int(history_window.get("window_seconds") or 0) if isinstance(history_window, dict) else 0
- current_ts = _parse_timestamp(narrative_payload.get("generated_at")) or datetime.now(timezone.utc)
- current_direction = str(current_breakout.get("meso_bias") or "neutral")
- directional = current_direction in {"bullish", "bearish"} and current_breakout.get("meso_structure") == "trend_continuation"
- if not directional:
- return {"window_seconds": window_seconds, "samples_considered": 0, "qualifying_samples": 0, "same_direction_seconds": 0, "promoted_to_confirmed": False}
- qualifying_samples = 0
- oldest_match: datetime | None = None
- for row in recent_states:
- if not isinstance(row, dict):
- continue
- try:
- payload = json.loads(row.get("payload_json") or "{}")
- except Exception:
- continue
- snapshot = _local_breakout_snapshot(payload)
- sample_ts = _parse_timestamp(row.get("created_at") or payload.get("generated_at"))
- if sample_ts is None:
- continue
- if snapshot.get("phase") not in {"developing", "confirmed"}:
- continue
- if str(snapshot.get("meso_bias") or "neutral") != current_direction:
- continue
- if str(snapshot.get("macro_bias") or "mixed") != str(current_breakout.get("macro_bias") or "mixed"):
- continue
- qualifying_samples += 1
- if oldest_match is None:
- oldest_match = sample_ts
- same_direction_seconds = int((current_ts - oldest_match).total_seconds()) if oldest_match else 0
- promoted = current_breakout.get("phase") == "developing" and qualifying_samples >= 2 and same_direction_seconds >= min(window_seconds, 8 * 60)
- return {
- "window_seconds": window_seconds,
- "samples_considered": len(recent_states),
- "qualifying_samples": qualifying_samples,
- "same_direction_seconds": max(0, same_direction_seconds),
- "promoted_to_confirmed": promoted,
- }
- def _grid_breakout_pressure(narrative_payload: dict[str, Any], history_window: dict[str, Any] | None = None) -> dict[str, Any]:
- argus_context = _argus_decision_context(narrative_payload)
- breakout = _local_breakout_snapshot(narrative_payload)
- memory = _breakout_memory(narrative_payload, history_window, breakout)
- phase = str(breakout.get("phase") or "none")
- if memory["promoted_to_confirmed"]:
- phase = "confirmed"
- persistent = phase == "confirmed"
- return {
- "persistent": persistent,
- "phase": phase,
- "score": breakout["score"],
- "micro_impulse": breakout["micro_impulse"],
- "micro_bias": breakout["micro_bias"],
- "meso_structure": breakout["meso_structure"],
- "meso_bias": breakout["meso_bias"],
- "macro_bias": breakout["macro_bias"],
- "alignment": breakout["alignment"],
- "friction": breakout["friction"],
- "time_window_memory": memory,
- "argus_regime": argus_context["regime"],
- "argus_confidence": argus_context["confidence"],
- "argus_compression_active": argus_context["compression_active"],
- }
- def _select_current_primary(strategies: list[dict[str, Any]]) -> dict[str, Any] | None:
- primaries = [s for s in strategies if s["strategy_type"] in {"grid_trader", "trend_follower", "exposure_protector"} and s.get("mode") != "off"]
- if not primaries:
- return None
- active = next((s for s in primaries if s.get("mode") == "active"), None)
- if active:
- return active
- return primaries[0]
- def _inventory_breakout_is_directionally_compatible(inventory_state: str, breakout: dict[str, Any]) -> bool:
- inventory_state = _inventory_state_label(inventory_state)
- macro_bias = str(breakout.get("macro_bias") or "mixed")
- meso_bias = str(breakout.get("meso_bias") or "neutral")
- bullish = macro_bias == "bullish" and meso_bias == "bullish"
- bearish = macro_bias == "bearish" and meso_bias == "bearish"
- if bullish and inventory_state in {"depleted_base_side", "quote_heavy"}:
- return True
- if bearish and inventory_state in {"depleted_quote_side", "base_heavy"}:
- return True
- return False
- def _trend_cooling_edge(narrative_payload: dict[str, Any], wallet_state: dict[str, Any], profile_config: dict[str, Any] | None = None) -> bool:
- profile_config = profile_config if isinstance(profile_config, dict) else {}
- scoped = narrative_payload.get("scoped_state") if isinstance(narrative_payload.get("scoped_state"), dict) else {}
- short_term_dislocated = _short_term_trend_dislocated(narrative_payload)
- micro = scoped.get("micro") if isinstance(scoped.get("micro"), dict) else {}
- meso = scoped.get("meso") if isinstance(scoped.get("meso"), dict) else {}
- micro_impulse = str(micro.get("impulse") or "mixed")
- micro_bias = str(micro.get("trend_bias") or "mixed")
- micro_location = str(micro.get("location") or "unknown")
- micro_reversal_risk = str(micro.get("reversal_risk") or "low")
- meso_bias = str(meso.get("momentum_bias") or "neutral")
- meso_structure = str(meso.get("structure") or "rotation")
- inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
- early_reversal_warning = micro_reversal_risk in {"medium", "high"}
- short_term_warning = short_term_dislocated and meso_structure == "trend_continuation"
- micro_weight = _safe_float(profile_config.get("micro_trend_weight"))
- if micro_weight is None:
- micro_weight = 0.8
- meso_weight = _safe_float(profile_config.get("meso_trend_weight"))
- if meso_weight is None:
- meso_weight = 1.0
- cooling_threshold = _safe_float(profile_config.get("trend_cooling_threshold"))
- if cooling_threshold is None:
- cooling_threshold = 0.45
- bullish_inventory_pressure = inventory_state in {"base_heavy", "critically_unbalanced", "depleted_quote_side"}
- bearish_inventory_pressure = inventory_state in {"quote_heavy", "critically_unbalanced", "depleted_base_side"}
- bullish_cooling_score = 0.0
- if meso_structure == "trend_continuation" and meso_bias == "bullish":
- bullish_cooling_score += 0.15 * meso_weight
- if micro_impulse == "mixed":
- bullish_cooling_score += 0.15 * micro_weight
- if early_reversal_warning:
- bullish_cooling_score += 0.25 * micro_weight
- if short_term_warning:
- bullish_cooling_score += 0.32 * micro_weight
- if micro_bias == "bearish":
- bullish_cooling_score += 0.15 * micro_weight
- if micro_location in {"near_upper_band", "upper_half", "centered"}:
- bullish_cooling_score += 0.15 * micro_weight
- bearish_cooling_score = 0.0
- if meso_structure == "trend_continuation" and meso_bias == "bearish":
- bearish_cooling_score += 0.15 * meso_weight
- if micro_impulse == "mixed":
- bearish_cooling_score += 0.15 * micro_weight
- if early_reversal_warning:
- bearish_cooling_score += 0.25 * micro_weight
- if short_term_warning:
- bearish_cooling_score += 0.32 * micro_weight
- if micro_bias == "bullish":
- bearish_cooling_score += 0.15 * micro_weight
- if micro_location in {"near_lower_band", "lower_half", "centered"}:
- bearish_cooling_score += 0.15 * micro_weight
- bullish_cooling = (
- meso_structure == "trend_continuation"
- and meso_bias == "bullish"
- and (micro_impulse == "mixed" or early_reversal_warning or short_term_warning)
- and micro_bias in {"mixed", "bearish", "bullish"}
- and (short_term_warning or micro_location in {"near_upper_band", "upper_half", "centered"})
- and bullish_cooling_score >= cooling_threshold
- )
- bearish_cooling = (
- meso_structure == "trend_continuation"
- and meso_bias == "bearish"
- and (micro_impulse == "mixed" or early_reversal_warning or short_term_warning)
- and micro_bias in {"mixed", "bullish", "bearish"}
- and (short_term_warning or micro_location in {"near_lower_band", "lower_half", "centered"})
- and bearish_cooling_score >= cooling_threshold
- )
- return bullish_cooling or bearish_cooling
- def _grid_fill_proximity(strategy: dict[str, Any], narrative_payload: dict[str, Any]) -> dict[str, Any]:
- state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {}
- orders = state.get("orders") if isinstance(state.get("orders"), list) else []
- features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {}
- micro_raw = features.get("1m", {}).get("raw", {}) if isinstance(features.get("1m"), dict) else {}
- current_price = _safe_float(micro_raw.get("price") or state.get("last_price") or state.get("center_price"))
- atr_percent = _safe_float(micro_raw.get("atr_percent")) or 0.0
- if not current_price or current_price <= 0:
- return {"near_fill": False}
- sell_prices: list[float] = []
- buy_prices: list[float] = []
- for order in orders:
- if not isinstance(order, dict):
- continue
- if str(order.get("status") or "open").lower() not in {"open", "live", "active"}:
- continue
- price = _safe_float(order.get("price"))
- if price is None or price <= 0:
- continue
- side = str(order.get("side") or "").lower()
- if side == "sell" and price >= current_price:
- sell_prices.append(price)
- elif side == "buy" and price <= current_price:
- buy_prices.append(price)
- next_sell = min(sell_prices) if sell_prices else None
- next_buy = max(buy_prices) if buy_prices else None
- next_sell_distance_pct = (((next_sell - current_price) / current_price) * 100.0) if next_sell else None
- next_buy_distance_pct = (((current_price - next_buy) / current_price) * 100.0) if next_buy else None
- threshold_pct = max(0.25, atr_percent * 1.5)
- near_sell_fill = bool(
- next_sell_distance_pct is not None
- and next_sell_distance_pct >= 0
- and next_sell_distance_pct <= threshold_pct
- and next_buy is not None
- )
- near_buy_fill = bool(
- next_buy_distance_pct is not None
- and next_buy_distance_pct >= 0
- and next_buy_distance_pct <= threshold_pct
- and next_sell is not None
- )
- near_fill_side: str | None = None
- if near_sell_fill and near_buy_fill:
- near_fill_side = "sell" if (next_sell_distance_pct or 0.0) <= (next_buy_distance_pct or 0.0) else "buy"
- elif near_sell_fill:
- near_fill_side = "sell"
- elif near_buy_fill:
- near_fill_side = "buy"
- return {
- "near_fill": bool(near_sell_fill or near_buy_fill),
- "near_fill_side": near_fill_side,
- "near_sell_fill": near_sell_fill,
- "near_buy_fill": near_buy_fill,
- "current_price": current_price,
- "next_sell": next_sell,
- "next_buy": next_buy,
- "next_sell_distance_pct": round(next_sell_distance_pct, 4) if next_sell_distance_pct is not None else None,
- "next_buy_distance_pct": round(next_buy_distance_pct, 4) if next_buy_distance_pct is not None else None,
- "threshold_pct": round(threshold_pct, 4),
- }
- def _grid_fill_fights_breakout(grid_fill: dict[str, Any], breakout: dict[str, Any]) -> bool:
- """Whether a nearby grid fill is trading against the breakout move.
- Current product requirement: grid proximity-to-fills should not block or trigger a handoff.
- We only care about overall regime/tradeoff (fees vs staying), not which side happens to fill.
- """
- return False
- def _recent_1m_price_trace(history_window: dict[str, Any] | None) -> list[tuple[datetime, float]]:
- recent_states = history_window.get("recent_states") if isinstance(history_window, dict) and isinstance(history_window.get("recent_states"), list) else []
- trace: list[tuple[datetime, float]] = []
- for row in recent_states:
- if not isinstance(row, dict):
- continue
- try:
- payload = json.loads(row.get("payload_json") or "{}")
- except Exception:
- continue
- features = payload.get("features_by_timeframe") if isinstance(payload.get("features_by_timeframe"), dict) else {}
- micro = features.get("1m") if isinstance(features.get("1m"), dict) else {}
- raw = micro.get("raw") if isinstance(micro.get("raw"), dict) else {}
- price = _safe_float(raw.get("price"))
- if price is None:
- continue
- timestamp = _parse_timestamp(row.get("created_at") or payload.get("generated_at"))
- if timestamp is None:
- continue
- trace.append((timestamp, price))
- trace.sort(key=lambda item: item[0])
- return trace
- def _breakout_direction(breakout: dict[str, Any], stance: str | None = None) -> str | None:
- meso_bias = str(breakout.get("meso_bias") or "")
- micro_bias = str(breakout.get("micro_bias") or "")
- if meso_bias in {"bullish", "bearish"}:
- return meso_bias
- if micro_bias in {"bullish", "bearish"}:
- return micro_bias
- stance_text = str(stance or "")
- if "bullish" in stance_text:
- return "bullish"
- if "bearish" in stance_text:
- return "bearish"
- return None
- def _narrative_direction(narrative: dict[str, Any]) -> str | None:
- stance = str(narrative.get("stance") or "")
- breakout = narrative.get("grid_breakout_pressure") if isinstance(narrative.get("grid_breakout_pressure"), dict) else {}
- direction = _breakout_direction(breakout, stance)
- if direction:
- return direction
- if stance in {"constructive_bullish", "cautious_bullish", "fragile_bullish"}:
- return "bullish"
- if stance in {"constructive_bearish", "cautious_bearish", "fragile_bearish"}:
- return "bearish"
- return None
- def _direction_label_from_score(score: float, bullish_threshold: float = 0.18) -> str:
- if score >= bullish_threshold:
- return "bullish"
- if score <= -bullish_threshold:
- return "bearish"
- return "mixed"
- def _extract_decision_signals(*,
- narrative_payload: dict[str, Any],
- wallet_state: dict[str, Any],
- grid_strategy: dict[str, Any] | None = None,
- breakout: dict[str, Any] | None = None,
- history_window: dict[str, Any] | None = None,
- decision_profile: dict[str, Any] | None = None,
- ) -> dict[str, Any]:
- scoped = narrative_payload.get("scoped_state") if isinstance(narrative_payload.get("scoped_state"), dict) else {}
- cross = narrative_payload.get("cross_scope_summary") if isinstance(narrative_payload.get("cross_scope_summary"), dict) else {}
- features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {}
- opportunity_map = narrative_payload.get("opportunity_map") if isinstance(narrative_payload.get("opportunity_map"), dict) else {}
- embedded = narrative_payload.get("decision_inputs") if isinstance(narrative_payload.get("decision_inputs"), dict) else {}
- micro = scoped.get("micro") if isinstance(scoped.get("micro"), dict) else {}
- meso = scoped.get("meso") if isinstance(scoped.get("meso"), dict) else {}
- macro = scoped.get("macro") if isinstance(scoped.get("macro"), dict) else {}
- micro_features = features.get("1m") if isinstance(features.get("1m"), dict) else {}
- micro_vol = micro_features.get("volatility") if isinstance(micro_features.get("volatility"), dict) else {}
- micro_raw = micro_features.get("raw") if isinstance(micro_features.get("raw"), dict) else {}
- recent_prices = _recent_1m_price_trace(history_window)
- continuation = float(opportunity_map.get("continuation") or 0.0)
- alignment = str(cross.get("alignment") or "partial_alignment")
- friction = str(cross.get("friction") or "medium")
- micro_impulse = str(micro.get("impulse") or "mixed")
- micro_bias = str(micro.get("trend_bias") or "mixed")
- micro_location = str(micro.get("location") or embedded.get("micro_location") or "unknown")
- micro_reversal_risk = str(micro.get("reversal_risk") or "low")
- meso_structure = str(meso.get("structure") or "rotation")
- meso_bias = str(meso.get("momentum_bias") or "neutral")
- macro_bias = str(macro.get("bias") or "mixed")
- profile_config = _decision_profile_config(decision_profile)
- short_term_trend_min_score = _safe_float(profile_config.get("short_term_trend_min_score"))
- if short_term_trend_min_score is None:
- short_term_trend_min_score = _safe_float(profile_config.get("short_term_confirmation_min"))
- if short_term_trend_min_score is None:
- short_term_trend_min_score = 0.32
- breakout_persistence_min = _safe_float(profile_config.get("breakout_persistence_min"))
- if breakout_persistence_min is None:
- breakout_persistence_min = 0.65
- trend_hold_threshold = _safe_float(profile_config.get("trend_hold_threshold"))
- if trend_hold_threshold is None:
- trend_hold_threshold = 0.56
- grid_release_threshold = _safe_float(profile_config.get("grid_release_threshold"))
- if grid_release_threshold is None:
- grid_release_threshold = 0.35
- structural_direction = str(embedded.get("structural_direction") or "")
- if structural_direction not in {"bullish", "bearish"}:
- structural_direction = meso_bias if meso_bias in {"bullish", "bearish"} else macro_bias if macro_bias in {"bullish", "bearish"} else "mixed"
- structural_strength = _safe_float(embedded.get("structural_trend_strength"))
- if structural_strength is None:
- structural_strength = 0.0
- if meso_structure == "trend_continuation" and meso_bias in {"bullish", "bearish"}:
- structural_strength += 0.45
- elif meso_structure in {"bullish_pullback", "bearish_pullback"} and meso_bias in {"bullish", "bearish"}:
- structural_strength += 0.25
- if macro_bias in {"bullish", "bearish"} and macro_bias == structural_direction:
- structural_strength += 0.25
- if alignment == "micro_meso_macro_aligned":
- structural_strength += 0.2
- elif alignment == "partial_alignment":
- structural_strength += 0.1
- if friction == "high":
- structural_strength -= 0.18
- structural_strength = round(_clamp(structural_strength, 0.0, 1.0), 4)
- tactical_direction = str(embedded.get("tactical_direction") or "")
- if tactical_direction not in {"bullish", "bearish", "mixed"}:
- micro_score = 0.0
- if micro_impulse == "up":
- micro_score += 0.35
- elif micro_impulse == "down":
- micro_score -= 0.35
- if micro_bias == "bullish":
- micro_score += 0.45
- elif micro_bias == "bearish":
- micro_score -= 0.45
- tactical_direction = _direction_label_from_score(micro_score)
- tactical_strength = _safe_float(embedded.get("tactical_trend_strength"))
- if tactical_strength is None:
- tactical_strength = 0.0
- if micro_impulse in {"up", "down"} and micro_bias in {"bullish", "bearish"}:
- tactical_strength += 0.45
- elif micro_impulse in {"up", "down"}:
- tactical_strength += 0.2
- if micro_location in {"near_upper_band", "near_lower_band"}:
- tactical_strength += 0.1
- if micro_reversal_risk == "medium":
- tactical_strength -= 0.12
- elif micro_reversal_risk == "high":
- tactical_strength -= 0.25
- tactical_strength = round(_clamp(tactical_strength, 0.0, 1.0), 4)
- tactical_range_quality = _safe_float(embedded.get("tactical_range_quality"))
- if tactical_range_quality is None:
- tactical_range_quality = 0.0
- if micro_impulse == "mixed":
- tactical_range_quality += 0.35
- if micro_bias == "mixed":
- tactical_range_quality += 0.2
- if micro_location in {"centered", "lower_half", "upper_half"}:
- tactical_range_quality += 0.18
- if friction == "high":
- tactical_range_quality += 0.08
- if micro_reversal_risk == "high":
- tactical_range_quality -= 0.08
- tactical_range_quality = round(_clamp(tactical_range_quality, 0.0, 1.0), 4)
- tactical_easing = bool(embedded.get("tactical_easing"))
- if not tactical_easing:
- tactical_easing = bool(
- meso_structure == "trend_continuation"
- and meso_bias in {"bullish", "bearish"}
- and (
- micro_impulse == "mixed"
- or micro_bias == "mixed"
- or micro_reversal_risk in {"medium", "high"}
- or micro_location == "centered"
- )
- )
- breakout = breakout or {}
- breakout_phase = str(breakout.get("phase") or "none")
- breakout_persistence = 1.0 if bool(breakout.get("persistent")) else 0.65 if breakout_phase == "developing" else 0.35 if breakout_phase == "probing" else 0.0
- grid_step_pct = None
- if grid_strategy:
- state = grid_strategy.get("state") if isinstance(grid_strategy.get("state"), dict) else {}
- config = grid_strategy.get("config") if isinstance(grid_strategy.get("config"), dict) else {}
- grid_step_pct = _safe_float(config.get("grid_step_pct") or state.get("grid_step_pct") or state.get("recenter_pct_live"))
- atr_percent = _safe_float(embedded.get("micro_atr_percent"))
- if atr_percent is None:
- atr_percent = _safe_float(micro_raw.get("atr_percent"))
- band_width_pct = _safe_float(embedded.get("micro_bollinger_width_pct"))
- if band_width_pct is None:
- band_width_pct = _safe_float(micro_vol.get("bollinger_width_pct"))
- noise_pct = max(band_width_pct or 0.0, (atr_percent or 0.0) * 2.0)
- pullback_to_grid_ratio = None
- if grid_step_pct and grid_step_pct > 0:
- pullback_to_grid_ratio = noise_pct / max(grid_step_pct * 100.0, 0.0001)
- recent_move_pct = 0.0
- recent_move_window_minutes = 0
- recent_move_direction = "mixed"
- if recent_prices:
- current_price = _safe_float(micro_raw.get("price")) or recent_prices[-1][1]
- first_price = recent_prices[0][1]
- if first_price > 0:
- recent_move_pct = ((current_price - first_price) / first_price) * 100.0
- recent_move_window_minutes = max(0, int((recent_prices[-1][0] - recent_prices[0][0]).total_seconds() / 60.0))
- if recent_move_pct > 0:
- recent_move_direction = "bullish"
- elif recent_move_pct < 0:
- recent_move_direction = "bearish"
- rapid_directional_pressure = bool(
- recent_move_direction in {"bullish", "bearish"}
- and abs(recent_move_pct) >= max(0.8, (atr_percent or 0.0) * 2.5)
- and recent_move_window_minutes >= 10
- and structural_direction == recent_move_direction
- and tactical_direction == recent_move_direction
- and macro_bias == recent_move_direction
- )
- if breakout and isinstance(breakout, dict):
- rapid_directional_pressure = bool(
- rapid_directional_pressure
- or (
- breakout.get("persistent")
- and str(breakout.get("macro_bias") or "") == recent_move_direction
- and str(breakout.get("meso_bias") or "") == recent_move_direction
- and str(breakout.get("micro_bias") or "") == recent_move_direction
- and abs(recent_move_pct) >= max(0.6, (atr_percent or 0.0) * 1.8)
- )
- )
- rapid_downside_pressure = bool(rapid_directional_pressure and recent_move_direction == "bearish")
- short_term_trend_score = _short_term_trend_manifest_score(narrative_payload, structural_direction)
- harvestability_score = tactical_range_quality * 0.45
- if pullback_to_grid_ratio is not None:
- harvestability_score += min(pullback_to_grid_ratio, 2.0) * 0.22
- elif atr_percent is not None:
- harvestability_score += min((atr_percent or 0.0) / 0.5, 1.0) * 0.18
- if tactical_easing:
- harvestability_score += 0.18
- if micro_location in {"centered", "lower_half", "upper_half"}:
- harvestability_score += 0.08
- if breakout_persistence >= 1.0 and not tactical_easing and tactical_strength >= 0.5:
- harvestability_score -= 0.3
- harvestability_score = round(_clamp(harvestability_score, 0.0, 1.0), 4)
- inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
- within_rebalance_tolerance = _wallet_within_rebalance_tolerance(wallet_state, 0.3)
- if wallet_state.get("grid_ready"):
- wallet_grid_usability = 1.0
- elif within_rebalance_tolerance:
- wallet_grid_usability = 0.78
- elif inventory_state in {"base_heavy", "quote_heavy"}:
- wallet_grid_usability = 0.42
- elif inventory_state in SEVERE_INVENTORY_STATES:
- wallet_grid_usability = 0.12
- else:
- wallet_grid_usability = 0.3
- if scoped:
- trend_hold_strength = (
- structural_strength * 0.34
- + tactical_strength * 0.24
- + breakout_persistence * 0.14
- + min(short_term_trend_score, 1.0) * 0.10
- + continuation * 0.18
- )
- else:
- trend_hold_strength = continuation * 0.9 + breakout_persistence * 0.1
- if tactical_easing:
- trend_hold_strength -= 0.18
- if tactical_direction not in {"mixed", structural_direction}:
- trend_hold_strength -= 0.16
- if short_term_trend_score < short_term_trend_min_score:
- trend_hold_strength -= min(short_term_trend_min_score - short_term_trend_score, 0.25)
- trend_hold_strength = round(_clamp(trend_hold_strength, 0.0, 1.0), 4)
- trend_following_pressure = bool(
- (
- structural_direction in {"bullish", "bearish"}
- and tactical_direction == structural_direction
- and breakout_persistence >= breakout_persistence_min
- and trend_hold_strength >= trend_hold_threshold
- )
- or (
- not scoped
- and continuation >= 0.7
- and not tactical_easing
- )
- )
- grid_harvestable_now = bool(
- harvestability_score >= 0.48
- and wallet_grid_usability >= 0.35
- )
- rebalancer_release_ready = bool(
- within_rebalance_tolerance
- and (
- (
- harvestability_score >= 0.35
- and (tactical_easing or breakout_persistence < 1.0 or tactical_range_quality >= 0.35)
- )
- or (wallet_state.get("grid_ready") and breakout_persistence < 1.0)
- or (tactical_range_quality >= grid_release_threshold and breakout_persistence < 0.75)
- )
- )
- return {
- "structural_direction": structural_direction,
- "structural_trend_strength": structural_strength,
- "tactical_direction": tactical_direction,
- "tactical_trend_strength": tactical_strength,
- "tactical_range_quality": tactical_range_quality,
- "tactical_easing": tactical_easing,
- "breakout_persistence_score": round(breakout_persistence, 4),
- "micro_location": micro_location,
- "micro_atr_percent": atr_percent,
- "micro_bollinger_width_pct": band_width_pct,
- "grid_step_pct": round(grid_step_pct, 6) if grid_step_pct is not None else None,
- "pullback_to_grid_ratio": round(pullback_to_grid_ratio, 4) if pullback_to_grid_ratio is not None else None,
- "grid_harvestability_score": harvestability_score,
- "wallet_grid_usability": round(wallet_grid_usability, 4),
- "within_rebalance_tolerance": within_rebalance_tolerance,
- "rebalance_tolerance": 0.3,
- "trend_following_pressure": trend_following_pressure,
- "trend_hold_strength": trend_hold_strength,
- "trend_hold_threshold": round(trend_hold_threshold, 4),
- "rapid_directional_pressure": rapid_directional_pressure,
- "rapid_downside_pressure": rapid_downside_pressure,
- "recent_move_pct": round(recent_move_pct, 4),
- "recent_move_window_minutes": recent_move_window_minutes,
- "short_term_trend_min_score": round(short_term_trend_min_score, 4),
- "breakout_persistence_min": round(breakout_persistence_min, 4),
- "grid_release_threshold": round(grid_release_threshold, 4),
- "short_term_trend_score": short_term_trend_score,
- "grid_harvestable_now": grid_harvestable_now,
- "rebalancer_release_ready": rebalancer_release_ready,
- }
- def _strategy_trade_side(strategy: dict[str, Any]) -> str:
- config = strategy.get("config") if isinstance(strategy.get("config"), dict) else {}
- state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {}
- side = str(config.get("trade_side") or state.get("trade_side") or strategy.get("trade_side") or "both").strip().lower()
- return side if side in {"buy", "sell", "both"} else "both"
- def _trend_handoff_level_threshold(breakout: dict[str, Any]) -> float:
- memory = breakout.get("time_window_memory") if isinstance(breakout.get("time_window_memory"), dict) else {}
- if bool(memory.get("promoted_to_confirmed")):
- return 2.0
- return 2.75
- def _grid_switch_tradeoff(*,
- current_primary: dict[str, Any],
- wallet_state: dict[str, Any],
- breakout: dict[str, Any],
- grid_fill: dict[str, Any],
- grid_pressure: dict[str, Any],
- directional_micro_clear: bool,
- decision_signals: dict[str, Any],
- trend: dict[str, Any] | None,
- ) -> dict[str, Any]:
- inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
- supervision = current_primary.get("supervision") if isinstance(current_primary.get("supervision"), dict) else {}
- open_order_count = int(current_primary.get("open_order_count") or 0)
- if not open_order_count:
- state = current_primary.get("state") if isinstance(current_primary.get("state"), dict) else {}
- open_order_count = int(state.get("open_order_count") or len(state.get("orders") or []) or 0)
- adverse_side = str(supervision.get("adverse_side") or "unknown")
- adverse_count = int(supervision.get("adverse_side_open_order_count") or 0)
- adverse_notional = float(supervision.get("adverse_side_open_order_notional_quote") or 0.0)
- adverse_distance = _safe_float(supervision.get("adverse_side_nearest_distance_pct"))
- base_order_notional = 1.0
- config = current_primary.get("config") if isinstance(current_primary.get("config"), dict) else {}
- for candidate in (config.get("order_notional_quote"), config.get("max_order_notional_quote")):
- candidate_value = _safe_float(candidate)
- if candidate_value and candidate_value > base_order_notional:
- base_order_notional = candidate_value
- trend_score = float(trend.get("score") or 0.0) if trend else 0.0
- structural_strength = float(decision_signals.get("structural_trend_strength") or 0.0)
- tactical_strength = float(decision_signals.get("tactical_trend_strength") or 0.0)
- harvestability_score = float(decision_signals.get("grid_harvestability_score") or 0.0)
- breakout_score = float(breakout.get("score") or 0.0)
- short_term_trend_score = float(decision_signals.get("short_term_trend_score") or 0.0)
- levels = float(grid_pressure.get("levels") or 0.0)
- near_fill = bool(grid_fill.get("near_fill"))
- fill_fights = _grid_fill_fights_breakout(grid_fill, breakout)
- persistent = bool(breakout.get("persistent"))
- trend_ready = bool(decision_signals.get("trend_following_pressure")) and directional_micro_clear
- stay_cost = 0.0
- switch_benefit = 0.0
- if persistent:
- switch_benefit += 0.28
- if trend_ready:
- switch_benefit += 0.34
- # Requirement: ignore nearby fill timing/side when estimating the stay-vs-switch tradeoff.
- if levels >= _trend_handoff_level_threshold(breakout):
- switch_benefit += 0.18
- switch_benefit += structural_strength * 0.26
- switch_benefit += tactical_strength * 0.16
- switch_benefit += min(trend_score, 2.0) * 0.04
- switch_benefit += min(breakout_score, 5.0) * 0.04
- if short_term_trend_score < 0.68:
- short_term_gap = 0.68 - short_term_trend_score
- switch_benefit -= short_term_gap * 1.15
- stay_cost += short_term_gap * 0.42
- if adverse_side in {"buy", "sell"} and adverse_count > 0:
- adverse_notional_ratio = adverse_notional / max(base_order_notional, 1.0)
- switch_benefit += min(adverse_count, 8) * 0.02
- if adverse_distance is not None and adverse_distance <= 1.25:
- switch_benefit += 0.08
- stay_cost += min(adverse_notional_ratio, 4.0) * 0.07
- else:
- adverse_notional_ratio = 0.0
- if inventory_state == "balanced":
- stay_cost += 0.06
- elif inventory_state in {"base_heavy", "quote_heavy"}:
- stay_cost += 0.16
- elif inventory_state in SEVERE_INVENTORY_STATES:
- stay_cost += 0.28
- else:
- stay_cost += 0.1
- stay_cost += min(levels, 6.0) * 0.06
- stay_cost += min(open_order_count, 8) * 0.025
- # Requirement: ignore nearby fill timing/side when estimating the stay-vs-switch tradeoff.
- if not persistent:
- stay_cost += 0.12
- if adverse_notional_ratio >= 1.0:
- stay_cost += 0.08
- stay_cost += harvestability_score * 0.18
- margin = round(switch_benefit - stay_cost, 4)
- should_switch = persistent and trend_ready and margin > 0.0
- return {
- "trend_score": round(trend_score, 4),
- "structural_trend_strength": round(structural_strength, 4),
- "tactical_trend_strength": round(tactical_strength, 4),
- "grid_harvestability_score": round(harvestability_score, 4),
- "short_term_trend_score": round(short_term_trend_score, 4),
- "breakout_score": round(breakout_score, 4),
- "switch_benefit": round(switch_benefit, 4),
- "stay_cost": round(stay_cost, 4),
- "margin": margin,
- "should_switch": should_switch,
- "trend_ready": trend_ready,
- "persistent": persistent,
- "levels": round(levels, 4),
- "open_order_count": open_order_count,
- "near_fill": near_fill,
- "fill_fights": fill_fights,
- "adverse_side": adverse_side,
- "adverse_side_open_order_count": adverse_count,
- "adverse_side_open_order_notional_quote": round(adverse_notional, 4),
- "adverse_side_nearest_distance_pct": round(adverse_distance, 4) if adverse_distance is not None else None,
- "inventory_state": inventory_state,
- }
- def _grid_trend_pressure(strategy: dict[str, Any], narrative_payload: dict[str, Any]) -> dict[str, Any]:
- state = strategy.get("state") if isinstance(strategy.get("state"), dict) else {}
- config = strategy.get("config") if isinstance(strategy.get("config"), dict) else {}
- features = narrative_payload.get("features_by_timeframe") if isinstance(narrative_payload.get("features_by_timeframe"), dict) else {}
- micro_raw = features.get("1m", {}).get("raw", {}) if isinstance(features.get("1m"), dict) else {}
- current_price = _safe_float(micro_raw.get("price") or state.get("last_price") or state.get("center_price"))
- center_price = _safe_float(state.get("center_price") or state.get("last_price"))
- step_pct = _safe_float(config.get("grid_step_pct") or state.get("grid_step_pct") or state.get("recenter_pct_live")) or 0.0
- if not current_price or not center_price or current_price <= 0 or center_price <= 0 or step_pct <= 0:
- return {"levels": 0.0, "rounded_levels": 0, "direction": "unknown", "current_price": current_price, "center_price": center_price, "step_pct": step_pct}
- distance_pct = abs(current_price - center_price) / center_price
- levels = distance_pct / step_pct
- direction = "bullish" if current_price > center_price else "bearish" if current_price < center_price else "flat"
- return {
- "levels": round(levels, 4),
- "rounded_levels": int(levels),
- "direction": direction,
- "current_price": current_price,
- "center_price": center_price,
- "step_pct": step_pct,
- "distance_pct": round(distance_pct, 4),
- }
- def _grid_can_still_work(strategy: dict[str, Any], wallet_state: dict[str, Any], grid_fill: dict[str, Any]) -> bool:
- supervision = strategy.get("supervision") if isinstance(strategy.get("supervision"), dict) else {}
- side_capacity = supervision.get("side_capacity") if isinstance(supervision.get("side_capacity"), dict) else {}
- buy_capacity = bool(side_capacity.get("buy", False))
- sell_capacity = bool(side_capacity.get("sell", False))
- open_order_count = int(strategy.get("open_order_count") or 0)
- degraded = bool(supervision.get("degraded"))
- inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
- if degraded:
- return False
- if buy_capacity or sell_capacity:
- return True
- if open_order_count > 0:
- return True
- if grid_fill.get("near_fill"):
- return True
- return inventory_state not in SEVERE_INVENTORY_STATES
- def _grid_is_truly_stuck_for_recovery(strategy: dict[str, Any], wallet_state: dict[str, Any], grid_fill: dict[str, Any]) -> bool:
- if _grid_can_still_work(strategy, wallet_state, grid_fill):
- return False
- inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
- return wallet_state.get("rebalance_needed") and inventory_state in SEVERE_INVENTORY_STATES
- def _wallet_within_rebalance_tolerance(wallet_state: dict[str, Any], tolerance: float = 0.3) -> bool:
- imbalance = _safe_float(wallet_state.get("imbalance_score"))
- if imbalance is None:
- base_ratio = _safe_float(wallet_state.get("base_ratio"))
- if base_ratio is not None:
- imbalance = abs(base_ratio - 0.5)
- if imbalance is None:
- return str(wallet_state.get("inventory_state") or "").lower() == "balanced"
- return imbalance <= tolerance
- def _decide_for_grid(*,
- current_primary: dict[str, Any],
- stance: str,
- inventory_state: str,
- wallet_state: dict[str, Any],
- breakout: dict[str, Any],
- grid_fill: dict[str, Any],
- grid_pressure: dict[str, Any],
- directional_micro_clear: bool,
- severe_imbalance: bool,
- decision_signals: dict[str, Any],
- trend: dict[str, Any] | None,
- rebalance: dict[str, Any] | None,
- ) -> tuple[str, str, str | None, list[str], list[str]]:
- action = "keep_grid"
- mode = "observe"
- target_strategy = current_primary["id"]
- reasons: list[str] = []
- blocks: list[str] = []
- inventory_state = _inventory_state_label(inventory_state)
- # Grid is the base mode. Leave it only for a persistent breakout or when
- # the grid has genuinely lost its ability to recover on its own.
- grid_friendly_stance = stance in {"neutral_rotational", "breakout_watch", "cautious_bullish", "cautious_bearish", "fragile_bullish", "fragile_bearish"}
- grid_can_work = _grid_can_still_work(current_primary, wallet_state, grid_fill)
- grid_stuck_for_recovery = _grid_is_truly_stuck_for_recovery(current_primary, wallet_state, grid_fill)
- persistent_breakout = bool(breakout["persistent"])
- breakout_phase = str(breakout.get("phase") or "none")
- breakout_direction = _breakout_direction(breakout, stance)
- trend_handoff_ready = bool(
- trend
- and bool(decision_signals.get("trend_following_pressure"))
- and grid_pressure.get("levels", 0.0) >= _trend_handoff_level_threshold(breakout)
- )
- fill_fights_breakout = _grid_fill_fights_breakout(grid_fill, breakout)
- switch_tradeoff = _grid_switch_tradeoff(
- current_primary=current_primary,
- wallet_state=wallet_state,
- breakout=breakout,
- grid_fill=grid_fill,
- grid_pressure=grid_pressure,
- directional_micro_clear=directional_micro_clear,
- decision_signals=decision_signals,
- trend=trend,
- )
- rapid_directional = bool(decision_signals.get("rapid_directional_pressure"))
- directional_pressure = breakout_direction if breakout_direction in {"bullish", "bearish"} else "mixed"
- all_scopes_aligned = (
- directional_pressure in {"bullish", "bearish"}
- and str(decision_signals.get("structural_direction") or "") == directional_pressure
- and str(decision_signals.get("tactical_direction") or "") == directional_pressure
- and str(grid_pressure.get("direction") or "") == directional_pressure
- )
- repair_inventory_match = bool(
- (directional_pressure == "bullish" and inventory_state in {"quote_heavy", "critically_unbalanced"})
- or (directional_pressure == "bearish" and inventory_state in {"base_heavy", "critically_unbalanced"})
- )
- urgent_rebalance_exit = bool(
- rebalance
- and wallet_state.get("rebalance_needed")
- and rapid_directional
- and all_scopes_aligned
- and repair_inventory_match
- )
- if urgent_rebalance_exit:
- action = "replace_with_exposure_protector"
- target_strategy = rebalance["strategy_id"]
- mode = "act"
- reasons.append("wallet is skewed and the directional move is accelerating, so exposure repair should happen before the trend handoff")
- reasons.append(
- f"recent 1m history moved {decision_signals.get('recent_move_pct', 0.0):.2f}% over about {decision_signals.get('recent_move_window_minutes', 0)} minutes"
- )
- return action, mode, target_strategy, reasons, blocks
- urgent_trend_exit = bool(
- trend
- and persistent_breakout
- and bool(decision_signals.get("trend_following_pressure"))
- and all_scopes_aligned
- and (
- rapid_directional
- or grid_fill.get("near_fill")
- or inventory_state in SEVERE_INVENTORY_STATES
- )
- )
- if urgent_trend_exit:
- action = "replace_with_trend_follower"
- target_strategy = trend["strategy_id"] if trend else target_strategy
- mode = "act"
- reasons.append("all scopes line up and the tape is moving fast, so grid should yield early")
- if rapid_directional:
- reasons.append(
- f"recent 1m history moved {decision_signals.get('recent_move_pct', 0.0):.2f}% over about {decision_signals.get('recent_move_window_minutes', 0)} minutes"
- )
- if grid_pressure.get("levels", 0.0) < _trend_handoff_level_threshold(breakout):
- reasons.append("handoff is happening early, before the normal level threshold, because directional acceleration is sharp")
- if grid_fill.get("near_fill"):
- reasons.append("grid fill pressure is already near the market")
- return action, mode, target_strategy, reasons, blocks
- if severe_imbalance and persistent_breakout:
- reasons.append("grid imbalance now coincides with persistent breakout pressure")
- directional_inventory = _inventory_breakout_is_directionally_compatible(inventory_state, breakout)
- if switch_tradeoff["should_switch"] and trend_handoff_ready and (
- not wallet_state.get("rebalance_needed")
- or directional_inventory
- or not rebalance
- or trend["score"] >= rebalance["score"]
- ):
- action = "replace_with_trend_follower"
- target_strategy = trend["strategy_id"]
- mode = "act"
- if switch_tradeoff.get("adverse_side_open_order_count", 0) > 0:
- reasons.append(
- f"{switch_tradeoff.get('adverse_side')} ladder is exposed near market"
- )
- if directional_inventory:
- reasons.append("inventory posture can be absorbed by the directional handoff")
- reasons.append(
- f"switch benefit ({switch_tradeoff['switch_benefit']:.2f}) exceeds stay cost ({switch_tradeoff['stay_cost']:.2f})"
- )
- elif wallet_state.get("rebalance_needed") and rebalance and rebalance["score"] > 0.35:
- action = "replace_with_exposure_protector"
- target_strategy = rebalance["strategy_id"]
- mode = "act"
- else:
- action = "suspend_grid"
- mode = "warn"
- elif severe_imbalance and grid_stuck_for_recovery and not persistent_breakout and rebalance and rebalance["score"] > 0.6:
- action = "replace_with_exposure_protector"
- target_strategy = rebalance["strategy_id"]
- mode = "act"
- reasons.append("grid has lost practical recovery capacity, so inventory repair should take over")
- elif persistent_breakout and trend_handoff_ready and inventory_state in {"balanced", "base_heavy", "quote_heavy"}:
- if not switch_tradeoff["should_switch"]:
- reasons.append(
- f"breakout is persistent, but staying in grid still looks cheaper than switching (benefit {switch_tradeoff['switch_benefit']:.2f} vs cost {switch_tradeoff['stay_cost']:.2f})"
- )
- if switch_tradeoff.get("adverse_side_open_order_count", 0) > 0:
- reasons.append(
- f"{switch_tradeoff.get('adverse_side')} ladder exposure is not yet costly enough to justify the handoff"
- )
- if grid_fill.get("near_fill") and fill_fights_breakout:
- reasons.append("nearby opposing fill is only a warning here, not enough on its own to justify the handoff")
- else:
- action = "replace_with_trend_follower"
- target_strategy = trend["strategy_id"] if trend else target_strategy
- mode = "act"
- if grid_fill.get("near_fill") and fill_fights_breakout:
- reasons.append("confirmed trend should not be delayed by a nearby grid fill that trades against the move")
- elif grid_fill.get("near_fill"):
- reasons.append("confirmed directional pressure is strong enough that nearby grid fills should not delay the trend handoff")
- else:
- reasons.append("grid should yield because directional pressure is confirmed and the trend handoff is ready")
- elif not persistent_breakout and grid_can_work:
- if breakout_phase == "developing":
- reasons.append("breakout pressure is developing, but grid can still work and should not be abandoned yet")
- else:
- reasons.append("grid can still operate and self-heal, so inventory skew alone should not force a rebalance handoff")
- if decision_signals.get("grid_harvestable_now"):
- reasons.append("tactical range quality still looks harvestable for the grid")
- elif persistent_breakout and grid_fill.get("near_fill") and inventory_state in {"balanced", "base_heavy", "quote_heavy"}:
- reasons.append("grid is still close to a working fill, but trend handoff is not ready enough yet")
- elif not grid_friendly_stance and persistent_breakout:
- reasons.append("grid should yield because directional pressure is persistent across scopes")
- if trend_handoff_ready:
- action = "replace_with_trend_follower"
- target_strategy = trend["strategy_id"]
- mode = "act"
- else:
- mode = "warn"
- if grid_pressure.get("levels", 0.0) < _trend_handoff_level_threshold(breakout):
- blocks.append("grid has not yet been eaten by enough levels to justify leaving it")
- else:
- blocks.append("directional pressure is rising but the micro layer is not clear enough for a trend handoff")
- else:
- reasons.append("grid can likely self-heal because breakout pressure is not yet persistent")
- return action, mode, target_strategy, reasons, blocks
- def _decide_for_trend(*,
- current_primary: dict[str, Any],
- stance: str,
- narrative_payload: dict[str, Any],
- wallet_state: dict[str, Any],
- grid: dict[str, Any] | None,
- rebalance: dict[str, Any] | None = None,
- profile_config: dict[str, Any] | None = None,
- decision_signals: dict[str, Any] | None = None,
- ) -> tuple[str, str, str | None, list[str], list[str]]:
- action = "keep_trend"
- mode = "observe"
- target_strategy = current_primary["id"]
- reasons: list[str] = []
- blocks: list[str] = []
- decision_signals = decision_signals if isinstance(decision_signals, dict) else {}
- trend_pressure = bool(decision_signals.get("trend_following_pressure"))
- trend_hold_strength = float(decision_signals.get("trend_hold_strength") or 0.0)
- trend_hold_threshold = float(decision_signals.get("trend_hold_threshold") or 0.56)
- grid_harvestable_now = bool(decision_signals.get("grid_harvestable_now"))
- # Trend should cool into rebalancing first when the wallet is skewed, then
- # let rebalancer hand back to grid once the inventory is healthy again.
- cooling = _trend_cooling_edge(narrative_payload, wallet_state, profile_config)
- if cooling:
- if wallet_state.get("rebalance_needed") and rebalance:
- action = "replace_with_exposure_protector"
- target_strategy = rebalance["strategy_id"]
- mode = "act"
- reasons.append("trend has cooled enough that directional mode no longer justifies staying active")
- elif grid and wallet_state.get("grid_ready"):
- action = "replace_with_grid"
- target_strategy = grid["strategy_id"]
- mode = "act"
- reasons.append("trend has cooled and the tape looks suitable for grid again")
- else:
- mode = "warn"
- blocks.append("trend is easing, but neither grid nor rebalancer is ready for a clean handoff")
- elif not trend_pressure:
- if grid and wallet_state.get("grid_ready") and grid_harvestable_now:
- action = "replace_with_grid"
- target_strategy = grid["strategy_id"]
- mode = "act"
- reasons.append(f"trend hold strength {trend_hold_strength:.2f} fell below threshold {trend_hold_threshold:.2f}, so grid can resume")
- elif wallet_state.get("rebalance_needed") and rebalance:
- action = "replace_with_exposure_protector"
- target_strategy = rebalance["strategy_id"]
- mode = "act"
- reasons.append(f"trend hold strength {trend_hold_strength:.2f} fell below threshold {trend_hold_threshold:.2f}, so directional mode should yield")
- else:
- action = "hold_trend"
- mode = "warn"
- blocks.append(f"trend hold strength {trend_hold_strength:.2f} is below threshold {trend_hold_threshold:.2f}, but no clean handoff is available yet")
- elif stance == "neutral_rotational":
- if wallet_state.get("rebalance_needed") and rebalance:
- action = "replace_with_exposure_protector"
- target_strategy = rebalance["strategy_id"]
- mode = "act"
- reasons.append("trend conditions have cooled and rebalancing should repair the wallet before grid resumes")
- elif grid and wallet_state.get("grid_ready"):
- action = "replace_with_grid"
- target_strategy = grid["strategy_id"]
- mode = "act"
- reasons.append("trend conditions have cooled and wallet is grid-ready again")
- elif wallet_state.get("rebalance_needed"):
- mode = "warn"
- blocks.append("trend has cooled but rebalancing should be the next hop")
- else:
- action = "hold_trend"
- blocks.append("grid candidate not strong enough yet")
- else:
- reasons.append(f"trend hold strength {trend_hold_strength:.2f} still clears threshold {trend_hold_threshold:.2f}")
- return action, mode, target_strategy, reasons, blocks
- def _decide_for_rebalancer(*,
- current_primary: dict[str, Any],
- stance: str,
- wallet_state: dict[str, Any],
- grid: dict[str, Any] | None,
- decision_signals: dict[str, Any],
- trend: dict[str, Any] | None = None,
- decision_profile: dict[str, Any] | None = None,
- ) -> tuple[str, str, str | None, list[str], list[str]]:
- action = "keep_rebalancer"
- mode = "observe"
- target_strategy = current_primary["id"]
- reasons: list[str] = []
- blocks: list[str] = []
- # Rebalancing is a repair phase. Once the wallet is usable again, Hermes
- # should prefer handing back to grid, not directly to trend.
- trend_strength = float(trend["score"]) if trend and isinstance(trend.get("score"), (int, float)) else 0.0
- within_tolerance = bool(decision_signals.get("within_rebalance_tolerance"))
- release_ready = bool(decision_signals.get("rebalancer_release_ready"))
- trend_pressure = bool(decision_signals.get("trend_following_pressure"))
- grid_harvestable_now = bool(decision_signals.get("grid_harvestable_now"))
- profile_config = _decision_profile_config(decision_profile)
- force_grid_when_balanced = bool(profile_config.get("force_grid_when_balanced", True))
- hold_rebalancer_until_cooldown = bool(profile_config.get("hold_rebalancer_until_cooldown", False))
- if wallet_state.get("grid_ready") and grid and force_grid_when_balanced and not hold_rebalancer_until_cooldown:
- action = "replace_with_grid"
- target_strategy = grid["strategy_id"]
- mode = "act"
- reasons.append("wallet is rebalanced, so grid should resume first and let the tape prove itself again")
- elif trend_pressure and not release_ready:
- blocks.append("trend is still strong enough that rebalancer should keep repairing instead of resetting to grid")
- elif release_ready:
- if grid:
- action = "replace_with_grid"
- target_strategy = grid["strategy_id"]
- mode = "act"
- reasons.append("wallet is usable enough and micro conditions are easing, so grid can resume harvesting")
- else:
- blocks.append("wallet is within the rebalance tolerance but no grid candidate is available")
- elif within_tolerance and not grid_harvestable_now:
- blocks.append("wallet is close enough, but the local tape is still not harvestable enough for grid release")
- elif wallet_state.get("grid_ready") and stance == "neutral_rotational":
- if grid and grid["score"] >= 0.5:
- action = "replace_with_grid"
- target_strategy = grid["strategy_id"]
- mode = "act"
- reasons.append("rebalance is complete and rotational conditions support grid again")
- else:
- blocks.append("wallet is ready but grid fit is still too weak")
- elif grid and grid_harvestable_now:
- action = "replace_with_grid"
- target_strategy = grid["strategy_id"]
- mode = "act"
- reasons.append("local price action looks harvestable enough that grid can resume before perfect balance")
- else:
- blocks.append("trend candidate is not strong enough yet and grid fit is not ready, so rebalancer should not hand directly back to trend")
- return action, mode, target_strategy, reasons, blocks
- def make_decision(*, concern: dict[str, Any], narrative_payload: dict[str, Any], wallet_state: dict[str, Any], strategies: list[dict[str, Any]], history_window: dict[str, Any] | None = None, decision_profile: dict[str, Any] | None = None) -> DecisionSnapshot:
- concern_account_id = str(concern.get("account_id") or "")
- concern_market_symbol = str(concern.get("market_symbol") or "").strip().lower()
- normalized = [
- normalize_strategy_snapshot(s)
- for s in strategies
- if str(s.get("account_id") or "") == concern_account_id
- and (
- not concern_market_symbol
- or not str(s.get("market_symbol") or "").strip()
- or str(s.get("market_symbol") or "").strip().lower() == concern_market_symbol
- )
- ]
- breakout = _grid_breakout_pressure(narrative_payload, history_window=history_window)
- narrative_for_scoring = {**narrative_payload, "grid_breakout_pressure": breakout}
- fit_reports = [score_strategy_fit(strategy=s, narrative=narrative_for_scoring, wallet_state=wallet_state) for s in normalized]
- ranked = sorted(fit_reports, key=lambda item: item["score"], reverse=True)
- current_primary = _select_current_primary(normalized)
- best = ranked[0] if ranked else None
- stance = str(narrative_payload.get("stance") or "neutral_rotational")
- inventory_state = _inventory_state_label(wallet_state.get("inventory_state"))
- scoped = narrative_payload.get("scoped_state") if isinstance(narrative_payload.get("scoped_state"), dict) else {}
- micro = scoped.get("micro") if isinstance(scoped.get("micro"), dict) else {}
- micro_impulse = str(micro.get("impulse") or "mixed")
- micro_bias = str(micro.get("trend_bias") or "mixed")
- micro_reversal_risk = str(micro.get("reversal_risk") or "low")
- bullish_micro_clear = micro_impulse == "up" and micro_bias == "bullish" and micro_reversal_risk != "high"
- bearish_micro_clear = micro_impulse == "down" and micro_bias == "bearish" and micro_reversal_risk != "high"
- breakout_direction = _breakout_direction(breakout, stance)
- directional_micro_clear = bullish_micro_clear if breakout_direction == "bullish" else bearish_micro_clear if breakout_direction == "bearish" else False
- grid_fill = _grid_fill_proximity(current_primary, narrative_payload) if current_primary and current_primary["strategy_type"] == "grid_trader" else {"near_fill": False}
- grid_pressure = _grid_trend_pressure(current_primary, narrative_payload) if current_primary and current_primary["strategy_type"] == "grid_trader" else {"levels": 0.0, "rounded_levels": 0, "direction": "unknown"}
- severe_imbalance = inventory_state in SEVERE_INVENTORY_STATES
- action = "hold"
- mode = "observe"
- target_strategy = current_primary.get("id") if current_primary else (best.get("strategy_id") if best else None)
- reasons: list[str] = []
- blocks: list[str] = []
- trend = next((r for r in ranked if r["strategy_type"] == "trend_follower"), None)
- rebalance = next((r for r in ranked if r["strategy_type"] == "exposure_protector"), None)
- grid = next((r for r in ranked if r["strategy_type"] == "grid_trader"), None)
- grid_strategy = next((s for s in normalized if s["strategy_type"] == "grid_trader"), None)
- decision_signals = _extract_decision_signals(
- narrative_payload=narrative_payload,
- wallet_state=wallet_state,
- grid_strategy=grid_strategy,
- breakout=breakout,
- history_window=history_window,
- decision_profile=decision_profile,
- )
- switch_tradeoff: dict[str, Any] = {}
- if current_primary and current_primary["strategy_type"] == "grid_trader":
- action, mode, target_strategy, reasons, blocks = _decide_for_grid(
- current_primary=current_primary,
- stance=stance,
- inventory_state=inventory_state,
- wallet_state=wallet_state,
- breakout=breakout,
- grid_fill=grid_fill,
- grid_pressure=grid_pressure,
- directional_micro_clear=directional_micro_clear,
- severe_imbalance=severe_imbalance,
- decision_signals=decision_signals,
- trend=trend,
- rebalance=rebalance,
- )
- switch_tradeoff = _grid_switch_tradeoff(
- current_primary=current_primary,
- wallet_state=wallet_state,
- breakout=breakout,
- grid_fill=grid_fill,
- grid_pressure=grid_pressure,
- directional_micro_clear=directional_micro_clear,
- decision_signals=decision_signals,
- trend=trend,
- )
- elif current_primary and current_primary["strategy_type"] == "trend_follower":
- action, mode, target_strategy, reasons, blocks = _decide_for_trend(
- current_primary=current_primary,
- stance=stance,
- narrative_payload=narrative_payload,
- wallet_state=wallet_state,
- grid=grid,
- rebalance=rebalance,
- profile_config=_decision_profile_config(decision_profile),
- decision_signals=decision_signals,
- )
- elif current_primary and current_primary["strategy_type"] == "exposure_protector":
- action, mode, target_strategy, reasons, blocks = _decide_for_rebalancer(
- current_primary=current_primary,
- stance=stance,
- wallet_state=wallet_state,
- grid=grid,
- decision_signals=decision_signals,
- trend=trend,
- decision_profile=decision_profile,
- )
- else:
- if best and best["score"] >= 0.55:
- action = f"enable_{best['strategy_type']}"
- target_strategy = best["strategy_id"]
- mode = "act"
- reasons.extend(best["reasons"])
- else:
- action = "wait"
- mode = "observe"
- blocks.append("no strategy is yet a strong enough fit")
- profile_config = _decision_profile_config(decision_profile)
- switch_cost_penalty = _safe_float(profile_config.get("switch_cost_penalty"))
- if switch_cost_penalty is None:
- switch_cost_penalty = 1.0
- action_cooldown_seconds = int(_safe_float(profile_config.get("action_cooldown_seconds")) or 0)
- current_score = float(next((r["score"] for r in ranked if current_primary and r["strategy_id"] == current_primary.get("id")), 0.0))
- target_score = float(next((r["score"] for r in ranked if r["strategy_id"] == target_strategy), current_score))
- switch_edge = round(target_score - current_score, 4)
- required_switch_edge = round(max(switch_cost_penalty - 1.0, 0.0) * 0.08, 4)
- cooldown_active, cooldown_remaining, cooldown_action = _recent_switch_cooldown_active(history_window, str(concern.get("id") or ""), action_cooldown_seconds)
- if mode == "act" and current_primary and target_strategy and target_strategy != current_primary.get("id"):
- if required_switch_edge > 0 and switch_edge < required_switch_edge:
- mode = "observe"
- action = f"keep_{current_primary['strategy_type'].replace('_trader', '').replace('_follower', '').replace('exposure_protector', 'rebalancer')}"
- target_strategy = current_primary.get("id")
- reasons = []
- blocks = [f"switch edge {switch_edge:.2f} is below required friction {required_switch_edge:.2f}"]
- elif cooldown_active:
- mode = "observe"
- action = f"keep_{current_primary['strategy_type'].replace('_trader', '').replace('_follower', '').replace('exposure_protector', 'rebalancer')}"
- target_strategy = current_primary.get("id")
- reasons = []
- blocks = [f"switch cooldown active for {cooldown_remaining:.0f}s after {cooldown_action or 'recent switch'}"]
- reason_summary = reasons[0] if reasons else (blocks[0] if blocks else "strategy posture unchanged")
- confidence = float(narrative_payload.get("confidence") or 0.4)
- if action.startswith("replace_with") or action.startswith("enable_"):
- confidence += 0.08
- if wallet_state.get("rebalance_needed") and "grid" in action:
- confidence -= 0.08
- confidence = round(_clamp(confidence, 0.2, 0.95), 3)
- payload = {
- "generated_at": datetime.now(timezone.utc).isoformat(),
- "wallet_state": wallet_state,
- "narrative_stance": stance,
- "strategy_fit_ranking": ranked,
- "current_primary_strategy": current_primary.get("id") if current_primary else None,
- "argus_decision_context": _argus_decision_context(narrative_payload),
- "history_window": history_window or {},
- "grid_breakout_pressure": breakout,
- "grid_fill_context": grid_fill,
- "grid_switch_tradeoff": switch_tradeoff if current_primary and current_primary["strategy_type"] == "grid_trader" else {},
- "decision_audit": decision_signals,
- "switch_friction": {
- "switch_cost_penalty": round(switch_cost_penalty, 4),
- "switch_edge": switch_edge,
- "required_switch_edge": required_switch_edge,
- "action_cooldown_seconds": action_cooldown_seconds,
- "cooldown_active": cooldown_active,
- "cooldown_remaining_seconds": cooldown_remaining,
- },
- "decision_profile": {
- "id": decision_profile.get("id") if isinstance(decision_profile, dict) else None,
- "name": decision_profile.get("name") if isinstance(decision_profile, dict) else None,
- "status": decision_profile.get("status") if isinstance(decision_profile, dict) else None,
- "config": _decision_profile_config(decision_profile),
- } if decision_profile else None,
- "reason_chain": reasons,
- "blocks": blocks,
- "decision_version": 3,
- }
- return DecisionSnapshot(
- mode=mode,
- action=action,
- target_strategy=target_strategy,
- reason_summary=reason_summary,
- confidence=confidence,
- requires_action=mode == "act",
- payload=payload,
- )
|