| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302 |
- from __future__ import annotations
- from contextlib import asynccontextmanager
- import asyncio
- import json
- import time
- from datetime import datetime, timezone
- from uuid import uuid4
- import anyio
- from fastapi import FastAPI, Request
- from fastapi.responses import JSONResponse
- from mcp.server.fastmcp import FastMCP
- from mcp.server.transport_security import TransportSecuritySettings
- from mcp import ClientSession
- from mcp.client.sse import sse_client
- from .config import load_config
- from .argus_client import get_regime as argus_get_regime, get_snapshot as argus_get_snapshot
- from .crypto_client import get_price, get_regime
- from .decision_engine import assess_wallet_state
- from .decision_families import make_family_decision
- from .narrative_engine import build_narrative
- from .replay import build_replay_input
- from .state_engine import synthesize_state
- from .store import delete_concern, get_decision_profile, get_state, init_db, list_concerns, list_strategy_assignments, list_strategy_groups, latest_cycle, latest_cycles, latest_decisions, latest_narratives, latest_observations, latest_regime_samples, prune_older_than, recent_regime_samples, recent_states_for_concern, sync_concerns_from_strategies, upsert_concern, upsert_cycle, upsert_decision, upsert_decision_profile, upsert_narrative, upsert_observation, upsert_regime_sample, upsert_state, latest_states, upsert_strategy_assignment, upsert_strategy_group
- from .trader_client import apply_control_decision as trader_apply_control_decision, cancel_all_orders as trader_cancel_all_orders, get_strategy as trader_get_strategy, list_strategies
- mcp = FastMCP(
- "hermes-mcp",
- transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
- )
- def _build_trader_control_payload(*, decision_id: str, concern: dict, decision: object) -> dict | None:
- action = str(getattr(decision, "action", "") or "").strip()
- target_strategy = str(getattr(decision, "target_strategy", "") or "").strip() or None
- decision_payload = getattr(decision, "payload", {}) if isinstance(getattr(decision, "payload", {}), dict) else {}
- current_primary = str(decision_payload.get("current_primary_strategy") or "").strip() or None
- trader_action: str | None = None
- risk_mode: str | None = None
- if action.startswith("replace_with_") or action.startswith("enable_"):
- trader_action = "switch"
- elif action == "suspend_grid":
- trader_action = "pause"
- target_strategy = current_primary
- elif action == "set_risk_mode":
- trader_action = "set_risk_mode"
- risk_mode = str(decision_payload.get("risk_mode") or "").strip() or None
- else:
- return None
- account_id = str(concern.get("account_id") or "").strip()
- market_symbol = str(concern.get("market_symbol") or "").strip().lower()
- concern_id = str(concern.get("id") or "").strip() or None
- reason = str(getattr(decision, "reason_summary", "") or "").strip()
- confidence = float(getattr(decision, "confidence", 0.0) or 0.0)
- payload = {
- "decision_id": decision_id,
- "concern_id": concern_id,
- "account_id": account_id,
- "market_symbol": market_symbol,
- "action": trader_action,
- "target_strategy_id": target_strategy,
- "expected_active_strategy_id": current_primary,
- "risk_mode": risk_mode,
- "reason": reason,
- "confidence": confidence,
- "dry_run": False,
- "override": False,
- "source": "hermes-mcp",
- "source_action": action,
- }
- return payload
- async def _maybe_dispatch_trader_action(*, cfg: object, decision_id: str, concern: dict, decision: object, trader_available: bool = True, retry_after_seconds: int | None = None) -> dict:
- if str(concern.get("status") or "active").strip().lower() != "active":
- return {
- "dispatch": "blocked",
- "reason": "concern is inactive",
- }
- if not bool(getattr(decision, "requires_action", False)):
- return {"dispatch": "not_required"}
- payload = _build_trader_control_payload(decision_id=decision_id, concern=concern, decision=decision)
- if payload is None:
- return {
- "dispatch": "skipped",
- "reason": f"no trader action mapping for {getattr(decision, 'action', 'unknown')}",
- }
- if not bool(getattr(cfg, "hermes_allow_actions", False)):
- return {
- "dispatch": "blocked",
- "reason": "HERMES_ALLOW_ACTIONS is false",
- "payload": payload,
- }
- if not trader_available:
- return {
- "dispatch": "deferred",
- "reason": "trader unavailable",
- "retry_after_seconds": retry_after_seconds,
- "payload": payload,
- }
- try:
- result = await trader_apply_control_decision(getattr(cfg, "trader_url"), payload)
- return {
- "dispatch": "sent",
- "payload": payload,
- "result": result,
- }
- except Exception as exc:
- return {
- "dispatch": "failed",
- "payload": payload,
- "error": str(exc),
- }
- @mcp.tool(description="Return Hermes current state, narrative, uncertainty, and a short self-assessment report.")
- def report() -> dict:
- state = get_state()
- cfg = load_config()
- concerns = list_concerns()
- groups_by_concern: dict[str, list[dict[str, Any]]] = {}
- for group in list_strategy_groups():
- groups_by_concern.setdefault(str(group.get("concern_id") or ""), []).append(group)
- try:
- accounts_by_id, markets_by_symbol, total_values = anyio.run(_load_exec_enrichment, cfg.exec_url, cfg.crypto_url, concerns)
- except Exception:
- accounts_by_id, markets_by_symbol, total_values = {}, {}, {}
- concern_summaries = []
- for concern in concerns:
- concern_id = str(concern.get("id") or "")
- account_id = str(concern.get("account_id") or "").strip()
- market_symbol = str(concern.get("market_symbol") or "").strip().lower()
- account_info = accounts_by_id.get(account_id, {})
- market_info = markets_by_symbol.get(market_symbol, {})
- groups = groups_by_concern.get(concern_id, [])
- active_playbook = next((g for g in groups if str(g.get("status") or "").lower() == "active"), None)
- assignments = list_strategy_assignments(strategy_group_id=str(active_playbook.get("id") or "")) if active_playbook else []
- concern_summaries.append({
- "concern_id": concern_id,
- "account_id": account_id or None,
- "account": account_info.get("display_name") or account_id or None,
- "market_symbol": str(concern.get("market_symbol") or "") or None,
- "market": market_info.get("name") or str(concern.get("market_symbol") or "") or None,
- "status": str(concern.get("status") or "active"),
- "active_playbook": {
- "id": str(active_playbook.get("id") or "") or None,
- "name": str(active_playbook.get("name") or "") or None,
- "family": str(active_playbook.get("strategy_family") or "") or None,
- } if active_playbook else None,
- "active_strategies": [
- {
- "strategy_id": str(a.get("strategy_id") or "") or None,
- "role": str(a.get("role") or "member") or "member",
- "strategy_type": str(a.get("strategy_type") or "") or None,
- }
- for a in assignments
- ],
- "balances": _compact_balances(account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or []),
- "total_value_usd": total_values.get(account_id) if total_values.get(account_id) is not None else account_info.get("total_value_usd"),
- })
- return {
- "status": state.get("status", "stub"),
- "thinking": state.get("thinking", "Hermes scaffold is ready."),
- "confidence": state.get("confidence", 0.0),
- "uncertainty": state.get("uncertainty", ["no live adapters wired yet"]),
- "layers": state.get("layers", []),
- "concerns": concern_summaries,
- }
- @asynccontextmanager
- async def lifespan(_: FastAPI):
- cfg = load_config()
- init_db()
- trader_gate = {"failures": 0, "down_until": 0.0, "last_error": "", "last_ok": 0.0}
- cached_strategy_inventory: list[dict] = []
- def _trader_available() -> bool:
- return time.monotonic() >= float(trader_gate["down_until"] or 0.0)
- def _mark_trader_success() -> None:
- trader_gate["failures"] = 0
- trader_gate["down_until"] = 0.0
- trader_gate["last_error"] = ""
- trader_gate["last_ok"] = time.monotonic()
- def _mark_trader_failure(error: Exception) -> None:
- failures = int(trader_gate["failures"] or 0) + 1
- trader_gate["failures"] = failures
- trader_gate["last_error"] = str(error)
- backoff = min(300, max(5, 5 * (2 ** min(failures - 1, 5))))
- trader_gate["down_until"] = time.monotonic() + backoff
- try:
- sync_concerns_from_strategies(await list_strategies(cfg.trader_url))
- except Exception:
- pass
- try:
- prune_older_than(cfg.retention_days)
- except Exception:
- pass
- async def _poll_loop() -> None:
- nonlocal cached_strategy_inventory
- while True:
- started = datetime.now(timezone.utc).isoformat()
- cycle_id = str(uuid4())
- concerns = list_concerns()
- profile_ids = sorted({str(c.get("decision_profile_id") or "").strip() for c in concerns if str(c.get("decision_profile_id") or "").strip()})
- decision_profiles = {}
- for profile_id in profile_ids:
- profile = get_decision_profile(profile_id=profile_id)
- if not profile:
- continue
- try:
- profile_config = json.loads(profile.get("config_json") or "{}")
- except Exception:
- profile_config = {}
- if isinstance(profile_config, dict):
- decision_profiles[profile_id] = {**profile, "config": profile_config}
- playbook_groups = list_strategy_groups()
- playbook_assignments = {
- str(group.get("id") or ""): list_strategy_assignments(strategy_group_id=str(group.get("id") or ""))
- for group in playbook_groups
- }
- strategy_inventory = cached_strategy_inventory
- if _trader_available():
- try:
- strategy_inventory = await list_strategies(cfg.trader_url)
- enriched_inventory = []
- for strategy in strategy_inventory:
- instance_id = str(strategy.get("id") or "").strip()
- if not instance_id:
- enriched_inventory.append(strategy)
- continue
- try:
- detail = await trader_get_strategy(cfg.trader_url, instance_id, include_state=True, include_report=True)
- enriched_inventory.append({**strategy, **detail})
- except Exception:
- enriched_inventory.append(strategy)
- strategy_inventory = enriched_inventory
- cached_strategy_inventory = strategy_inventory
- _mark_trader_success()
- except Exception as exc:
- _mark_trader_failure(exc)
- strategy_inventory = cached_strategy_inventory
- try:
- sync_concerns_from_strategies(strategy_inventory)
- except Exception:
- pass
- upsert_cycle(id=cycle_id, started_at=started, finished_at=None, status="running", trigger="interval", notes=f"polling {len(concerns)} concerns")
- argus_snapshot: dict = {}
- argus_regime: dict = {}
- try:
- argus_snapshot = await argus_get_snapshot(cfg.argus_url)
- except Exception:
- argus_snapshot = {}
- try:
- argus_regime = await argus_get_regime(cfg.argus_url)
- except Exception:
- argus_regime = {}
- if argus_snapshot or argus_regime:
- upsert_observation(
- id=f"{cycle_id}:argus",
- cycle_id=cycle_id,
- concern_id=None,
- source="argus-mcp",
- kind="macro_snapshot",
- payload_json=json.dumps({"snapshot": argus_snapshot, "regime": argus_regime}, ensure_ascii=False),
- observed_at=datetime.now(timezone.utc).isoformat(),
- )
- for concern in concerns:
- symbol = _resolve_regime_symbol(concern)
- if not symbol:
- continue
- concern_id = str(concern.get("id") or "")
- account_id = str(concern.get("account_id") or "").strip()
- account_info = {}
- if account_id:
- try:
- payload = await _call_exec_tool(cfg.exec_url, "get_account_info", {"account_id": account_id})
- account_info = payload if isinstance(payload, dict) else {}
- except Exception:
- account_info = {}
- current_regimes: list[dict] = []
- for timeframe in cfg.crypto_timeframes:
- regime = await get_regime(cfg.crypto_url, str(symbol), timeframe)
- current_regimes.append({**regime, "timeframe": timeframe})
- upsert_regime_sample(
- id=f"{cycle_id}:{concern['id']}:{timeframe}",
- cycle_id=cycle_id,
- concern_id=str(concern["id"]),
- timeframe=timeframe,
- regime_json=json.dumps(regime, ensure_ascii=False),
- captured_at=datetime.now(timezone.utc).isoformat(),
- )
- try:
- state = synthesize_state(
- concern=concern,
- regimes=current_regimes,
- account_info=account_info,
- argus_snapshot=argus_snapshot,
- argus_regime=argus_regime,
- )
- upsert_state(
- id=f"{cycle_id}:{concern['id']}",
- cycle_id=cycle_id,
- concern_id=str(concern["id"]),
- market_regime=state.market_regime,
- volatility_state=state.volatility_state,
- liquidity_state=state.liquidity_state,
- sentiment_pressure=state.sentiment_pressure,
- event_risk=state.event_risk,
- execution_quality=state.execution_quality,
- confidence=state.confidence,
- payload_json=json.dumps(state.payload, ensure_ascii=False),
- created_at=state.payload.get("generated_at"),
- )
- narrative = build_narrative(concern=concern, state_payload=state.payload)
- upsert_narrative(
- id=f"{cycle_id}:{concern['id']}",
- cycle_id=cycle_id,
- concern_id=str(concern["id"]),
- summary=narrative.summary,
- key_drivers_json=json.dumps(narrative.key_drivers, ensure_ascii=False),
- risk_flags_json=json.dumps(narrative.risk_flags, ensure_ascii=False),
- uncertainties_json=json.dumps(narrative.uncertainties, ensure_ascii=False),
- confidence=narrative.confidence,
- created_at=narrative.payload.get("generated_at"),
- )
- latest_price = None
- if current_regimes:
- latest_price = next((r.get("price") for r in reversed(current_regimes) if r.get("price") is not None), None)
- wallet_state = assess_wallet_state(
- account_info=account_info,
- concern=concern,
- price=float(latest_price) if latest_price is not None else None,
- strategies=strategy_inventory,
- )
- active_playbook = next((g for g in playbook_groups if str(g.get("concern_id") or "") == concern_id and str(g.get("status") or "").lower() == "active"), None)
- assignment_by_strategy_id = {
- str(a.get("strategy_id") or "").strip(): a
- for a in playbook_assignments.get(str(active_playbook.get("id") or ""), [])
- if str(a.get("strategy_id") or "").strip()
- } if active_playbook else {}
- assigned_strategy_ids = {
- str(a.get("strategy_id") or "").strip()
- for a in playbook_assignments.get(str(active_playbook.get("id") or ""), [])
- if str(a.get("strategy_id") or "").strip()
- } if active_playbook else set()
- candidate_strategies = [
- {
- **s,
- "playbook_role": str(assignment_by_strategy_id.get(str(s.get("id") or "").strip(), {}).get("role") or "").strip() or None,
- "playbook_assignment_id": str(assignment_by_strategy_id.get(str(s.get("id") or "").strip(), {}).get("id") or "").strip() or None,
- }
- for s in strategy_inventory
- if str(s.get("account_id") or "").strip() == account_id
- and str(s.get("market_symbol") or "").strip().lower() == str(concern.get("market_symbol") or "").strip().lower()
- and (not assigned_strategy_ids or str(s.get("id") or "").strip() in assigned_strategy_ids)
- ]
- breakout_window_seconds = max(300, int(getattr(cfg, "breakout_memory_window_seconds", 900) or 900))
- recent_state_rows = recent_states_for_concern(concern_id=str(concern["id"]), since_seconds=breakout_window_seconds, limit=12)
- decision = make_family_decision(
- family=str(active_playbook.get("strategy_family") or "grid-trend-rebalancer") if active_playbook else "grid-trend-rebalancer",
- concern=concern,
- narrative_payload={
- **state.payload,
- **narrative.payload,
- "confidence": narrative.confidence,
- },
- wallet_state=wallet_state,
- strategies=candidate_strategies,
- history_window={
- "window_seconds": breakout_window_seconds,
- "recent_states": recent_state_rows,
- },
- decision_profile=decision_profiles.get(str(concern.get("decision_profile_id") or "").strip()),
- )
- decision_id = f"{cycle_id}:{concern['id']}"
- dispatch_record = await _maybe_dispatch_trader_action(
- cfg=cfg,
- decision_id=decision_id,
- concern=concern,
- decision=decision,
- trader_available=_trader_available(),
- retry_after_seconds=max(0, int(trader_gate["down_until"] - time.monotonic())) if not _trader_available() else None,
- )
- decision_payload = {
- **decision.payload,
- "replay_input": build_replay_input(
- concern=concern,
- narrative_payload={
- **state.payload,
- **narrative.payload,
- "confidence": narrative.confidence,
- },
- wallet_state=wallet_state,
- strategies=candidate_strategies,
- history_window={
- "window_seconds": breakout_window_seconds,
- "recent_states": recent_state_rows,
- },
- ),
- "dispatch": dispatch_record,
- "decision_family": str(active_playbook.get("strategy_family") or "grid-trend-rebalancer") if active_playbook else "grid-trend-rebalancer",
- "active_playbook_id": str(active_playbook.get("id") or "") if active_playbook else None,
- "candidate_strategy_ids": sorted(assigned_strategy_ids) if assigned_strategy_ids else [str(s.get("id") or "") for s in candidate_strategies if str(s.get("id") or "")],
- }
- upsert_decision(
- id=decision_id,
- cycle_id=cycle_id,
- concern_id=str(concern["id"]),
- mode=decision.mode,
- action=decision.action,
- target_strategy=decision.target_strategy,
- target_policy_json=json.dumps(decision_payload, ensure_ascii=False),
- reason_summary=decision.reason_summary,
- confidence=decision.confidence,
- requires_action=decision.requires_action,
- created_at=decision.payload.get("generated_at"),
- )
- except Exception:
- pass
- upsert_cycle(id=cycle_id, started_at=started, finished_at=datetime.now(timezone.utc).isoformat(), status="ok", trigger="interval", notes=f"polled {len(concerns)} concerns over {','.join(cfg.crypto_timeframes)}")
- await asyncio.sleep(max(10, cfg.cycle_seconds))
- asyncio.create_task(_poll_loop())
- yield
- app = FastAPI(title="Hermes MCP", lifespan=lifespan)
- app.mount("/mcp", mcp.sse_app())
- @app.get("/")
- def root() -> dict:
- return {"status": "ok", "mount": "/mcp/sse", "dashboard": "/dashboard"}
- @app.get("/health")
- def health() -> dict:
- return {"status": "ok", "db": "sqlite", "tool": "report"}
- @app.delete("/concerns/{concern_id}")
- def remove_concern(concern_id: str) -> JSONResponse:
- deleted = delete_concern(concern_id=concern_id)
- if not deleted.get("concerns"):
- return JSONResponse({"ok": False, "error": "concern not found", "deleted": deleted}, status_code=404)
- return JSONResponse({"ok": True, "deleted": deleted})
- def _strip_sse(url: str) -> str:
- root = url.rstrip("/")
- return root[:-8] if root.endswith("/mcp/sse") else root
- async def _call_exec_tool(exec_url: str, tool: str, arguments: dict) -> object:
- async with sse_client(exec_url) as (read_stream, write_stream):
- async with ClientSession(read_stream, write_stream) as session:
- await session.initialize()
- result = await session.call_tool(tool, arguments)
- content = getattr(result, "content", None) or []
- if not content:
- return None
- first = content[0]
- text = getattr(first, "text", None) if not isinstance(first, dict) else first.get("text")
- if text is None:
- return None
- try:
- return json.loads(text)
- except Exception:
- return text
- async def _load_exec_enrichment(exec_url: str, crypto_url: str, concerns: list[dict]) -> tuple[dict[str, dict], dict[str, dict], dict[str, float | None]]:
- account_ids = sorted({str(c.get("account_id") or "").strip() for c in concerns if str(c.get("account_id") or "").strip()})
- market_symbols = sorted({str(c.get("market_symbol") or "").strip().lower() for c in concerns if str(c.get("market_symbol") or "").strip()})
- account_payloads = await asyncio.gather(*[_call_exec_tool(exec_url, "get_account_info", {"account_id": account_id}) for account_id in account_ids])
- market_payload = await _call_exec_tool(exec_url, "list_markets", {})
- accounts_by_id = {account_id: payload for account_id, payload in zip(account_ids, account_payloads) if isinstance(payload, dict)}
- total_values: dict[str, float | None] = {}
- for account_id, payload in accounts_by_id.items():
- total_values[account_id] = await _live_total_value(crypto_url, payload)
- markets_by_symbol: dict[str, dict] = {}
- if isinstance(market_payload, list):
- for market in market_payload:
- if not isinstance(market, dict):
- continue
- symbol = str(market.get("symbol") or market.get("market_symbol") or "").strip().lower()
- if symbol in market_symbols or symbol:
- markets_by_symbol[symbol] = market
- return accounts_by_id, markets_by_symbol, total_values
- async def _live_total_value(crypto_url: str, account_info: dict) -> float | None:
- balances = account_info.get("balances")
- if not isinstance(balances, list):
- return None
- total = 0.0
- seen = False
- for item in balances:
- if not isinstance(item, dict):
- continue
- asset = str(item.get("asset_code") or item.get("asset") or "").strip().lower()
- amount = item.get("total")
- if not asset or amount is None:
- continue
- try:
- amount_f = float(amount)
- except Exception:
- continue
- seen = True
- if asset == "usd":
- total += amount_f
- continue
- price_payload = await get_price(crypto_url, asset)
- try:
- price = float(price_payload.get("price"))
- except Exception:
- price = 0.0
- total += amount_f * price
- return total if seen else None
- def _compact_balances(payload: object) -> str:
- if not isinstance(payload, list):
- return "-"
- parts: list[str] = []
- for item in payload:
- if not isinstance(item, dict):
- continue
- asset = str(item.get("asset_code") or item.get("asset") or "").upper().strip()
- total = item.get("total")
- available = item.get("available")
- value_usd = item.get("value_usd")
- if not asset:
- continue
- segment = f"{asset} {float(total):.8g}" if total is not None else asset
- if available is not None and total is not None and available != total:
- segment += f" (avail {float(available):.8g})"
- if isinstance(value_usd, (int, float)):
- segment += f" ≈ ${float(value_usd):,.2f}"
- parts.append(segment)
- return " | ".join(parts[:5]) or "-"
- def _resolve_regime_symbol(concern: dict) -> str | None:
- base = str(concern.get("base_currency") or "").strip().upper()
- if base:
- return base
- market = str(concern.get("market_symbol") or "").strip().upper().replace("/", "").replace("-", "")
- for suffix in ("USDT", "USDC", "USD", "EUR", "BTC", "ETH"):
- if market.endswith(suffix) and len(market) > len(suffix):
- return market[:-len(suffix)]
- return market or None
- def _default_playbook_name(strategies: list[dict]) -> str:
- types = {str(s.get("strategy_type") or "").strip() for s in strategies}
- if {"grid_trader", "trend_follower", "exposure_protector"}.issubset(types):
- return "grid-trend-rebalancer"
- if types == {"trend_follower"} or ("trend_follower" in types and "grid_trader" not in types and "exposure_protector" not in types):
- return "trend-only"
- labels = sorted(t.replace("_", "-") for t in types if t)
- return "+".join(labels) if labels else "playbook"
- def _default_playbook_family(strategies: list[dict]) -> str:
- types = {str(s.get("strategy_type") or "").strip() for s in strategies}
- if {"grid_trader", "trend_follower", "exposure_protector"}.issubset(types):
- return "grid-trend-rebalancer"
- if "trend_follower" in types and "grid_trader" not in types and "exposure_protector" not in types:
- return "trend-only"
- return "mixed"
- def _default_profile_config(family: str | None = None) -> dict[str, object]:
- normalized = str(family or "").strip().lower()
- if normalized in {"trend-only", "trend_only", "trend"}:
- return {
- "estimated_turn_cost_pct": 0.7,
- "micro_trend_weight": 0.8,
- "meso_trend_weight": 1.0,
- "macro_trend_weight": 0.7,
- "persistence_bonus_weight": 0.45,
- "argus_compression_penalty": 0.18,
- "activation_edge_threshold": 1.15,
- "flip_edge_threshold": 1.35,
- "flip_confirmation_gap": 0.25,
- }
- return {
- "breakout_persistence_min": 0.65,
- "short_term_confirmation_min": 0.32,
- "switch_cost_penalty": 1.0,
- "rebalance_imbalance_threshold": 0.30,
- "force_grid_when_balanced": True,
- "grid_release_threshold": 0.35,
- "trend_cooling_threshold": 0.45,
- "trend_inventory_stress_threshold": 0.55,
- "action_cooldown_seconds": 600,
- }
- def _profile_allowed_keys(family: str | None = None) -> set[str]:
- return set(_default_profile_config(family).keys())
- def _normalize_profile_config(config: dict[str, object] | None, family: str | None = None) -> dict[str, object]:
- defaults = _default_profile_config(family)
- allowed = _profile_allowed_keys(family)
- current = config if isinstance(config, dict) else {}
- return {**defaults, **{k: v for k, v in current.items() if k in allowed}}
- def _ensure_profile_for_family(*, profile_id: str, family: str | None, name: str, description: str | None = None, status: str = "active") -> dict[str, Any]:
- family_label = str(family or "").strip() or "playbook"
- profile = get_decision_profile(profile_id=profile_id)
- config: dict[str, object] = {}
- if profile:
- try:
- raw = json.loads(profile.get("config_json") or "{}")
- except Exception:
- raw = {}
- config = _normalize_profile_config(raw if isinstance(raw, dict) else {}, family)
- current_name = str(profile.get("name") or "").strip()
- generic_names = {"grid-trend-rebalancer profile", "trend-only profile", "playbook profile"}
- profile_name = name if not current_name or current_name in generic_names else current_name
- upsert_decision_profile(
- id=profile_id,
- name=profile_name,
- description=str(profile.get("description") or description or "").strip() or None,
- config=config,
- status=str(profile.get("status") or status or "active"),
- )
- return {**profile, "name": profile_name, "config": config}
- config = _default_profile_config(family)
- upsert_decision_profile(
- id=profile_id,
- name=name or f"{family_label} profile",
- description=description,
- config=config,
- status=status,
- )
- created = get_decision_profile(profile_id=profile_id) or {"id": profile_id, "name": name, "description": description, "status": status}
- return {**created, "config": config}
- def _strategy_display_label(strategy: dict) -> str:
- for key in ("label", "display_name", "name", "title"):
- value = str(strategy.get(key) or "").strip()
- if value:
- return value
- strategy_type = str(strategy.get("strategy_type") or "strategy").strip().replace("_", " ")
- instance_id = str(strategy.get("id") or "").strip()
- return f"{strategy_type} ({instance_id[:8]})" if instance_id else strategy_type
- @app.get("/dashboard/data")
- def dashboard_data() -> JSONResponse:
- cfg = load_config()
- concerns = list_concerns()
- accounts_by_id: dict[str, dict] = {}
- markets_by_symbol: dict[str, dict] = {}
- strategy_inventory: list[dict] = []
- strategy_inventory_available = True
- try:
- accounts_by_id, markets_by_symbol, total_values = anyio.run(_load_exec_enrichment, cfg.exec_url, cfg.crypto_url, concerns)
- except Exception:
- total_values = {}
- pass
- try:
- strategy_inventory = anyio.run(list_strategies, cfg.trader_url)
- except Exception:
- strategy_inventory = []
- strategy_inventory_available = False
- live_scopes = {
- (str(strategy.get("account_id") or "").strip(), str(strategy.get("market_symbol") or "").strip().lower())
- for strategy in strategy_inventory
- if str(strategy.get("account_id") or "").strip() and str(strategy.get("market_symbol") or "").strip()
- }
- enriched = []
- concern_lookup: dict[str, dict] = {}
- for concern in concerns:
- account_id = str(concern.get("account_id") or "").strip()
- market_symbol = str(concern.get("market_symbol") or "").strip().lower()
- account_info = accounts_by_id.get(account_id, {})
- market_info = markets_by_symbol.get(market_symbol, {})
- enriched.append({
- **concern,
- "account_display": account_info.get("display_name") or account_id,
- "balances": account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or [],
- "balance_summary": _compact_balances(account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or []),
- "total_value_usd": total_values.get(account_id) if total_values.get(account_id) is not None else account_info.get("total_value_usd"),
- "market_display": market_info.get("name") or concern.get("market_symbol") or "",
- "market_description": market_info.get("description") or "",
- "orphaned": strategy_inventory_available and (account_id, market_symbol) not in live_scopes,
- })
- concern_lookup[str(concern.get("id") or "")] = enriched[-1]
- regimes = []
- histories_by_key: dict[str, list[dict]] = {}
- for sample in recent_regime_samples(1000):
- concern_id = str(sample.get("concern_id") or "")
- timeframe = str(sample.get("timeframe") or "")
- key = f"{concern_id}::{timeframe}"
- bucket = histories_by_key.setdefault(key, [])
- if len(bucket) < 24:
- bucket.append(sample)
- for sample in latest_regime_samples(20):
- concern_meta = concern_lookup.get(str(sample.get("concern_id") or ""), {})
- regimes.append({**sample, **{
- "account_display": concern_meta.get("account_display"),
- "market_display": concern_meta.get("market_display"),
- "market_symbol": concern_meta.get("market_symbol"),
- }})
- return JSONResponse({
- "latest_cycle": latest_cycle(),
- "cycles": latest_cycles(10),
- "argus_observations": latest_observations(20, source="argus-mcp"),
- "concerns": enriched,
- "regime_samples": regimes,
- "regime_histories": histories_by_key,
- "state_samples": latest_states(20),
- "state_history": latest_states(100),
- "narrative_samples": latest_narratives(20),
- "decision_samples": latest_decisions(20),
- "decision_history": latest_decisions(100),
- })
- @app.get("/dashboard/concerns/{concern_id}/data")
- def dashboard_concern_detail_data(concern_id: str) -> JSONResponse:
- cfg = load_config()
- concern_id = str(concern_id or "").strip()
- concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None)
- if not concern:
- return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404)
- account_id = str(concern.get("account_id") or "").strip()
- market_symbol = str(concern.get("market_symbol") or "").strip().lower()
- concerns = [concern]
- try:
- accounts_by_id, markets_by_symbol, total_values = anyio.run(_load_exec_enrichment, cfg.exec_url, cfg.crypto_url, concerns)
- except Exception:
- accounts_by_id, markets_by_symbol, total_values = {}, {}, {}
- account_info = accounts_by_id.get(account_id, {})
- market_info = markets_by_symbol.get(market_symbol, {})
- enriched_concern = {
- **concern,
- "account_display": account_info.get("display_name") or account_id,
- "balances": account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or [],
- "balance_summary": _compact_balances(account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or []),
- "total_value_usd": total_values.get(account_id) if total_values.get(account_id) is not None else account_info.get("total_value_usd"),
- "market_display": market_info.get("name") or concern.get("market_symbol") or "",
- "market_description": market_info.get("description") or "",
- }
- try:
- strategy_inventory = anyio.run(list_strategies, cfg.trader_url)
- except Exception:
- strategy_inventory = []
- concern_strategies = [
- s for s in strategy_inventory
- if str(s.get("account_id") or "").strip() == account_id
- and str(s.get("market_symbol") or "").strip().lower() == market_symbol
- ]
- strategies_by_id = {str(s.get("id") or "").strip(): s for s in concern_strategies if str(s.get("id") or "").strip()}
- profile_id = str(concern.get("decision_profile_id") or "").strip()
- existing_groups = list_strategy_groups(concern_id=concern_id)
- if not existing_groups and concern_strategies:
- seeded_group_id = f"playbook:{concern_id}:default"
- seeded_family = _default_playbook_family(concern_strategies)
- seeded_profile_id = profile_id or f"profile:{concern_id}:default"
- if not profile_id:
- upsert_decision_profile(
- id=seeded_profile_id,
- name=f"{_default_playbook_name(concern_strategies)} profile",
- description="Auto-seeded default profile for this concern.",
- config=_default_profile_config(seeded_family),
- status="active",
- )
- upsert_concern(
- id=str(concern.get("id") or ""),
- account_id=account_id or None,
- market_symbol=market_symbol or None,
- base_currency=str(concern.get("base_currency") or "").strip() or None,
- quote_currency=str(concern.get("quote_currency") or "").strip() or None,
- strategy_id=str(concern.get("strategy_id") or "").strip() or None,
- decision_profile_id=seeded_profile_id,
- source=str(concern.get("source") or "dashboard"),
- status=str(concern.get("status") or "active"),
- notes=str(concern.get("notes") or "").strip() or None,
- )
- concern = {**concern, "decision_profile_id": seeded_profile_id}
- profile_id = seeded_profile_id
- upsert_strategy_group(
- id=seeded_group_id,
- concern_id=concern_id,
- name=_default_playbook_name(concern_strategies),
- strategy_family=seeded_family,
- decision_profile_id=profile_id or None,
- notes="auto-seeded from trader strategies",
- status="active",
- )
- for strategy in concern_strategies:
- strategy_id = str(strategy.get("id") or "").strip()
- if not strategy_id:
- continue
- upsert_strategy_assignment(
- id=f"assign:{seeded_group_id}:{strategy_id}",
- strategy_group_id=seeded_group_id,
- strategy_id=strategy_id,
- strategy_type=str(strategy.get("strategy_type") or "").strip() or None,
- role="member",
- status="active",
- notes="auto-seeded from trader inventory",
- )
- existing_groups = list_strategy_groups(concern_id=concern_id)
- playbooks = []
- active_playbook_profile_id = None
- for group in existing_groups:
- assignments = list_strategy_assignments(strategy_group_id=str(group.get("id") or ""))
- if str(group.get("strategy_family") or "").strip().lower() == "mixed" and assignments:
- assigned_strategies = [
- strategies_by_id.get(str(a.get("strategy_id") or "").strip(), {"strategy_type": a.get("strategy_type")})
- for a in assignments
- ]
- inferred_family = _default_playbook_family(assigned_strategies)
- if inferred_family != "mixed":
- upsert_strategy_group(
- id=str(group.get("id") or ""),
- concern_id=concern_id,
- name=str(group.get("name") or group.get("id") or "playbook"),
- strategy_family=inferred_family,
- decision_profile_id=str(group.get("decision_profile_id") or "").strip() or None,
- notes=str(group.get("notes") or "").strip() or None,
- status=str(group.get("status") or "active"),
- )
- group = {**group, "strategy_family": inferred_family}
- group_profile_id = str(group.get("decision_profile_id") or "").strip()
- if not group_profile_id:
- group_profile_id = f"profile:{concern_id}:{str(group.get('id') or '').strip() or 'default'}"
- _ensure_profile_for_family(
- profile_id=group_profile_id,
- family=str(group.get("strategy_family") or ""),
- name=f"{str(group.get('name') or group.get('id') or 'playbook')} profile",
- description="Auto-created for this playbook.",
- status="active",
- )
- upsert_strategy_group(
- id=str(group.get("id") or ""),
- concern_id=concern_id,
- name=str(group.get("name") or group.get("id") or "playbook"),
- strategy_family=str(group.get("strategy_family") or "").strip() or None,
- decision_profile_id=group_profile_id,
- notes=str(group.get("notes") or "").strip() or None,
- status=str(group.get("status") or "active"),
- )
- group = {**group, "decision_profile_id": group_profile_id}
- else:
- _ensure_profile_for_family(
- profile_id=group_profile_id,
- family=str(group.get("strategy_family") or ""),
- name=f"{str(group.get('name') or group.get('id') or 'playbook')} profile",
- description="Auto-created for this playbook.",
- status="active",
- )
- if str(group.get("status") or "").lower() == "active" and str(group.get("decision_profile_id") or "").strip():
- active_playbook_profile_id = str(group.get("decision_profile_id") or "").strip()
- enriched_assignments = []
- for assignment in assignments:
- strategy = strategies_by_id.get(str(assignment.get("strategy_id") or "").strip(), {})
- enriched_assignments.append({
- **assignment,
- "strategy_label": _strategy_display_label(strategy) if strategy else str(assignment.get("strategy_id") or "").strip(),
- })
- playbooks.append({**group, "assignments": enriched_assignments})
- concern_strategies = [{**s, "display_label": _strategy_display_label(s)} for s in concern_strategies]
- if active_playbook_profile_id and profile_id != active_playbook_profile_id:
- upsert_concern(
- id=str(concern.get("id") or ""),
- account_id=account_id or None,
- market_symbol=market_symbol or None,
- base_currency=str(concern.get("base_currency") or "").strip() or None,
- quote_currency=str(concern.get("quote_currency") or "").strip() or None,
- strategy_id=str(concern.get("strategy_id") or "").strip() or None,
- decision_profile_id=active_playbook_profile_id,
- source=str(concern.get("source") or "dashboard"),
- status=str(concern.get("status") or "active"),
- notes=str(concern.get("notes") or "").strip() or None,
- )
- concern = {**concern, "decision_profile_id": active_playbook_profile_id}
- profile_id = active_playbook_profile_id
- active_family = next((str(p.get("strategy_family") or "") for p in playbooks if str(p.get("status") or "").lower() == "active"), "")
- decision_profile = (
- _ensure_profile_for_family(
- profile_id=profile_id,
- family=active_family,
- name=f"{str((next((p for p in playbooks if str(p.get('status') or '').lower() == 'active'), {}) or {}).get('name') or 'playbook')} profile",
- description="Auto-created for this playbook.",
- status="active",
- )
- if profile_id else None
- )
- latest_state = next((s for s in latest_states(200) if str(s.get("concern_id") or "") == concern_id), None)
- latest_narrative = next((n for n in latest_narratives(200) if str(n.get("concern_id") or "") == concern_id), None)
- latest_decision = next((d for d in latest_decisions(200) if str(d.get("concern_id") or "") == concern_id), None)
- latest_regimes = [s for s in recent_regime_samples(500) if str(s.get("concern_id") or "") == concern_id][:24]
- return JSONResponse({
- "ok": True,
- "concern": enriched_concern,
- "decision_profile": decision_profile,
- "playbooks": playbooks,
- "strategies": concern_strategies,
- "latest_state": latest_state,
- "latest_narrative": latest_narrative,
- "latest_decision": latest_decision,
- "latest_regimes": latest_regimes,
- })
- @app.post("/dashboard/concerns/{concern_id}/playbooks/{playbook_id}/activate")
- def dashboard_activate_playbook(concern_id: str, playbook_id: str) -> JSONResponse:
- concern_id = str(concern_id or "").strip()
- playbook_id = str(playbook_id or "").strip()
- concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None)
- if not concern:
- return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404)
- groups = list_strategy_groups(concern_id=concern_id)
- target = next((g for g in groups if str(g.get("id") or "") == playbook_id), None)
- if not target:
- return JSONResponse({"ok": False, "error": "playbook not found"}, status_code=404)
- target_profile_id = str(target.get("decision_profile_id") or "").strip() or f"profile:{concern_id}:{playbook_id}"
- _ensure_profile_for_family(
- profile_id=target_profile_id,
- family=str(target.get("strategy_family") or ""),
- name=f"{str(target.get('name') or playbook_id)} profile",
- description="Auto-created for this playbook.",
- status="active",
- )
- if str(target.get("decision_profile_id") or "").strip() != target_profile_id:
- upsert_strategy_group(
- id=str(target.get("id") or ""),
- concern_id=concern_id,
- name=str(target.get("name") or target.get("id") or "playbook"),
- strategy_family=str(target.get("strategy_family") or "").strip() or None,
- decision_profile_id=target_profile_id,
- notes=str(target.get("notes") or "").strip() or None,
- status=str(target.get("status") or "active"),
- )
- target = {**target, "decision_profile_id": target_profile_id}
- for group in groups:
- upsert_strategy_group(
- id=str(group.get("id") or ""),
- concern_id=concern_id,
- name=str(group.get("name") or group.get("id") or "playbook"),
- strategy_family=str(group.get("strategy_family") or "").strip() or None,
- decision_profile_id=(target_profile_id if str(group.get("id") or "") == playbook_id else str(group.get("decision_profile_id") or "").strip() or None),
- notes=str(group.get("notes") or "").strip() or None,
- status="active" if str(group.get("id") or "") == playbook_id else "standby",
- )
- upsert_concern(
- id=str(concern.get("id") or ""),
- account_id=str(concern.get("account_id") or "").strip() or None,
- market_symbol=str(concern.get("market_symbol") or "").strip() or None,
- base_currency=str(concern.get("base_currency") or "").strip() or None,
- quote_currency=str(concern.get("quote_currency") or "").strip() or None,
- strategy_id=str(concern.get("strategy_id") or "").strip() or None,
- decision_profile_id=target_profile_id,
- source=str(concern.get("source") or "dashboard"),
- status=str(concern.get("status") or "active"),
- notes=str(concern.get("notes") or "").strip() or None,
- )
- return JSONResponse({"ok": True, "activated_playbook_id": playbook_id})
- @app.post("/dashboard/concerns/{concern_id}/playbooks/{playbook_id}/tuning")
- async def dashboard_update_playbook_tuning(concern_id: str, playbook_id: str, request: Request) -> JSONResponse:
- concern_id = str(concern_id or "").strip()
- playbook_id = str(playbook_id or "").strip()
- concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None)
- if not concern:
- return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404)
- groups = list_strategy_groups(concern_id=concern_id)
- target = next((g for g in groups if str(g.get("id") or "") == playbook_id), None)
- if not target:
- return JSONResponse({"ok": False, "error": "playbook not found"}, status_code=404)
- profile_id = str(target.get("decision_profile_id") or "").strip() or f"profile:{concern_id}:{playbook_id}"
- if not str(target.get("decision_profile_id") or "").strip():
- _ensure_profile_for_family(
- profile_id=profile_id,
- family=str(target.get("strategy_family") or ""),
- name=f"{str(target.get('name') or playbook_id)} profile",
- description="Auto-created while saving tuning from the dashboard.",
- status="active",
- )
- upsert_strategy_group(
- id=str(target.get("id") or ""),
- concern_id=concern_id,
- name=str(target.get("name") or playbook_id),
- strategy_family=str(target.get("strategy_family") or "").strip() or None,
- decision_profile_id=profile_id,
- notes=str(target.get("notes") or "").strip() or None,
- status=str(target.get("status") or "active"),
- )
- _ensure_profile_for_family(
- profile_id=profile_id,
- family=str(target.get("strategy_family") or ""),
- name=f"{str(target.get('name') or playbook_id)} profile",
- description="Auto-created while saving tuning from the dashboard.",
- status="active",
- )
- if str(target.get("status") or "").lower() == "active" and str(concern.get("decision_profile_id") or "").strip() != profile_id:
- upsert_concern(
- id=str(concern.get("id") or ""),
- account_id=str(concern.get("account_id") or "").strip() or None,
- market_symbol=str(concern.get("market_symbol") or "").strip() or None,
- base_currency=str(concern.get("base_currency") or "").strip() or None,
- quote_currency=str(concern.get("quote_currency") or "").strip() or None,
- strategy_id=str(concern.get("strategy_id") or "").strip() or None,
- decision_profile_id=profile_id,
- source=str(concern.get("source") or "dashboard"),
- status=str(concern.get("status") or "active"),
- notes=str(concern.get("notes") or "").strip() or None,
- )
- payload = await request.json()
- updates = payload if isinstance(payload, dict) else {}
- profile = get_decision_profile(profile_id=profile_id)
- if not profile:
- return JSONResponse({"ok": False, "error": "decision profile not found"}, status_code=404)
- try:
- current_config = json.loads(profile.get("config_json") or "{}")
- except Exception:
- current_config = {}
- if not isinstance(current_config, dict):
- current_config = {}
- allowed_keys = {
- "breakout_persistence_min",
- "short_term_confirmation_min",
- "switch_cost_penalty",
- "rebalance_imbalance_threshold",
- "force_grid_when_balanced",
- "grid_release_threshold",
- "trend_cooling_threshold",
- "trend_inventory_stress_threshold",
- "action_cooldown_seconds",
- "estimated_turn_cost_pct",
- "micro_trend_weight",
- "meso_trend_weight",
- "macro_trend_weight",
- "persistence_bonus_weight",
- "argus_compression_penalty",
- "activation_edge_threshold",
- "flip_edge_threshold",
- "flip_confirmation_gap",
- }
- merged = _normalize_profile_config(current_config, str(target.get("strategy_family") or ""))
- for key, value in updates.items():
- if key not in allowed_keys:
- continue
- if key == "force_grid_when_balanced":
- merged[key] = bool(value)
- continue
- try:
- merged[key] = float(value) if key != "action_cooldown_seconds" else int(float(value))
- except Exception:
- continue
- upsert_decision_profile(
- id=profile_id,
- name=str(profile.get("name") or profile_id),
- description=str(profile.get("description") or "").strip() or None,
- config=merged,
- status=str(profile.get("status") or "active"),
- )
- return JSONResponse({"ok": True, "profile_id": profile_id, "config": merged})
- @app.get("/dashboard/playbooks/data")
- def dashboard_playbooks_data() -> JSONResponse:
- concerns = {str(c.get("id") or ""): c for c in list_concerns()}
- groups = list_strategy_groups()
- out = []
- for group in groups:
- concern = concerns.get(str(group.get("concern_id") or ""), {})
- assignments = list_strategy_assignments(strategy_group_id=str(group.get("id") or ""))
- out.append({
- **group,
- "concern": concern,
- "assignment_count": len(assignments),
- })
- return JSONResponse({"ok": True, "playbooks": out})
- @app.get("/dashboard/playbooks/{playbook_id}/data")
- def dashboard_playbook_detail_data(playbook_id: str) -> JSONResponse:
- playbook_id = str(playbook_id or "").strip()
- group = next((g for g in list_strategy_groups() if str(g.get("id") or "") == playbook_id), None)
- if not group:
- return JSONResponse({"ok": False, "error": "playbook not found"}, status_code=404)
- concern_id = str(group.get("concern_id") or "").strip()
- concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None)
- if not concern:
- return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404)
- cfg = load_config()
- account_id = str(concern.get("account_id") or "").strip()
- market_symbol = str(concern.get("market_symbol") or "").strip().lower()
- try:
- strategy_inventory = anyio.run(list_strategies, cfg.trader_url)
- except Exception:
- strategy_inventory = []
- concern_strategies = [
- {**s, "display_label": _strategy_display_label(s)}
- for s in strategy_inventory
- if str(s.get("account_id") or "").strip() == account_id
- and str(s.get("market_symbol") or "").strip().lower() == market_symbol
- ]
- strategies_by_id = {str(s.get("id") or "").strip(): s for s in concern_strategies if str(s.get("id") or "").strip()}
- assignments = []
- raw_assignments = list_strategy_assignments(strategy_group_id=playbook_id)
- if str(group.get("strategy_family") or "").strip().lower() == "mixed" and raw_assignments:
- inferred_family = _default_playbook_family([
- strategies_by_id.get(str(a.get("strategy_id") or "").strip(), {"strategy_type": a.get("strategy_type")})
- for a in raw_assignments
- ])
- if inferred_family != "mixed":
- upsert_strategy_group(
- id=str(group.get("id") or ""),
- concern_id=concern_id,
- name=str(group.get("name") or group.get("id") or "playbook"),
- strategy_family=inferred_family,
- decision_profile_id=str(group.get("decision_profile_id") or concern.get("decision_profile_id") or "").strip() or None,
- notes=str(group.get("notes") or "").strip() or None,
- status=str(group.get("status") or "active"),
- )
- group = {**group, "strategy_family": inferred_family}
- for assignment in raw_assignments:
- strategy = strategies_by_id.get(str(assignment.get("strategy_id") or "").strip(), {})
- assignments.append({
- **assignment,
- "strategy_label": _strategy_display_label(strategy) if strategy else str(assignment.get("strategy_id") or "").strip(),
- })
- profile_id = str(group.get("decision_profile_id") or concern.get("decision_profile_id") or "").strip()
- profile = get_decision_profile(profile_id=profile_id) if profile_id else None
- if profile:
- try:
- profile = {**profile, "config": json.loads(profile.get("config_json") or "{}")}
- except Exception:
- profile = {**profile, "config": {}}
- return JSONResponse({
- "ok": True,
- "playbook": group,
- "concern": concern,
- "decision_profile": profile,
- "assignments": assignments,
- "available_strategies": concern_strategies,
- })
- @app.post("/dashboard/concerns/{concern_id}/playbooks/create")
- async def dashboard_create_playbook(concern_id: str, request: Request) -> JSONResponse:
- concern_id = str(concern_id or "").strip()
- concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None)
- if not concern:
- return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404)
- payload = await request.json()
- name = str((payload or {}).get("name") or "").strip()
- strategy_family = str((payload or {}).get("strategy_family") or "manual").strip() or "manual"
- if not name:
- return JSONResponse({"ok": False, "error": "name is required"}, status_code=400)
- playbook_id = f"playbook:{concern_id}:{uuid4().hex[:8]}"
- profile_id = str(concern.get("decision_profile_id") or "").strip() or f"profile:{playbook_id}"
- if not get_decision_profile(profile_id=profile_id):
- upsert_decision_profile(
- id=profile_id,
- name=f"{name} profile",
- description="Auto-created profile for a new playbook.",
- config=_default_profile_config(strategy_family),
- status="active",
- )
- upsert_strategy_group(
- id=playbook_id,
- concern_id=concern_id,
- name=name,
- strategy_family=strategy_family,
- decision_profile_id=profile_id,
- notes="created from dashboard playbooks page",
- status="standby",
- )
- return JSONResponse({"ok": True, "playbook_id": playbook_id})
- @app.post("/dashboard/playbooks/{playbook_id}/assignments/upsert")
- async def dashboard_playbook_assignment_upsert(playbook_id: str, request: Request) -> JSONResponse:
- playbook_id = str(playbook_id or "").strip()
- group = next((g for g in list_strategy_groups() if str(g.get("id") or "") == playbook_id), None)
- if not group:
- return JSONResponse({"ok": False, "error": "playbook not found"}, status_code=404)
- payload = await request.json()
- strategy_id = str((payload or {}).get("strategy_id") or "").strip()
- strategy_type = str((payload or {}).get("strategy_type") or "").strip() or None
- role = str((payload or {}).get("role") or "member").strip() or "member"
- if not strategy_id:
- return JSONResponse({"ok": False, "error": "strategy_id is required"}, status_code=400)
- assignment_id = f"assign:{playbook_id}:{strategy_id}"
- upsert_strategy_assignment(
- id=assignment_id,
- strategy_group_id=playbook_id,
- strategy_id=strategy_id,
- strategy_type=strategy_type,
- role=role,
- status="active",
- notes="managed from dashboard playbook editor",
- )
- return JSONResponse({"ok": True, "assignment_id": assignment_id})
- @app.post("/dashboard/concerns/{concern_id}/status")
- async def dashboard_set_concern_status(concern_id: str, request: Request) -> JSONResponse:
- concern_id = str(concern_id or "").strip()
- concern = next((c for c in list_concerns() if str(c.get("id") or "") == concern_id), None)
- if not concern:
- return JSONResponse({"ok": False, "error": "concern not found"}, status_code=404)
- payload = await request.json()
- status = str((payload or {}).get("status") or "").strip().lower()
- if status not in {"active", "inactive"}:
- return JSONResponse({"ok": False, "error": "status must be active or inactive"}, status_code=400)
- account_id = str(concern.get("account_id") or "").strip()
- if status == "inactive" and account_id:
- try:
- await trader_cancel_all_orders(cfg.trader_url if (cfg := load_config()) else "", account_id)
- except Exception:
- pass
- upsert_concern(
- id=str(concern.get("id") or ""),
- account_id=account_id or None,
- market_symbol=str(concern.get("market_symbol") or "").strip() or None,
- base_currency=str(concern.get("base_currency") or "").strip() or None,
- quote_currency=str(concern.get("quote_currency") or "").strip() or None,
- strategy_id=str(concern.get("strategy_id") or "").strip() or None,
- decision_profile_id=str(concern.get("decision_profile_id") or "").strip() or None,
- source=str(concern.get("source") or "dashboard"),
- status=status,
- notes=str(concern.get("notes") or "").strip() or None,
- )
- return JSONResponse({"ok": True, "status": status})
- @app.post("/dashboard/playbooks/{playbook_id}/assignments/{assignment_id}/delete")
- def dashboard_playbook_assignment_delete(playbook_id: str, assignment_id: str) -> JSONResponse:
- assignment_id = str(assignment_id or "").strip()
- init_db()
- from .store import _connect # local import to avoid widening the public store API for one dashboard mutation
- with _connect() as conn:
- deleted = conn.execute("delete from strategy_assignments where id = ? and strategy_group_id = ?", (assignment_id, str(playbook_id or "").strip())).rowcount or 0
- if not deleted:
- return JSONResponse({"ok": False, "error": "assignment not found"}, status_code=404)
- return JSONResponse({"ok": True, "deleted": deleted})
|