from __future__ import annotations from contextlib import asynccontextmanager import asyncio import json from datetime import datetime, timezone from uuid import uuid4 import anyio from fastapi import FastAPI from fastapi.responses import JSONResponse from mcp.server.fastmcp import FastMCP from mcp.server.transport_security import TransportSecuritySettings from mcp import ClientSession from mcp.client.sse import sse_client from .config import load_config from .argus_client import get_regime as argus_get_regime, get_snapshot as argus_get_snapshot from .crypto_client import get_price, get_regime from .decision_engine import assess_wallet_state, make_decision from .narrative_engine import build_narrative from .state_engine import synthesize_state from .store import delete_concern, get_state, init_db, list_concerns, latest_cycle, latest_cycles, latest_decisions, latest_narratives, latest_observations, latest_regime_samples, prune_older_than, recent_regime_samples, recent_states_for_concern, sync_concerns_from_strategies, upsert_cycle, upsert_decision, upsert_narrative, upsert_observation, upsert_regime_sample, upsert_state, latest_states from .trader_client import apply_control_decision as trader_apply_control_decision, get_strategy as trader_get_strategy, list_strategies mcp = FastMCP( "hermes-mcp", transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False), ) def _build_trader_control_payload(*, decision_id: str, concern: dict, decision: object) -> dict | None: action = str(getattr(decision, "action", "") or "").strip() target_strategy = str(getattr(decision, "target_strategy", "") or "").strip() or None decision_payload = getattr(decision, "payload", {}) if isinstance(getattr(decision, "payload", {}), dict) else {} current_primary = str(decision_payload.get("current_primary_strategy") or "").strip() or None trader_action: str | None = None risk_mode: str | None = None if action.startswith("replace_with_") or action.startswith("enable_"): trader_action = "switch" elif action == "suspend_grid": trader_action = "pause" target_strategy = current_primary elif action == "set_risk_mode": trader_action = "set_risk_mode" risk_mode = str(decision_payload.get("risk_mode") or "").strip() or None else: return None account_id = str(concern.get("account_id") or "").strip() market_symbol = str(concern.get("market_symbol") or "").strip().lower() concern_id = str(concern.get("id") or "").strip() or None reason = str(getattr(decision, "reason_summary", "") or "").strip() confidence = float(getattr(decision, "confidence", 0.0) or 0.0) payload = { "decision_id": decision_id, "concern_id": concern_id, "account_id": account_id, "market_symbol": market_symbol, "action": trader_action, "target_strategy_id": target_strategy, "expected_active_strategy_id": current_primary, "risk_mode": risk_mode, "reason": reason, "confidence": confidence, "dry_run": False, "override": False, "source": "hermes-mcp", "source_action": action, } return payload async def _maybe_dispatch_trader_action(*, cfg: object, decision_id: str, concern: dict, decision: object) -> dict: if not bool(getattr(decision, "requires_action", False)): return {"dispatch": "not_required"} payload = _build_trader_control_payload(decision_id=decision_id, concern=concern, decision=decision) if payload is None: return { "dispatch": "skipped", "reason": f"no trader action mapping for {getattr(decision, 'action', 'unknown')}", } if not bool(getattr(cfg, "hermes_allow_actions", False)): return { "dispatch": "blocked", "reason": "HERMES_ALLOW_ACTIONS is false", "payload": payload, } try: result = await trader_apply_control_decision(getattr(cfg, "trader_url"), payload) return { "dispatch": "sent", "payload": payload, "result": result, } except Exception as exc: return { "dispatch": "failed", "payload": payload, "error": str(exc), } @mcp.tool(description="Return Hermes current state, narrative, uncertainty, and a short self-assessment report.") def report() -> dict: state = get_state() return { "status": state.get("status", "stub"), "thinking": state.get("thinking", "Hermes scaffold is ready."), "confidence": state.get("confidence", 0.0), "uncertainty": state.get("uncertainty", ["no live adapters wired yet"]), "layers": state.get("layers", []), } @asynccontextmanager async def lifespan(_: FastAPI): cfg = load_config() init_db() try: sync_concerns_from_strategies(await list_strategies(cfg.trader_url)) except Exception: pass try: prune_older_than(cfg.retention_days) except Exception: pass async def _poll_loop() -> None: while True: started = datetime.now(timezone.utc).isoformat() cycle_id = str(uuid4()) concerns = list_concerns() try: strategy_inventory = await list_strategies(cfg.trader_url) enriched_inventory = [] for strategy in strategy_inventory: instance_id = str(strategy.get("id") or "").strip() if not instance_id: enriched_inventory.append(strategy) continue try: detail = await trader_get_strategy(cfg.trader_url, instance_id, include_state=True, include_report=True) enriched_inventory.append({**strategy, **detail}) except Exception: enriched_inventory.append(strategy) strategy_inventory = enriched_inventory except Exception: strategy_inventory = [] upsert_cycle(id=cycle_id, started_at=started, finished_at=None, status="running", trigger="interval", notes=f"polling {len(concerns)} concerns") argus_snapshot: dict = {} argus_regime: dict = {} try: argus_snapshot = await argus_get_snapshot(cfg.argus_url) except Exception: argus_snapshot = {} try: argus_regime = await argus_get_regime(cfg.argus_url) except Exception: argus_regime = {} if argus_snapshot or argus_regime: upsert_observation( id=f"{cycle_id}:argus", cycle_id=cycle_id, concern_id=None, source="argus-mcp", kind="macro_snapshot", payload_json=json.dumps({"snapshot": argus_snapshot, "regime": argus_regime}, ensure_ascii=False), observed_at=datetime.now(timezone.utc).isoformat(), ) for concern in concerns: symbol = _resolve_regime_symbol(concern) if not symbol: continue account_id = str(concern.get("account_id") or "").strip() account_info = {} if account_id: try: payload = await _call_exec_tool(cfg.exec_url, "get_account_info", {"account_id": account_id}) account_info = payload if isinstance(payload, dict) else {} except Exception: account_info = {} current_regimes: list[dict] = [] for timeframe in cfg.crypto_timeframes: regime = await get_regime(cfg.crypto_url, str(symbol), timeframe) current_regimes.append({**regime, "timeframe": timeframe}) upsert_regime_sample( id=f"{cycle_id}:{concern['id']}:{timeframe}", cycle_id=cycle_id, concern_id=str(concern["id"]), timeframe=timeframe, regime_json=json.dumps(regime, ensure_ascii=False), captured_at=datetime.now(timezone.utc).isoformat(), ) try: state = synthesize_state( concern=concern, regimes=current_regimes, account_info=account_info, argus_snapshot=argus_snapshot, argus_regime=argus_regime, ) upsert_state( id=f"{cycle_id}:{concern['id']}", cycle_id=cycle_id, concern_id=str(concern["id"]), market_regime=state.market_regime, volatility_state=state.volatility_state, liquidity_state=state.liquidity_state, sentiment_pressure=state.sentiment_pressure, event_risk=state.event_risk, execution_quality=state.execution_quality, confidence=state.confidence, payload_json=json.dumps(state.payload, ensure_ascii=False), created_at=state.payload.get("generated_at"), ) narrative = build_narrative(concern=concern, state_payload=state.payload) upsert_narrative( id=f"{cycle_id}:{concern['id']}", cycle_id=cycle_id, concern_id=str(concern["id"]), summary=narrative.summary, key_drivers_json=json.dumps(narrative.key_drivers, ensure_ascii=False), risk_flags_json=json.dumps(narrative.risk_flags, ensure_ascii=False), uncertainties_json=json.dumps(narrative.uncertainties, ensure_ascii=False), confidence=narrative.confidence, created_at=narrative.payload.get("generated_at"), ) latest_price = None if current_regimes: latest_price = next((r.get("price") for r in reversed(current_regimes) if r.get("price") is not None), None) wallet_state = assess_wallet_state( account_info=account_info, concern=concern, price=float(latest_price) if latest_price is not None else None, strategies=strategy_inventory, ) breakout_window_seconds = max(300, int(getattr(cfg, "breakout_memory_window_seconds", 900) or 900)) recent_state_rows = recent_states_for_concern(concern_id=str(concern["id"]), since_seconds=breakout_window_seconds, limit=12) decision = make_decision( concern=concern, narrative_payload={ **state.payload, **narrative.payload, "confidence": narrative.confidence, }, wallet_state=wallet_state, strategies=strategy_inventory, history_window={ "window_seconds": breakout_window_seconds, "recent_states": recent_state_rows, }, ) decision_id = f"{cycle_id}:{concern['id']}" dispatch_record = await _maybe_dispatch_trader_action( cfg=cfg, decision_id=decision_id, concern=concern, decision=decision, ) decision_payload = { **decision.payload, "dispatch": dispatch_record, } upsert_decision( id=decision_id, cycle_id=cycle_id, concern_id=str(concern["id"]), mode=decision.mode, action=decision.action, target_strategy=decision.target_strategy, target_policy_json=json.dumps(decision_payload, ensure_ascii=False), reason_summary=decision.reason_summary, confidence=decision.confidence, requires_action=decision.requires_action, created_at=decision.payload.get("generated_at"), ) except Exception: pass upsert_cycle(id=cycle_id, started_at=started, finished_at=datetime.now(timezone.utc).isoformat(), status="ok", trigger="interval", notes=f"polled {len(concerns)} concerns over {','.join(cfg.crypto_timeframes)}") await asyncio.sleep(max(10, cfg.cycle_seconds)) asyncio.create_task(_poll_loop()) yield app = FastAPI(title="Hermes MCP", lifespan=lifespan) app.mount("/mcp", mcp.sse_app()) @app.get("/") def root() -> dict: return {"status": "ok", "mount": "/mcp/sse", "dashboard": "/dashboard"} @app.get("/health") def health() -> dict: return {"status": "ok", "db": "sqlite", "tool": "report"} @app.delete("/concerns/{concern_id}") def remove_concern(concern_id: str) -> JSONResponse: deleted = delete_concern(concern_id=concern_id) if not deleted.get("concerns"): return JSONResponse({"ok": False, "error": "concern not found", "deleted": deleted}, status_code=404) return JSONResponse({"ok": True, "deleted": deleted}) def _strip_sse(url: str) -> str: root = url.rstrip("/") return root[:-8] if root.endswith("/mcp/sse") else root async def _call_exec_tool(exec_url: str, tool: str, arguments: dict) -> object: async with sse_client(exec_url) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() result = await session.call_tool(tool, arguments) content = getattr(result, "content", None) or [] if not content: return None first = content[0] text = getattr(first, "text", None) if not isinstance(first, dict) else first.get("text") if text is None: return None try: return json.loads(text) except Exception: return text async def _load_exec_enrichment(exec_url: str, crypto_url: str, concerns: list[dict]) -> tuple[dict[str, dict], dict[str, dict], dict[str, float | None]]: account_ids = sorted({str(c.get("account_id") or "").strip() for c in concerns if str(c.get("account_id") or "").strip()}) market_symbols = sorted({str(c.get("market_symbol") or "").strip().lower() for c in concerns if str(c.get("market_symbol") or "").strip()}) account_payloads = await asyncio.gather(*[_call_exec_tool(exec_url, "get_account_info", {"account_id": account_id}) for account_id in account_ids]) market_payload = await _call_exec_tool(exec_url, "list_markets", {}) accounts_by_id = {account_id: payload for account_id, payload in zip(account_ids, account_payloads) if isinstance(payload, dict)} total_values: dict[str, float | None] = {} for account_id, payload in accounts_by_id.items(): total_values[account_id] = await _live_total_value(crypto_url, payload) markets_by_symbol: dict[str, dict] = {} if isinstance(market_payload, list): for market in market_payload: if not isinstance(market, dict): continue symbol = str(market.get("symbol") or market.get("market_symbol") or "").strip().lower() if symbol in market_symbols or symbol: markets_by_symbol[symbol] = market return accounts_by_id, markets_by_symbol, total_values async def _live_total_value(crypto_url: str, account_info: dict) -> float | None: balances = account_info.get("balances") if not isinstance(balances, list): return None total = 0.0 seen = False for item in balances: if not isinstance(item, dict): continue asset = str(item.get("asset_code") or item.get("asset") or "").strip().lower() amount = item.get("total") if not asset or amount is None: continue try: amount_f = float(amount) except Exception: continue seen = True if asset == "usd": total += amount_f continue price_payload = await get_price(crypto_url, asset) try: price = float(price_payload.get("price")) except Exception: price = 0.0 total += amount_f * price return total if seen else None def _compact_balances(payload: object) -> str: if not isinstance(payload, list): return "-" parts: list[str] = [] for item in payload: if not isinstance(item, dict): continue asset = str(item.get("asset_code") or item.get("asset") or "").upper().strip() total = item.get("total") available = item.get("available") value_usd = item.get("value_usd") if not asset: continue segment = f"{asset} {float(total):.8g}" if total is not None else asset if available is not None and total is not None and available != total: segment += f" (avail {float(available):.8g})" if isinstance(value_usd, (int, float)): segment += f" ≈ ${float(value_usd):,.2f}" parts.append(segment) return " | ".join(parts[:5]) or "-" def _resolve_regime_symbol(concern: dict) -> str | None: base = str(concern.get("base_currency") or "").strip().upper() if base: return base market = str(concern.get("market_symbol") or "").strip().upper().replace("/", "").replace("-", "") for suffix in ("USDT", "USDC", "USD", "EUR", "BTC", "ETH"): if market.endswith(suffix) and len(market) > len(suffix): return market[:-len(suffix)] return market or None @app.get("/dashboard/data") def dashboard_data() -> JSONResponse: cfg = load_config() concerns = list_concerns() accounts_by_id: dict[str, dict] = {} markets_by_symbol: dict[str, dict] = {} strategy_inventory: list[dict] = [] strategy_inventory_available = True try: accounts_by_id, markets_by_symbol, total_values = anyio.run(_load_exec_enrichment, cfg.exec_url, cfg.crypto_url, concerns) except Exception: total_values = {} pass try: strategy_inventory = anyio.run(list_strategies, cfg.trader_url) except Exception: strategy_inventory = [] strategy_inventory_available = False live_scopes = { (str(strategy.get("account_id") or "").strip(), str(strategy.get("market_symbol") or "").strip().lower()) for strategy in strategy_inventory if str(strategy.get("account_id") or "").strip() and str(strategy.get("market_symbol") or "").strip() } enriched = [] concern_lookup: dict[str, dict] = {} for concern in concerns: account_id = str(concern.get("account_id") or "").strip() market_symbol = str(concern.get("market_symbol") or "").strip().lower() account_info = accounts_by_id.get(account_id, {}) market_info = markets_by_symbol.get(market_symbol, {}) enriched.append({ **concern, "account_display": account_info.get("display_name") or account_id, "balances": account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or [], "balance_summary": _compact_balances(account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or []), "total_value_usd": total_values.get(account_id) if total_values.get(account_id) is not None else account_info.get("total_value_usd"), "market_display": market_info.get("name") or concern.get("market_symbol") or "", "market_description": market_info.get("description") or "", "orphaned": strategy_inventory_available and (account_id, market_symbol) not in live_scopes, }) concern_lookup[str(concern.get("id") or "")] = enriched[-1] regimes = [] histories_by_key: dict[str, list[dict]] = {} for sample in recent_regime_samples(1000): concern_id = str(sample.get("concern_id") or "") timeframe = str(sample.get("timeframe") or "") key = f"{concern_id}::{timeframe}" bucket = histories_by_key.setdefault(key, []) if len(bucket) < 24: bucket.append(sample) for sample in latest_regime_samples(20): concern_meta = concern_lookup.get(str(sample.get("concern_id") or ""), {}) regimes.append({**sample, **{ "account_display": concern_meta.get("account_display"), "market_display": concern_meta.get("market_display"), "market_symbol": concern_meta.get("market_symbol"), }}) return JSONResponse({ "latest_cycle": latest_cycle(), "cycles": latest_cycles(10), "argus_observations": latest_observations(20, source="argus-mcp"), "concerns": enriched, "regime_samples": regimes, "regime_histories": histories_by_key, "state_samples": latest_states(20), "state_history": latest_states(100), "narrative_samples": latest_narratives(20), "decision_samples": latest_decisions(20), "decision_history": latest_decisions(100), })