from __future__ import annotations from contextlib import asynccontextmanager import asyncio import json import time from datetime import datetime, timezone from uuid import uuid4 import anyio from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from mcp.server.fastmcp import FastMCP from mcp.server.transport_security import TransportSecuritySettings from mcp import ClientSession from mcp.client.sse import sse_client from .config import load_config from .argus_client import get_regime as argus_get_regime, get_snapshot as argus_get_snapshot from .crypto_client import get_price, get_regime from .decision_engine import assess_wallet_state from .decision_families import make_family_decision from .narrative_engine import build_narrative from .replay import build_replay_input from .state_engine import synthesize_state from .store import delete_concern, get_decision_profile, get_state, init_db, list_concerns, list_strategy_assignments, list_strategy_groups, latest_cycle, latest_cycles, latest_decisions, latest_narratives, latest_observations, latest_regime_samples, prune_older_than, recent_regime_samples, recent_states_for_concern, sync_concerns_from_strategies, upsert_concern, upsert_cycle, upsert_decision, upsert_decision_profile, upsert_narrative, upsert_observation, upsert_regime_sample, upsert_state, latest_states, upsert_strategy_assignment, upsert_strategy_group from .trader_client import apply_control_decision as trader_apply_control_decision, cancel_all_orders as trader_cancel_all_orders, get_strategy as trader_get_strategy, list_strategies mcp = FastMCP( "hermes-mcp", transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False), ) def _build_trader_control_payload(*, decision_id: str, concern: dict, decision: object) -> dict | None: action = str(getattr(decision, "action", "") or "").strip() target_strategy = str(getattr(decision, "target_strategy", "") or "").strip() or None decision_payload = getattr(decision, "payload", {}) if isinstance(getattr(decision, "payload", {}), dict) else {} current_primary = str(decision_payload.get("current_primary_strategy") or "").strip() or None trader_action: str | None = None risk_mode: str | None = None if action.startswith("replace_with_") or action.startswith("enable_"): trader_action = "switch" elif action == "suspend_grid": trader_action = "pause" target_strategy = current_primary elif action == "set_risk_mode": trader_action = "set_risk_mode" risk_mode = str(decision_payload.get("risk_mode") or "").strip() or None else: return None account_id = str(concern.get("account_id") or "").strip() market_symbol = str(concern.get("market_symbol") or "").strip().lower() concern_id = str(concern.get("id") or "").strip() or None reason = str(getattr(decision, "reason_summary", "") or "").strip() confidence = float(getattr(decision, "confidence", 0.0) or 0.0) payload = { "decision_id": decision_id, "concern_id": concern_id, "account_id": account_id, "market_symbol": market_symbol, "action": trader_action, "target_strategy_id": target_strategy, "expected_active_strategy_id": current_primary, "risk_mode": risk_mode, "reason": reason, "confidence": confidence, "dry_run": False, "override": False, "source": "hermes-mcp", "source_action": action, } return payload async def _maybe_dispatch_trader_action(*, cfg: object, decision_id: str, concern: dict, decision: object, trader_available: bool = True, retry_after_seconds: int | None = None) -> dict: if str(concern.get("status") or "active").strip().lower() != "active": return { "dispatch": "blocked", "reason": "concern is inactive", } if not bool(getattr(decision, "requires_action", False)): return {"dispatch": "not_required"} payload = _build_trader_control_payload(decision_id=decision_id, concern=concern, decision=decision) if payload is None: return { "dispatch": "skipped", "reason": f"no trader action mapping for {getattr(decision, 'action', 'unknown')}", } if not bool(getattr(cfg, "hermes_allow_actions", False)): return { "dispatch": "blocked", "reason": "HERMES_ALLOW_ACTIONS is false", "payload": payload, } if not trader_available: return { "dispatch": "deferred", "reason": "trader unavailable", "retry_after_seconds": retry_after_seconds, "payload": payload, } try: result = await trader_apply_control_decision(getattr(cfg, "trader_url"), payload) return { "dispatch": "sent", "payload": payload, "result": result, } except Exception as exc: return { "dispatch": "failed", "payload": payload, "error": str(exc), } @mcp.tool(description="Return Hermes current state, narrative, uncertainty, and a short self-assessment report.") def report() -> dict: state = get_state() cfg = load_config() concerns = list_concerns() groups_by_concern: dict[str, list[dict[str, Any]]] = {} for group in list_strategy_groups(): groups_by_concern.setdefault(str(group.get("concern_id") or ""), []).append(group) try: accounts_by_id, markets_by_symbol, total_values = anyio.run(_load_exec_enrichment, cfg.exec_url, cfg.crypto_url, concerns) except Exception: accounts_by_id, markets_by_symbol, total_values = {}, {}, {} concern_summaries = [] for concern in concerns: concern_id = str(concern.get("id") or "") account_id = str(concern.get("account_id") or "").strip() market_symbol = str(concern.get("market_symbol") or "").strip().lower() account_info = accounts_by_id.get(account_id, {}) market_info = markets_by_symbol.get(market_symbol, {}) groups = groups_by_concern.get(concern_id, []) active_playbook = next((g for g in groups if str(g.get("status") or "").lower() == "active"), None) assignments = list_strategy_assignments(strategy_group_id=str(active_playbook.get("id") or "")) if active_playbook else [] concern_summaries.append({ "concern_id": concern_id, "account_id": account_id or None, "account": account_info.get("display_name") or account_id or None, "market_symbol": str(concern.get("market_symbol") or "") or None, "market": market_info.get("name") or str(concern.get("market_symbol") or "") or None, "status": str(concern.get("status") or "active"), "active_playbook": { "id": str(active_playbook.get("id") or "") or None, "name": str(active_playbook.get("name") or "") or None, "family": str(active_playbook.get("strategy_family") or "") or None, } if active_playbook else None, "active_strategies": [ { "strategy_id": str(a.get("strategy_id") or "") or None, "role": str(a.get("role") or "member") or "member", "strategy_type": str(a.get("strategy_type") or "") or None, } for a in assignments ], "balances": _compact_balances(account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or []), "total_value_usd": total_values.get(account_id) if total_values.get(account_id) is not None else account_info.get("total_value_usd"), }) return { "status": state.get("status", "stub"), "thinking": state.get("thinking", "Hermes scaffold is ready."), "confidence": state.get("confidence", 0.0), "uncertainty": state.get("uncertainty", ["no live adapters wired yet"]), "layers": state.get("layers", []), "concerns": concern_summaries, } @asynccontextmanager async def lifespan(_: FastAPI): cfg = load_config() init_db() trader_gate = {"failures": 0, "down_until": 0.0, "last_error": "", "last_ok": 0.0} cached_strategy_inventory: list[dict] = [] def _trader_available() -> bool: return time.monotonic() >= float(trader_gate["down_until"] or 0.0) def _mark_trader_success() -> None: trader_gate["failures"] = 0 trader_gate["down_until"] = 0.0 trader_gate["last_error"] = "" trader_gate["last_ok"] = time.monotonic() def _mark_trader_failure(error: Exception) -> None: failures = int(trader_gate["failures"] or 0) + 1 trader_gate["failures"] = failures trader_gate["last_error"] = str(error) backoff = min(300, max(5, 5 * (2 ** min(failures - 1, 5)))) trader_gate["down_until"] = time.monotonic() + backoff try: sync_concerns_from_strategies(await list_strategies(cfg.trader_url)) except Exception: pass try: prune_older_than(cfg.retention_days) except Exception: pass async def _poll_loop() -> None: nonlocal cached_strategy_inventory while True: started = datetime.now(timezone.utc).isoformat() cycle_id = str(uuid4()) concerns = list_concerns() profile_ids = sorted({str(c.get("decision_profile_id") or "").strip() for c in concerns if str(c.get("decision_profile_id") or "").strip()}) decision_profiles = {} for profile_id in profile_ids: profile = get_decision_profile(profile_id=profile_id) if not profile: continue try: profile_config = json.loads(profile.get("config_json") or "{}") except Exception: profile_config = {} if isinstance(profile_config, dict): decision_profiles[profile_id] = {**profile, "config": profile_config} playbook_groups = list_strategy_groups() playbook_assignments = { str(group.get("id") or ""): list_strategy_assignments(strategy_group_id=str(group.get("id") or "")) for group in playbook_groups } strategy_inventory = cached_strategy_inventory if _trader_available(): try: strategy_inventory = await list_strategies(cfg.trader_url) enriched_inventory = [] for strategy in strategy_inventory: instance_id = str(strategy.get("id") or "").strip() if not instance_id: enriched_inventory.append(strategy) continue try: detail = await trader_get_strategy(cfg.trader_url, instance_id, include_state=True, include_report=True) enriched_inventory.append({**strategy, **detail}) except Exception: enriched_inventory.append(strategy) strategy_inventory = enriched_inventory cached_strategy_inventory = strategy_inventory _mark_trader_success() except Exception as exc: _mark_trader_failure(exc) strategy_inventory = cached_strategy_inventory try: sync_concerns_from_strategies(strategy_inventory) except Exception: pass upsert_cycle(id=cycle_id, started_at=started, finished_at=None, status="running", trigger="interval", notes=f"polling {len(concerns)} concerns") argus_snapshot: dict = {} argus_regime: dict = {} try: argus_snapshot = await argus_get_snapshot(cfg.argus_url) except Exception: argus_snapshot = {} try: argus_regime = await argus_get_regime(cfg.argus_url) except Exception: argus_regime = {} if argus_snapshot or argus_regime: upsert_observation( id=f"{cycle_id}:argus", cycle_id=cycle_id, concern_id=None, source="argus-mcp", kind="macro_snapshot", payload_json=json.dumps({"snapshot": argus_snapshot, "regime": argus_regime}, ensure_ascii=False), observed_at=datetime.now(timezone.utc).isoformat(), ) for concern in concerns: symbol = _resolve_regime_symbol(concern) if not symbol: continue concern_id = str(concern.get("id") or "") account_id = str(concern.get("account_id") or "").strip() account_info = {} if account_id: try: payload = await _call_exec_tool(cfg.exec_url, "get_account_info", {"account_id": account_id}) account_info = payload if isinstance(payload, dict) else {} except Exception: account_info = {} current_regimes: list[dict] = [] for timeframe in cfg.crypto_timeframes: regime = await get_regime(cfg.crypto_url, str(symbol), timeframe) current_regimes.append({**regime, "timeframe": timeframe}) upsert_regime_sample( id=f"{cycle_id}:{concern['id']}:{timeframe}", cycle_id=cycle_id, concern_id=str(concern["id"]), timeframe=timeframe, regime_json=json.dumps(regime, ensure_ascii=False), captured_at=datetime.now(timezone.utc).isoformat(), ) try: state = synthesize_state( concern=concern, regimes=current_regimes, account_info=account_info, argus_snapshot=argus_snapshot, argus_regime=argus_regime, ) upsert_state( id=f"{cycle_id}:{concern['id']}", cycle_id=cycle_id, concern_id=str(concern["id"]), market_regime=state.market_regime, volatility_state=state.volatility_state, liquidity_state=state.liquidity_state, sentiment_pressure=state.sentiment_pressure, event_risk=state.event_risk, execution_quality=state.execution_quality, confidence=state.confidence, payload_json=json.dumps(state.payload, ensure_ascii=False), created_at=state.payload.get("generated_at"), ) narrative = build_narrative(concern=concern, state_payload=state.payload) upsert_narrative( id=f"{cycle_id}:{concern['id']}", cycle_id=cycle_id, concern_id=str(concern["id"]), summary=narrative.summary, key_drivers_json=json.dumps(narrative.key_drivers, ensure_ascii=False), risk_flags_json=json.dumps(narrative.risk_flags, ensure_ascii=False), uncertainties_json=json.dumps(narrative.uncertainties, ensure_ascii=False), confidence=narrative.confidence, created_at=narrative.payload.get("generated_at"), ) latest_price = None if current_regimes: latest_price = next((r.get("price") for r in reversed(current_regimes) if r.get("price") is not None), None) wallet_state = assess_wallet_state( account_info=account_info, concern=concern, price=float(latest_price) if latest_price is not None else None, strategies=strategy_inventory, ) active_playbook = next((g for g in playbook_groups if str(g.get("concern_id") or "") == concern_id and str(g.get("status") or "").lower() == "active"), None) assignment_by_strategy_id = { str(a.get("strategy_id") or "").strip(): a for a in playbook_assignments.get(str(active_playbook.get("id") or ""), []) if str(a.get("strategy_id") or "").strip() } if active_playbook else {} assigned_strategy_ids = { str(a.get("strategy_id") or "").strip() for a in playbook_assignments.get(str(active_playbook.get("id") or ""), []) if str(a.get("strategy_id") or "").strip() } if active_playbook else set() candidate_strategies = [ { **s, "playbook_role": str(assignment_by_strategy_id.get(str(s.get("id") or "").strip(), {}).get("role") or "").strip() or None, "playbook_assignment_id": str(assignment_by_strategy_id.get(str(s.get("id") or "").strip(), {}).get("id") or "").strip() or None, } for s in strategy_inventory if str(s.get("account_id") or "").strip() == account_id and str(s.get("market_symbol") or "").strip().lower() == str(concern.get("market_symbol") or "").strip().lower() and (not assigned_strategy_ids or str(s.get("id") or "").strip() in assigned_strategy_ids) ] breakout_window_seconds = max(300, int(getattr(cfg, "breakout_memory_window_seconds", 900) or 900)) recent_state_rows = recent_states_for_concern(concern_id=str(concern["id"]), since_seconds=breakout_window_seconds, limit=12) decision = make_family_decision( family=str(active_playbook.get("strategy_family") or "grid-trend-rebalancer") if active_playbook else "grid-trend-rebalancer", concern=concern, narrative_payload={ **state.payload, **narrative.payload, "confidence": narrative.confidence, }, wallet_state=wallet_state, strategies=candidate_strategies, history_window={ "window_seconds": breakout_window_seconds, "recent_states": recent_state_rows, }, decision_profile=decision_profiles.get(str(concern.get("decision_profile_id") or "").strip()), ) decision_id = f"{cycle_id}:{concern['id']}" dispatch_record = await _maybe_dispatch_trader_action( cfg=cfg, decision_id=decision_id, concern=concern, decision=decision, trader_available=_trader_available(), retry_after_seconds=max(0, int(trader_gate["down_until"] - time.monotonic())) if not _trader_available() else None, ) decision_payload = { **decision.payload, "replay_input": build_replay_input( concern=concern, narrative_payload={ **state.payload, **narrative.payload, "confidence": narrative.confidence, }, wallet_state=wallet_state, strategies=candidate_strategies, history_window={ "window_seconds": breakout_window_seconds, "recent_states": recent_state_rows, }, ), "dispatch": dispatch_record, "decision_family": str(active_playbook.get("strategy_family") or "grid-trend-rebalancer") if active_playbook else "grid-trend-rebalancer", "active_playbook_id": str(active_playbook.get("id") or "") if active_playbook else None, "candidate_strategy_ids": sorted(assigned_strategy_ids) if assigned_strategy_ids else [str(s.get("id") or "") for s in candidate_strategies if str(s.get("id") or "")], } upsert_decision( id=decision_id, cycle_id=cycle_id, concern_id=str(concern["id"]), mode=decision.mode, action=decision.action, target_strategy=decision.target_strategy, target_policy_json=json.dumps(decision_payload, ensure_ascii=False), reason_summary=decision.reason_summary, confidence=decision.confidence, requires_action=decision.requires_action, created_at=decision.payload.get("generated_at"), ) except Exception: pass upsert_cycle(id=cycle_id, started_at=started, finished_at=datetime.now(timezone.utc).isoformat(), status="ok", trigger="interval", notes=f"polled {len(concerns)} concerns over {','.join(cfg.crypto_timeframes)}") await asyncio.sleep(max(10, cfg.cycle_seconds)) asyncio.create_task(_poll_loop()) yield app = FastAPI(title="Hermes MCP", lifespan=lifespan) app.mount("/mcp", mcp.sse_app()) @app.get("/") def root() -> dict: return {"status": "ok", "mount": "/mcp/sse", "dashboard": "/dashboard"} @app.get("/health") def health() -> dict: return {"status": "ok", "db": "sqlite", "tool": "report"} @app.delete("/concerns/{concern_id}") def remove_concern(concern_id: str) -> JSONResponse: deleted = delete_concern(concern_id=concern_id) if not deleted.get("concerns"): return JSONResponse({"ok": False, "error": "concern not found", "deleted": deleted}, status_code=404) return JSONResponse({"ok": True, "deleted": deleted}) def _strip_sse(url: str) -> str: root = url.rstrip("/") return root[:-8] if root.endswith("/mcp/sse") else root async def _call_exec_tool(exec_url: str, tool: str, arguments: dict) -> object: async with sse_client(exec_url) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() result = await session.call_tool(tool, arguments) content = getattr(result, "content", None) or [] if not content: return None first = content[0] text = getattr(first, "text", None) if not isinstance(first, dict) else first.get("text") if text is None: return None try: return json.loads(text) except Exception: return text async def _load_exec_enrichment(exec_url: str, crypto_url: str, concerns: list[dict]) -> tuple[dict[str, dict], dict[str, dict], dict[str, float | None]]: account_ids = sorted({str(c.get("account_id") or "").strip() for c in concerns if str(c.get("account_id") or "").strip()}) market_symbols = sorted({str(c.get("market_symbol") or "").strip().lower() for c in concerns if str(c.get("market_symbol") or "").strip()}) account_payloads = await asyncio.gather(*[_call_exec_tool(exec_url, "get_account_info", {"account_id": account_id}) for account_id in account_ids]) market_payload = await _call_exec_tool(exec_url, "list_markets", {}) accounts_by_id = {account_id: payload for account_id, payload in zip(account_ids, account_payloads) if isinstance(payload, dict)} total_values: dict[str, float | None] = {} for account_id, payload in accounts_by_id.items(): total_values[account_id] = await _live_total_value(crypto_url, payload) markets_by_symbol: dict[str, dict] = {} if isinstance(market_payload, list): for market in market_payload: if not isinstance(market, dict): continue symbol = str(market.get("symbol") or market.get("market_symbol") or "").strip().lower() if symbol in market_symbols or symbol: markets_by_symbol[symbol] = market return accounts_by_id, markets_by_symbol, total_values async def _live_total_value(crypto_url: str, account_info: dict) -> float | None: balances = account_info.get("balances") if not isinstance(balances, list): return None total = 0.0 seen = False for item in balances: if not isinstance(item, dict): continue asset = str(item.get("asset_code") or item.get("asset") or "").strip().lower() amount = item.get("total") if not asset or amount is None: continue try: amount_f = float(amount) except Exception: continue seen = True if asset == "usd": total += amount_f continue price_payload = await get_price(crypto_url, asset) try: price = float(price_payload.get("price")) except Exception: price = 0.0 total += amount_f * price return total if seen else None def _compact_balances(payload: object) -> str: if not isinstance(payload, list): return "-" parts: list[str] = [] for item in payload: if not isinstance(item, dict): continue asset = str(item.get("asset_code") or item.get("asset") or "").upper().strip() total = item.get("total") available = item.get("available") value_usd = item.get("value_usd") if not asset: continue segment = f"{asset} {float(total):.8g}" if total is not None else asset if available is not None and total is not None and available != total: segment += f" (avail {float(available):.8g})" if isinstance(value_usd, (int, float)): segment += f" ≈ ${float(value_usd):,.2f}" parts.append(segment) return " | ".join(parts[:5]) or "-" def _resolve_regime_symbol(concern: dict) -> str | None: base = str(concern.get("base_currency") or "").strip().upper() if base: return base market = str(concern.get("market_symbol") or "").strip().upper().replace("/", "").replace("-", "") for suffix in ("USDT", "USDC", "USD", "EUR", "BTC", "ETH"): if market.endswith(suffix) and len(market) > len(suffix): return market[:-len(suffix)] return market or None def _default_playbook_name(strategies: list[dict]) -> str: types = {str(s.get("strategy_type") or "").strip() for s in strategies} if {"grid_trader", "trend_follower", "exposure_protector"}.issubset(types): return "grid-trend-rebalancer" if types == {"trend_follower"} or ("trend_follower" in types and "grid_trader" not in types and "exposure_protector" not in types): return "trend-only" labels = sorted(t.replace("_", "-") for t in types if t) return "+".join(labels) if labels else "playbook" def _default_playbook_family(strategies: list[dict]) -> str: types = {str(s.get("strategy_type") or "").strip() for s in strategies} if {"grid_trader", "trend_follower", "exposure_protector"}.issubset(types): return "grid-trend-rebalancer" if "trend_follower" in types and "grid_trader" not in types and "exposure_protector" not in types: return "trend-only" return "mixed" def _default_profile_config(family: str | None = None) -> dict[str, object]: normalized = str(family or "").strip().lower() if normalized in {"trend-only", "trend_only", "trend"}: return { "estimated_turn_cost_pct": 0.7, "micro_trend_weight": 0.8, "meso_trend_weight": 1.0, "macro_trend_weight": 0.7, "persistence_bonus_weight": 0.45, "argus_compression_penalty": 0.18, "activation_edge_threshold": 1.15, "flip_edge_threshold": 1.35, "flip_confirmation_gap": 0.25, } return { "breakout_persistence_min": 0.65, "short_term_confirmation_min": 0.32, "switch_cost_penalty": 1.0, "rebalance_imbalance_threshold": 0.30, "force_grid_when_balanced": True, "grid_release_threshold": 0.35, "trend_cooling_threshold": 0.45, "trend_inventory_stress_threshold": 0.55, "action_cooldown_seconds": 600, } def _profile_allowed_keys(family: str | None = None) -> set[str]: return set(_default_profile_config(family).keys()) def _normalize_profile_config(config: dict[str, object] | None, family: str | None = None) -> dict[str, object]: defaults = _default_profile_config(family) allowed = _profile_allowed_keys(family) current = config if isinstance(config, dict) else {} return {**defaults, **{k: v for k, v in current.items() if k in allowed}} def _ensure_profile_for_family(*, profile_id: str, family: str | None, name: str, description: str | None = None, status: str = "active") -> dict[str, Any]: family_label = str(family or "").strip() or "playbook" profile = get_decision_profile(profile_id=profile_id) config: dict[str, object] = {} if profile: try: raw = json.loads(profile.get("config_json") or "{}") except Exception: raw = {} config = _normalize_profile_config(raw if isinstance(raw, dict) else {}, family) current_name = str(profile.get("name") or "").strip() generic_names = {"grid-trend-rebalancer profile", "trend-only profile", "playbook profile"} profile_name = name if not current_name or current_name in generic_names else current_name upsert_decision_profile( id=profile_id, name=profile_name, description=str(profile.get("description") or description or "").strip() or None, config=config, status=str(profile.get("status") or status or "active"), ) return {**profile, "name": profile_name, "config": config} config = _default_profile_config(family) upsert_decision_profile( id=profile_id, name=name or f"{family_label} profile", description=description, config=config, status=status, ) created = get_decision_profile(profile_id=profile_id) or {"id": profile_id, "name": name, "description": description, "status": status} return {**created, "config": config} def _strategy_display_label(strategy: dict) -> str: for key in ("label", "display_name", "name", "title"): value = str(strategy.get(key) or "").strip() if value: return value strategy_type = str(strategy.get("strategy_type") or "strategy").strip().replace("_", " ") instance_id = str(strategy.get("id") or "").strip() return f"{strategy_type} ({instance_id[:8]})" if instance_id else strategy_type @app.get("/dashboard/data") def dashboard_data() -> JSONResponse: cfg = load_config() concerns = list_concerns() accounts_by_id: dict[str, dict] = {} markets_by_symbol: dict[str, dict] = {} strategy_inventory: list[dict] = [] strategy_inventory_available = True try: accounts_by_id, markets_by_symbol, total_values = anyio.run(_load_exec_enrichment, cfg.exec_url, cfg.crypto_url, concerns) except Exception: total_values = {} pass try: strategy_inventory = anyio.run(list_strategies, cfg.trader_url) except Exception: strategy_inventory = [] strategy_inventory_available = False live_scopes = { (str(strategy.get("account_id") or "").strip(), str(strategy.get("market_symbol") or "").strip().lower()) for strategy in strategy_inventory if str(strategy.get("account_id") or "").strip() and str(strategy.get("market_symbol") or "").strip() } enriched = [] concern_lookup: dict[str, dict] = {} for concern in concerns: account_id = str(concern.get("account_id") or "").strip() market_symbol = str(concern.get("market_symbol") or "").strip().lower() account_info = accounts_by_id.get(account_id, {}) market_info = markets_by_symbol.get(market_symbol, {}) enriched.append({ **concern, "account_display": account_info.get("display_name") or account_id, "balances": account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or [], "balance_summary": _compact_balances(account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or []), "total_value_usd": total_values.get(account_id) if total_values.get(account_id) is not None else account_info.get("total_value_usd"), "market_display": market_info.get("name") or concern.get("market_symbol") or "", "market_description": market_info.get("description") or "", "orphaned": strategy_inventory_available and (account_id, market_symbol) not in live_scopes, }) concern_lookup[str(concern.get("id") or "")] = enriched[-1] regimes = [] histories_by_key: dict[str, list[dict]] = {} for sample in recent_regime_samples(1000): concern_id = str(sample.get("concern_id") or "") timeframe = str(sample.get("timeframe") or "") key = f"{concern_id}::{timeframe}" bucket = histories_by_key.setdefault(key, []) if len(bucket) < 24: bucket.append(sample) for sample in latest_regime_samples(20): concern_meta = concern_lookup.get(str(sample.get("concern_id") or ""), {}) regimes.append({**sample, **{ "account_display": concern_meta.get("account_display"), "market_display": concern_meta.get("market_display"), "market_symbol": concern_meta.get("market_symbol"), }}) return JSONResponse({ "latest_cycle": latest_cycle(), "cycles": latest_cycles(10), "argus_observations": latest_observations(20, source="argus-mcp"), "concerns": enriched, "regime_samples": regimes, "regime_histories": histories_by_key, "state_samples": latest_states(20), "state_history": latest_states(100), "narrative_samples": latest_narratives(20), "decision_samples": latest_decisions(20), "decision_history": latest_decisions(100), }) @app.get("/dashboard/concerns/{concern_id}/data") def dashboard_concern_detail_data(concern_id: str) -> JSONResponse: cfg = load_config() concern_id = str(concern_id or "").strip() concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None) if not concern: return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404) account_id = str(concern.get("account_id") or "").strip() market_symbol = str(concern.get("market_symbol") or "").strip().lower() concerns = [concern] try: accounts_by_id, markets_by_symbol, total_values = anyio.run(_load_exec_enrichment, cfg.exec_url, cfg.crypto_url, concerns) except Exception: accounts_by_id, markets_by_symbol, total_values = {}, {}, {} account_info = accounts_by_id.get(account_id, {}) market_info = markets_by_symbol.get(market_symbol, {}) enriched_concern = { **concern, "account_display": account_info.get("display_name") or account_id, "balances": account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or [], "balance_summary": _compact_balances(account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or []), "total_value_usd": total_values.get(account_id) if total_values.get(account_id) is not None else account_info.get("total_value_usd"), "market_display": market_info.get("name") or concern.get("market_symbol") or "", "market_description": market_info.get("description") or "", } try: strategy_inventory = anyio.run(list_strategies, cfg.trader_url) except Exception: strategy_inventory = [] concern_strategies = [ s for s in strategy_inventory if str(s.get("account_id") or "").strip() == account_id and str(s.get("market_symbol") or "").strip().lower() == market_symbol ] strategies_by_id = {str(s.get("id") or "").strip(): s for s in concern_strategies if str(s.get("id") or "").strip()} profile_id = str(concern.get("decision_profile_id") or "").strip() existing_groups = list_strategy_groups(concern_id=concern_id) if not existing_groups and concern_strategies: seeded_group_id = f"playbook:{concern_id}:default" seeded_family = _default_playbook_family(concern_strategies) seeded_profile_id = profile_id or f"profile:{concern_id}:default" if not profile_id: upsert_decision_profile( id=seeded_profile_id, name=f"{_default_playbook_name(concern_strategies)} profile", description="Auto-seeded default profile for this concern.", config=_default_profile_config(seeded_family), status="active", ) upsert_concern( id=str(concern.get("id") or ""), account_id=account_id or None, market_symbol=market_symbol or None, base_currency=str(concern.get("base_currency") or "").strip() or None, quote_currency=str(concern.get("quote_currency") or "").strip() or None, strategy_id=str(concern.get("strategy_id") or "").strip() or None, decision_profile_id=seeded_profile_id, source=str(concern.get("source") or "dashboard"), status=str(concern.get("status") or "active"), notes=str(concern.get("notes") or "").strip() or None, ) concern = {**concern, "decision_profile_id": seeded_profile_id} profile_id = seeded_profile_id upsert_strategy_group( id=seeded_group_id, concern_id=concern_id, name=_default_playbook_name(concern_strategies), strategy_family=seeded_family, decision_profile_id=profile_id or None, notes="auto-seeded from trader strategies", status="active", ) for strategy in concern_strategies: strategy_id = str(strategy.get("id") or "").strip() if not strategy_id: continue upsert_strategy_assignment( id=f"assign:{seeded_group_id}:{strategy_id}", strategy_group_id=seeded_group_id, strategy_id=strategy_id, strategy_type=str(strategy.get("strategy_type") or "").strip() or None, role="member", status="active", notes="auto-seeded from trader inventory", ) existing_groups = list_strategy_groups(concern_id=concern_id) playbooks = [] active_playbook_profile_id = None for group in existing_groups: assignments = list_strategy_assignments(strategy_group_id=str(group.get("id") or "")) if str(group.get("strategy_family") or "").strip().lower() == "mixed" and assignments: assigned_strategies = [ strategies_by_id.get(str(a.get("strategy_id") or "").strip(), {"strategy_type": a.get("strategy_type")}) for a in assignments ] inferred_family = _default_playbook_family(assigned_strategies) if inferred_family != "mixed": upsert_strategy_group( id=str(group.get("id") or ""), concern_id=concern_id, name=str(group.get("name") or group.get("id") or "playbook"), strategy_family=inferred_family, decision_profile_id=str(group.get("decision_profile_id") or "").strip() or None, notes=str(group.get("notes") or "").strip() or None, status=str(group.get("status") or "active"), ) group = {**group, "strategy_family": inferred_family} group_profile_id = str(group.get("decision_profile_id") or "").strip() if not group_profile_id: group_profile_id = f"profile:{concern_id}:{str(group.get('id') or '').strip() or 'default'}" _ensure_profile_for_family( profile_id=group_profile_id, family=str(group.get("strategy_family") or ""), name=f"{str(group.get('name') or group.get('id') or 'playbook')} profile", description="Auto-created for this playbook.", status="active", ) upsert_strategy_group( id=str(group.get("id") or ""), concern_id=concern_id, name=str(group.get("name") or group.get("id") or "playbook"), strategy_family=str(group.get("strategy_family") or "").strip() or None, decision_profile_id=group_profile_id, notes=str(group.get("notes") or "").strip() or None, status=str(group.get("status") or "active"), ) group = {**group, "decision_profile_id": group_profile_id} else: _ensure_profile_for_family( profile_id=group_profile_id, family=str(group.get("strategy_family") or ""), name=f"{str(group.get('name') or group.get('id') or 'playbook')} profile", description="Auto-created for this playbook.", status="active", ) if str(group.get("status") or "").lower() == "active" and str(group.get("decision_profile_id") or "").strip(): active_playbook_profile_id = str(group.get("decision_profile_id") or "").strip() enriched_assignments = [] for assignment in assignments: strategy = strategies_by_id.get(str(assignment.get("strategy_id") or "").strip(), {}) enriched_assignments.append({ **assignment, "strategy_label": _strategy_display_label(strategy) if strategy else str(assignment.get("strategy_id") or "").strip(), }) playbooks.append({**group, "assignments": enriched_assignments}) concern_strategies = [{**s, "display_label": _strategy_display_label(s)} for s in concern_strategies] if active_playbook_profile_id and profile_id != active_playbook_profile_id: upsert_concern( id=str(concern.get("id") or ""), account_id=account_id or None, market_symbol=market_symbol or None, base_currency=str(concern.get("base_currency") or "").strip() or None, quote_currency=str(concern.get("quote_currency") or "").strip() or None, strategy_id=str(concern.get("strategy_id") or "").strip() or None, decision_profile_id=active_playbook_profile_id, source=str(concern.get("source") or "dashboard"), status=str(concern.get("status") or "active"), notes=str(concern.get("notes") or "").strip() or None, ) concern = {**concern, "decision_profile_id": active_playbook_profile_id} profile_id = active_playbook_profile_id active_family = next((str(p.get("strategy_family") or "") for p in playbooks if str(p.get("status") or "").lower() == "active"), "") decision_profile = ( _ensure_profile_for_family( profile_id=profile_id, family=active_family, name=f"{str((next((p for p in playbooks if str(p.get('status') or '').lower() == 'active'), {}) or {}).get('name') or 'playbook')} profile", description="Auto-created for this playbook.", status="active", ) if profile_id else None ) latest_state = next((s for s in latest_states(200) if str(s.get("concern_id") or "") == concern_id), None) latest_narrative = next((n for n in latest_narratives(200) if str(n.get("concern_id") or "") == concern_id), None) latest_decision = next((d for d in latest_decisions(200) if str(d.get("concern_id") or "") == concern_id), None) latest_regimes = [s for s in recent_regime_samples(500) if str(s.get("concern_id") or "") == concern_id][:24] return JSONResponse({ "ok": True, "concern": enriched_concern, "decision_profile": decision_profile, "playbooks": playbooks, "strategies": concern_strategies, "latest_state": latest_state, "latest_narrative": latest_narrative, "latest_decision": latest_decision, "latest_regimes": latest_regimes, }) @app.post("/dashboard/concerns/{concern_id}/playbooks/{playbook_id}/activate") def dashboard_activate_playbook(concern_id: str, playbook_id: str) -> JSONResponse: concern_id = str(concern_id or "").strip() playbook_id = str(playbook_id or "").strip() concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None) if not concern: return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404) groups = list_strategy_groups(concern_id=concern_id) target = next((g for g in groups if str(g.get("id") or "") == playbook_id), None) if not target: return JSONResponse({"ok": False, "error": "playbook not found"}, status_code=404) target_profile_id = str(target.get("decision_profile_id") or "").strip() or f"profile:{concern_id}:{playbook_id}" _ensure_profile_for_family( profile_id=target_profile_id, family=str(target.get("strategy_family") or ""), name=f"{str(target.get('name') or playbook_id)} profile", description="Auto-created for this playbook.", status="active", ) if str(target.get("decision_profile_id") or "").strip() != target_profile_id: upsert_strategy_group( id=str(target.get("id") or ""), concern_id=concern_id, name=str(target.get("name") or target.get("id") or "playbook"), strategy_family=str(target.get("strategy_family") or "").strip() or None, decision_profile_id=target_profile_id, notes=str(target.get("notes") or "").strip() or None, status=str(target.get("status") or "active"), ) target = {**target, "decision_profile_id": target_profile_id} for group in groups: upsert_strategy_group( id=str(group.get("id") or ""), concern_id=concern_id, name=str(group.get("name") or group.get("id") or "playbook"), strategy_family=str(group.get("strategy_family") or "").strip() or None, decision_profile_id=(target_profile_id if str(group.get("id") or "") == playbook_id else str(group.get("decision_profile_id") or "").strip() or None), notes=str(group.get("notes") or "").strip() or None, status="active" if str(group.get("id") or "") == playbook_id else "standby", ) upsert_concern( id=str(concern.get("id") or ""), account_id=str(concern.get("account_id") or "").strip() or None, market_symbol=str(concern.get("market_symbol") or "").strip() or None, base_currency=str(concern.get("base_currency") or "").strip() or None, quote_currency=str(concern.get("quote_currency") or "").strip() or None, strategy_id=str(concern.get("strategy_id") or "").strip() or None, decision_profile_id=target_profile_id, source=str(concern.get("source") or "dashboard"), status=str(concern.get("status") or "active"), notes=str(concern.get("notes") or "").strip() or None, ) return JSONResponse({"ok": True, "activated_playbook_id": playbook_id}) @app.post("/dashboard/concerns/{concern_id}/playbooks/{playbook_id}/tuning") async def dashboard_update_playbook_tuning(concern_id: str, playbook_id: str, request: Request) -> JSONResponse: concern_id = str(concern_id or "").strip() playbook_id = str(playbook_id or "").strip() concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None) if not concern: return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404) groups = list_strategy_groups(concern_id=concern_id) target = next((g for g in groups if str(g.get("id") or "") == playbook_id), None) if not target: return JSONResponse({"ok": False, "error": "playbook not found"}, status_code=404) profile_id = str(target.get("decision_profile_id") or "").strip() or f"profile:{concern_id}:{playbook_id}" if not str(target.get("decision_profile_id") or "").strip(): _ensure_profile_for_family( profile_id=profile_id, family=str(target.get("strategy_family") or ""), name=f"{str(target.get('name') or playbook_id)} profile", description="Auto-created while saving tuning from the dashboard.", status="active", ) upsert_strategy_group( id=str(target.get("id") or ""), concern_id=concern_id, name=str(target.get("name") or playbook_id), strategy_family=str(target.get("strategy_family") or "").strip() or None, decision_profile_id=profile_id, notes=str(target.get("notes") or "").strip() or None, status=str(target.get("status") or "active"), ) _ensure_profile_for_family( profile_id=profile_id, family=str(target.get("strategy_family") or ""), name=f"{str(target.get('name') or playbook_id)} profile", description="Auto-created while saving tuning from the dashboard.", status="active", ) if str(target.get("status") or "").lower() == "active" and str(concern.get("decision_profile_id") or "").strip() != profile_id: upsert_concern( id=str(concern.get("id") or ""), account_id=str(concern.get("account_id") or "").strip() or None, market_symbol=str(concern.get("market_symbol") or "").strip() or None, base_currency=str(concern.get("base_currency") or "").strip() or None, quote_currency=str(concern.get("quote_currency") or "").strip() or None, strategy_id=str(concern.get("strategy_id") or "").strip() or None, decision_profile_id=profile_id, source=str(concern.get("source") or "dashboard"), status=str(concern.get("status") or "active"), notes=str(concern.get("notes") or "").strip() or None, ) payload = await request.json() updates = payload if isinstance(payload, dict) else {} profile = get_decision_profile(profile_id=profile_id) if not profile: return JSONResponse({"ok": False, "error": "decision profile not found"}, status_code=404) try: current_config = json.loads(profile.get("config_json") or "{}") except Exception: current_config = {} if not isinstance(current_config, dict): current_config = {} allowed_keys = { "breakout_persistence_min", "short_term_confirmation_min", "switch_cost_penalty", "rebalance_imbalance_threshold", "force_grid_when_balanced", "grid_release_threshold", "trend_cooling_threshold", "trend_inventory_stress_threshold", "action_cooldown_seconds", "estimated_turn_cost_pct", "micro_trend_weight", "meso_trend_weight", "macro_trend_weight", "persistence_bonus_weight", "argus_compression_penalty", "activation_edge_threshold", "flip_edge_threshold", "flip_confirmation_gap", } merged = _normalize_profile_config(current_config, str(target.get("strategy_family") or "")) for key, value in updates.items(): if key not in allowed_keys: continue if key == "force_grid_when_balanced": merged[key] = bool(value) continue try: merged[key] = float(value) if key != "action_cooldown_seconds" else int(float(value)) except Exception: continue upsert_decision_profile( id=profile_id, name=str(profile.get("name") or profile_id), description=str(profile.get("description") or "").strip() or None, config=merged, status=str(profile.get("status") or "active"), ) return JSONResponse({"ok": True, "profile_id": profile_id, "config": merged}) @app.get("/dashboard/playbooks/data") def dashboard_playbooks_data() -> JSONResponse: concerns = {str(c.get("id") or ""): c for c in list_concerns()} groups = list_strategy_groups() out = [] for group in groups: concern = concerns.get(str(group.get("concern_id") or ""), {}) assignments = list_strategy_assignments(strategy_group_id=str(group.get("id") or "")) out.append({ **group, "concern": concern, "assignment_count": len(assignments), }) return JSONResponse({"ok": True, "playbooks": out}) @app.get("/dashboard/playbooks/{playbook_id}/data") def dashboard_playbook_detail_data(playbook_id: str) -> JSONResponse: playbook_id = str(playbook_id or "").strip() group = next((g for g in list_strategy_groups() if str(g.get("id") or "") == playbook_id), None) if not group: return JSONResponse({"ok": False, "error": "playbook not found"}, status_code=404) concern_id = str(group.get("concern_id") or "").strip() concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None) if not concern: return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404) cfg = load_config() account_id = str(concern.get("account_id") or "").strip() market_symbol = str(concern.get("market_symbol") or "").strip().lower() try: strategy_inventory = anyio.run(list_strategies, cfg.trader_url) except Exception: strategy_inventory = [] concern_strategies = [ {**s, "display_label": _strategy_display_label(s)} for s in strategy_inventory if str(s.get("account_id") or "").strip() == account_id and str(s.get("market_symbol") or "").strip().lower() == market_symbol ] strategies_by_id = {str(s.get("id") or "").strip(): s for s in concern_strategies if str(s.get("id") or "").strip()} assignments = [] raw_assignments = list_strategy_assignments(strategy_group_id=playbook_id) if str(group.get("strategy_family") or "").strip().lower() == "mixed" and raw_assignments: inferred_family = _default_playbook_family([ strategies_by_id.get(str(a.get("strategy_id") or "").strip(), {"strategy_type": a.get("strategy_type")}) for a in raw_assignments ]) if inferred_family != "mixed": upsert_strategy_group( id=str(group.get("id") or ""), concern_id=concern_id, name=str(group.get("name") or group.get("id") or "playbook"), strategy_family=inferred_family, decision_profile_id=str(group.get("decision_profile_id") or concern.get("decision_profile_id") or "").strip() or None, notes=str(group.get("notes") or "").strip() or None, status=str(group.get("status") or "active"), ) group = {**group, "strategy_family": inferred_family} for assignment in raw_assignments: strategy = strategies_by_id.get(str(assignment.get("strategy_id") or "").strip(), {}) assignments.append({ **assignment, "strategy_label": _strategy_display_label(strategy) if strategy else str(assignment.get("strategy_id") or "").strip(), }) profile_id = str(group.get("decision_profile_id") or concern.get("decision_profile_id") or "").strip() profile = get_decision_profile(profile_id=profile_id) if profile_id else None if profile: try: profile = {**profile, "config": json.loads(profile.get("config_json") or "{}")} except Exception: profile = {**profile, "config": {}} return JSONResponse({ "ok": True, "playbook": group, "concern": concern, "decision_profile": profile, "assignments": assignments, "available_strategies": concern_strategies, }) @app.post("/dashboard/concerns/{concern_id}/playbooks/create") async def dashboard_create_playbook(concern_id: str, request: Request) -> JSONResponse: concern_id = str(concern_id or "").strip() concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None) if not concern: return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404) payload = await request.json() name = str((payload or {}).get("name") or "").strip() strategy_family = str((payload or {}).get("strategy_family") or "manual").strip() or "manual" if not name: return JSONResponse({"ok": False, "error": "name is required"}, status_code=400) playbook_id = f"playbook:{concern_id}:{uuid4().hex[:8]}" profile_id = str(concern.get("decision_profile_id") or "").strip() or f"profile:{playbook_id}" if not get_decision_profile(profile_id=profile_id): upsert_decision_profile( id=profile_id, name=f"{name} profile", description="Auto-created profile for a new playbook.", config=_default_profile_config(strategy_family), status="active", ) upsert_strategy_group( id=playbook_id, concern_id=concern_id, name=name, strategy_family=strategy_family, decision_profile_id=profile_id, notes="created from dashboard playbooks page", status="standby", ) return JSONResponse({"ok": True, "playbook_id": playbook_id}) @app.post("/dashboard/playbooks/{playbook_id}/assignments/upsert") async def dashboard_playbook_assignment_upsert(playbook_id: str, request: Request) -> JSONResponse: playbook_id = str(playbook_id or "").strip() group = next((g for g in list_strategy_groups() if str(g.get("id") or "") == playbook_id), None) if not group: return JSONResponse({"ok": False, "error": "playbook not found"}, status_code=404) payload = await request.json() strategy_id = str((payload or {}).get("strategy_id") or "").strip() strategy_type = str((payload or {}).get("strategy_type") or "").strip() or None role = str((payload or {}).get("role") or "member").strip() or "member" if not strategy_id: return JSONResponse({"ok": False, "error": "strategy_id is required"}, status_code=400) assignment_id = f"assign:{playbook_id}:{strategy_id}" upsert_strategy_assignment( id=assignment_id, strategy_group_id=playbook_id, strategy_id=strategy_id, strategy_type=strategy_type, role=role, status="active", notes="managed from dashboard playbook editor", ) return JSONResponse({"ok": True, "assignment_id": assignment_id}) @app.post("/dashboard/concerns/{concern_id}/status") async def dashboard_set_concern_status(concern_id: str, request: Request) -> JSONResponse: concern_id = str(concern_id or "").strip() concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None) if not concern: return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404) payload = await request.json() status = str((payload or {}).get("status") or "").strip().lower() if status not in {"active", "inactive"}: return JSONResponse({"ok": False, "error": "status must be active or inactive"}, status_code=400) account_id = str(concern.get("account_id") or "").strip() if status == "inactive" and account_id: try: await trader_cancel_all_orders(cfg.trader_url if (cfg := load_config()) else "", account_id) except Exception: pass upsert_concern( id=str(concern.get("id") or ""), account_id=account_id or None, market_symbol=str(concern.get("market_symbol") or "").strip() or None, base_currency=str(concern.get("base_currency") or "").strip() or None, quote_currency=str(concern.get("quote_currency") or "").strip() or None, strategy_id=str(concern.get("strategy_id") or "").strip() or None, decision_profile_id=str(concern.get("decision_profile_id") or "").strip() or None, source=str(concern.get("source") or "dashboard"), status=status, notes=str(concern.get("notes") or "").strip() or None, ) return JSONResponse({"ok": True, "status": status}) @app.post("/dashboard/playbooks/{playbook_id}/assignments/{assignment_id}/delete") def dashboard_playbook_assignment_delete(playbook_id: str, assignment_id: str) -> JSONResponse: assignment_id = str(assignment_id or "").strip() init_db() from .store import _connect # local import to avoid widening the public store API for one dashboard mutation with _connect() as conn: deleted = conn.execute("delete from strategy_assignments where id = ? and strategy_group_id = ?", (assignment_id, str(playbook_id or "").strip())).rowcount or 0 if not deleted: return JSONResponse({"ok": False, "error": "assignment not found"}, status_code=404) return JSONResponse({"ok": True, "deleted": deleted})