| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489 |
from __future__ import annotations

import asyncio
import json
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from uuid import uuid4

import anyio
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings

from .argus_client import get_regime as argus_get_regime, get_snapshot as argus_get_snapshot
from .config import load_config
from .crypto_client import get_price, get_regime
from .decision_engine import assess_wallet_state, make_decision
from .narrative_engine import build_narrative
from .state_engine import synthesize_state
from .store import delete_concern, get_state, init_db, list_concerns, latest_cycle, latest_cycles, latest_decisions, latest_narratives, latest_observations, latest_regime_samples, prune_older_than, recent_regime_samples, recent_states_for_concern, sync_concerns_from_strategies, upsert_cycle, upsert_decision, upsert_narrative, upsert_observation, upsert_regime_sample, upsert_state, latest_states
from .trader_client import apply_control_decision as trader_apply_control_decision, get_strategy as trader_get_strategy, list_strategies
# Shared FastMCP server instance: tools in this module register on it and its
# SSE app is mounted into the FastAPI application. DNS-rebinding protection is
# explicitly switched off in the transport settings.
_transport_security = TransportSecuritySettings(enable_dns_rebinding_protection=False)
mcp = FastMCP("hermes-mcp", transport_security=_transport_security)
def _build_trader_control_payload(*, decision_id: str, concern: dict, decision: object) -> dict | None:
    """Translate a decision object into the request body sent to the trader.

    The decision's ``action`` selects the trader verb:

    * ``replace_with_*`` / ``enable_*`` -> ``"switch"`` (target taken from the decision)
    * ``suspend_grid`` -> ``"pause"`` (target forced to the current primary strategy)
    * ``set_risk_mode`` -> ``"set_risk_mode"`` (mode read from the decision payload)

    Args:
        decision_id: Identifier echoed back in the payload.
        concern: Mapping providing ``id``, ``account_id`` and ``market_symbol``.
        decision: Object read via ``getattr`` for ``action``, ``target_strategy``,
            ``payload``, ``reason_summary`` and ``confidence``.

    Returns:
        The control payload dict, or ``None`` when the action has no trader mapping.
    """
    source_action = str(getattr(decision, "action", "") or "").strip()
    target = str(getattr(decision, "target_strategy", "") or "").strip() or None

    # A non-dict payload is treated as if no payload had been supplied.
    raw_details = getattr(decision, "payload", {})
    details = raw_details if isinstance(raw_details, dict) else {}
    active_strategy = str(details.get("current_primary_strategy") or "").strip() or None

    mode: str | None = None
    if source_action.startswith(("replace_with_", "enable_")):
        verb = "switch"
    elif source_action == "suspend_grid":
        verb = "pause"
        # Pausing always addresses whatever is currently running.
        target = active_strategy
    elif source_action == "set_risk_mode":
        verb = "set_risk_mode"
        mode = str(details.get("risk_mode") or "").strip() or None
    else:
        return None

    return {
        "decision_id": decision_id,
        "concern_id": str(concern.get("id") or "").strip() or None,
        "account_id": str(concern.get("account_id") or "").strip(),
        "market_symbol": str(concern.get("market_symbol") or "").strip().lower(),
        "action": verb,
        "target_strategy_id": target,
        "expected_active_strategy_id": active_strategy,
        "risk_mode": mode,
        "reason": str(getattr(decision, "reason_summary", "") or "").strip(),
        "confidence": float(getattr(decision, "confidence", 0.0) or 0.0),
        "dry_run": False,
        "override": False,
        "source": "hermes-mcp",
        "source_action": source_action,
    }
async def _maybe_dispatch_trader_action(*, cfg: object, decision_id: str, concern: dict, decision: object) -> dict:
    """Send a decision to the trader when allowed and report what happened.

    Returns a record whose ``"dispatch"`` key is one of:

    * ``"not_required"`` - the decision does not require action.
    * ``"skipped"`` - the decision's action has no trader mapping.
    * ``"blocked"`` - ``cfg.hermes_allow_actions`` is falsy; the payload is included.
    * ``"sent"`` - the trader call returned; payload and result are included.
    * ``"failed"`` - the trader call raised; payload and error text are included.
    """
    if not getattr(decision, "requires_action", False):
        return {"dispatch": "not_required"}

    control_request = _build_trader_control_payload(decision_id=decision_id, concern=concern, decision=decision)
    if control_request is None:
        return {
            "dispatch": "skipped",
            "reason": f"no trader action mapping for {getattr(decision, 'action', 'unknown')}",
        }

    if not getattr(cfg, "hermes_allow_actions", False):
        return {
            "dispatch": "blocked",
            "reason": "HERMES_ALLOW_ACTIONS is false",
            "payload": control_request,
        }

    try:
        # Reading trader_url stays inside the try so a missing setting is
        # reported as a failed dispatch rather than raised to the caller.
        outcome = await trader_apply_control_decision(getattr(cfg, "trader_url"), control_request)
    except Exception as exc:
        return {
            "dispatch": "failed",
            "payload": control_request,
            "error": str(exc),
        }
    return {
        "dispatch": "sent",
        "payload": control_request,
        "result": outcome,
    }
@mcp.tool(description="Return Hermes current state, narrative, uncertainty, and a short self-assessment report.")
def report() -> dict:
    # Project the stored state onto the five report fields, substituting the
    # scaffold defaults for any field the store does not provide.
    snapshot = get_state()
    defaults = {
        "status": "stub",
        "thinking": "Hermes scaffold is ready.",
        "confidence": 0.0,
        "uncertainty": ["no live adapters wired yet"],
        "layers": [],
    }
    return {field: snapshot.get(field, fallback) for field, fallback in defaults.items()}
async def _fetch_strategy_inventory(trader_url: str) -> list[dict]:
    """Return the trader's strategies, merged with per-strategy detail when available.

    Any failure while listing or iterating yields an empty inventory for this
    cycle; a failure fetching one strategy's detail keeps the bare list entry.
    """
    try:
        enriched: list[dict] = []
        for strategy in await list_strategies(trader_url):
            instance_id = str(strategy.get("id") or "").strip()
            if not instance_id:
                enriched.append(strategy)
                continue
            try:
                detail = await trader_get_strategy(trader_url, instance_id, include_state=True, include_report=True)
                enriched.append({**strategy, **detail})
            except Exception:
                # Detail enrichment is optional; fall back to the list entry.
                enriched.append(strategy)
        return enriched
    except Exception:
        logging.getLogger(__name__).warning("strategy inventory unavailable for this cycle", exc_info=True)
        return []


async def _poll_concern(*, cfg, cycle_id: str, concern: dict, strategy_inventory: list[dict], argus_snapshot: dict, argus_regime: dict) -> None:
    """Sample regimes for one concern, then persist its state, narrative and decision.

    A concern without a resolvable symbol is skipped. If regime sampling fails
    for any timeframe the concern is skipped for this cycle, so no decision is
    made (or dispatched) from incomplete market data. Failures in the
    synthesis/decision stage are logged and do not affect other concerns.
    """
    log = logging.getLogger(__name__)
    symbol = _resolve_regime_symbol(concern)
    if not symbol:
        return

    account_id = str(concern.get("account_id") or "").strip()
    account_info: dict = {}
    if account_id:
        try:
            payload = await _call_exec_tool(cfg.exec_url, "get_account_info", {"account_id": account_id})
            account_info = payload if isinstance(payload, dict) else {}
        except Exception:
            log.warning("account info unavailable for account %s", account_id, exc_info=True)
            account_info = {}

    current_regimes: list[dict] = []
    try:
        for timeframe in cfg.crypto_timeframes:
            regime = await get_regime(cfg.crypto_url, str(symbol), timeframe)
            current_regimes.append({**regime, "timeframe": timeframe})
            upsert_regime_sample(
                id=f"{cycle_id}:{concern['id']}:{timeframe}",
                cycle_id=cycle_id,
                concern_id=str(concern["id"]),
                timeframe=timeframe,
                regime_json=json.dumps(regime, ensure_ascii=False),
                captured_at=datetime.now(timezone.utc).isoformat(),
            )
    except Exception:
        log.warning("regime sampling failed for concern %s; skipping it this cycle", concern.get("id"), exc_info=True)
        return

    try:
        state = synthesize_state(
            concern=concern,
            regimes=current_regimes,
            account_info=account_info,
            argus_snapshot=argus_snapshot,
            argus_regime=argus_regime,
        )
        upsert_state(
            id=f"{cycle_id}:{concern['id']}",
            cycle_id=cycle_id,
            concern_id=str(concern["id"]),
            market_regime=state.market_regime,
            volatility_state=state.volatility_state,
            liquidity_state=state.liquidity_state,
            sentiment_pressure=state.sentiment_pressure,
            event_risk=state.event_risk,
            execution_quality=state.execution_quality,
            confidence=state.confidence,
            payload_json=json.dumps(state.payload, ensure_ascii=False),
            created_at=state.payload.get("generated_at"),
        )

        narrative = build_narrative(concern=concern, state_payload=state.payload)
        upsert_narrative(
            id=f"{cycle_id}:{concern['id']}",
            cycle_id=cycle_id,
            concern_id=str(concern["id"]),
            summary=narrative.summary,
            key_drivers_json=json.dumps(narrative.key_drivers, ensure_ascii=False),
            risk_flags_json=json.dumps(narrative.risk_flags, ensure_ascii=False),
            uncertainties_json=json.dumps(narrative.uncertainties, ensure_ascii=False),
            confidence=narrative.confidence,
            created_at=narrative.payload.get("generated_at"),
        )

        # Use the price from the last sampled timeframe that reported one.
        latest_price = next((r.get("price") for r in reversed(current_regimes) if r.get("price") is not None), None)
        wallet_state = assess_wallet_state(
            account_info=account_info,
            concern=concern,
            price=float(latest_price) if latest_price is not None else None,
            strategies=strategy_inventory,
        )

        # The history window never drops below five minutes.
        breakout_window_seconds = max(300, int(getattr(cfg, "breakout_memory_window_seconds", 900) or 900))
        recent_state_rows = recent_states_for_concern(concern_id=str(concern["id"]), since_seconds=breakout_window_seconds, limit=12)
        decision = make_decision(
            concern=concern,
            narrative_payload={
                **state.payload,
                **narrative.payload,
                "confidence": narrative.confidence,
            },
            wallet_state=wallet_state,
            strategies=strategy_inventory,
            history_window={
                "window_seconds": breakout_window_seconds,
                "recent_states": recent_state_rows,
            },
        )

        decision_id = f"{cycle_id}:{concern['id']}"
        dispatch_record = await _maybe_dispatch_trader_action(
            cfg=cfg,
            decision_id=decision_id,
            concern=concern,
            decision=decision,
        )
        # The dispatch outcome is stored alongside the decision's own payload.
        decision_payload = {
            **decision.payload,
            "dispatch": dispatch_record,
        }
        upsert_decision(
            id=decision_id,
            cycle_id=cycle_id,
            concern_id=str(concern["id"]),
            mode=decision.mode,
            action=decision.action,
            target_strategy=decision.target_strategy,
            target_policy_json=json.dumps(decision_payload, ensure_ascii=False),
            reason_summary=decision.reason_summary,
            confidence=decision.confidence,
            requires_action=decision.requires_action,
            created_at=decision.payload.get("generated_at"),
        )
    except Exception:
        log.exception("state/narrative/decision pipeline failed for concern %s", concern.get("id"))


async def _run_poll_cycle(cfg) -> None:
    """Run one polling cycle: record it, capture Argus context, process every concern."""
    log = logging.getLogger(__name__)
    started = datetime.now(timezone.utc).isoformat()
    cycle_id = str(uuid4())
    concerns = list_concerns()
    strategy_inventory = await _fetch_strategy_inventory(cfg.trader_url)
    upsert_cycle(id=cycle_id, started_at=started, finished_at=None, status="running", trigger="interval", notes=f"polling {len(concerns)} concerns")

    # Argus data is optional context; either call may fail independently.
    argus_snapshot: dict = {}
    argus_regime: dict = {}
    try:
        argus_snapshot = await argus_get_snapshot(cfg.argus_url)
    except Exception:
        log.warning("argus snapshot unavailable", exc_info=True)
        argus_snapshot = {}
    try:
        argus_regime = await argus_get_regime(cfg.argus_url)
    except Exception:
        log.warning("argus regime unavailable", exc_info=True)
        argus_regime = {}
    if argus_snapshot or argus_regime:
        upsert_observation(
            id=f"{cycle_id}:argus",
            cycle_id=cycle_id,
            concern_id=None,
            source="argus-mcp",
            kind="macro_snapshot",
            payload_json=json.dumps({"snapshot": argus_snapshot, "regime": argus_regime}, ensure_ascii=False),
            observed_at=datetime.now(timezone.utc).isoformat(),
        )

    for concern in concerns:
        await _poll_concern(
            cfg=cfg,
            cycle_id=cycle_id,
            concern=concern,
            strategy_inventory=strategy_inventory,
            argus_snapshot=argus_snapshot,
            argus_regime=argus_regime,
        )

    upsert_cycle(id=cycle_id, started_at=started, finished_at=datetime.now(timezone.utc).isoformat(), status="ok", trigger="interval", notes=f"polled {len(concerns)} concerns over {','.join(cfg.crypto_timeframes)}")


@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan: initialise storage, then poll in the background until shutdown.

    On startup the config is loaded, the database initialised, and two
    best-effort housekeeping steps run (concern sync from the trader's
    strategy list, retention pruning). A background task then runs one poll
    cycle every ``max(10, cfg.cycle_seconds)`` seconds. The task is cancelled
    and awaited when the application shuts down.
    """
    log = logging.getLogger(__name__)
    cfg = load_config()
    init_db()
    try:
        sync_concerns_from_strategies(await list_strategies(cfg.trader_url))
    except Exception:
        log.warning("initial concern sync from trader failed", exc_info=True)
    try:
        prune_older_than(cfg.retention_days)
    except Exception:
        log.warning("retention pruning failed", exc_info=True)

    async def _poll_loop() -> None:
        while True:
            try:
                await _run_poll_cycle(cfg)
            except Exception:
                # A failed cycle must not end polling for the process lifetime.
                log.exception("poll cycle failed; retrying after the normal interval")
            await asyncio.sleep(max(10, cfg.cycle_seconds))

    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    poll_task = asyncio.create_task(_poll_loop())
    try:
        yield
    finally:
        poll_task.cancel()
        try:
            await poll_task
        except asyncio.CancelledError:
            pass
# ASGI application; the MCP server's SSE app is served under the /mcp prefix.
app = FastAPI(lifespan=lifespan, title="Hermes MCP")
_mcp_sse_app = mcp.sse_app()
app.mount("/mcp", _mcp_sse_app)
@app.get("/")
def root() -> dict:
    # Static descriptor: liveness flag plus the SSE mount and dashboard paths.
    return dict(status="ok", mount="/mcp/sse", dashboard="/dashboard")
@app.get("/health")
def health() -> dict:
    # Static health descriptor; no backing service is probed here.
    return dict(status="ok", db="sqlite", tool="report")
@app.delete("/concerns/{concern_id}")
def remove_concern(concern_id: str) -> JSONResponse:
    # Delete the concern and echo the store's deletion summary. A falsy
    # "concerns" entry in that summary is reported as 404 "concern not found".
    outcome = delete_concern(concern_id=concern_id)
    if outcome.get("concerns"):
        return JSONResponse({"ok": True, "deleted": outcome})
    not_found = {"ok": False, "error": "concern not found", "deleted": outcome}
    return JSONResponse(not_found, status_code=404)
def _strip_sse(url: str) -> str:
    """Return ``url`` without trailing slashes and without a final ``/mcp/sse`` path."""
    return url.rstrip("/").removesuffix("/mcp/sse")
async def _call_exec_tool(exec_url: str, tool: str, arguments: dict) -> object:
    """Call one tool on the MCP server at ``exec_url`` over SSE and decode its reply.

    A fresh SSE connection and client session are opened for each call. Only
    the first content item of the result is considered.

    Returns:
        ``None`` when the result has no content or the first item has no text;
        otherwise the text parsed as JSON, or the raw text if parsing fails.
    """
    async with sse_client(exec_url) as (incoming, outgoing), ClientSession(incoming, outgoing) as session:
        await session.initialize()
        response = await session.call_tool(tool, arguments)
        items = getattr(response, "content", None) or []
        if not items:
            return None
        head = items[0]
        # Content items may be plain dicts or objects exposing a ``text`` attribute.
        text = head.get("text") if isinstance(head, dict) else getattr(head, "text", None)
        if text is None:
            return None
        try:
            decoded = json.loads(text)
        except Exception:
            return text
        return decoded
async def _load_exec_enrichment(exec_url: str, crypto_url: str, concerns: list[dict]) -> tuple[dict[str, dict], dict[str, dict], dict[str, float | None]]:
    """Fetch account, market and valuation data used to decorate concerns.

    Account lookups run concurrently. Each remote step is isolated: a failed
    account lookup omits only that account, a failed market listing yields an
    empty market map, and a failed valuation records ``None`` for that account.

    Args:
        exec_url: MCP endpoint providing ``get_account_info`` and ``list_markets``.
        crypto_url: Price service endpoint passed to the valuation helper.
        concerns: Concern rows; only their ``account_id`` values are read.

    Returns:
        ``(accounts_by_id, markets_by_symbol, total_values)`` where markets are
        keyed by lower-cased symbol and ``total_values`` maps every successfully
        loaded account id to its computed total or ``None``.
    """
    log = logging.getLogger(__name__)
    account_ids = sorted({account_id for c in concerns if (account_id := str(c.get("account_id") or "").strip())})

    account_payloads = await asyncio.gather(
        *(_call_exec_tool(exec_url, "get_account_info", {"account_id": account_id}) for account_id in account_ids),
        return_exceptions=True,
    )
    accounts_by_id: dict[str, dict] = {}
    for account_id, payload in zip(account_ids, account_payloads):
        if isinstance(payload, BaseException):
            if not isinstance(payload, Exception):
                # Never swallow cancellation or interpreter-exit signals.
                raise payload
            log.warning("account lookup failed for %s: %r", account_id, payload)
        elif isinstance(payload, dict):
            accounts_by_id[account_id] = payload

    try:
        market_payload = await _call_exec_tool(exec_url, "list_markets", {})
    except Exception:
        log.warning("market listing failed", exc_info=True)
        market_payload = None

    total_values: dict[str, float | None] = {}
    for account_id, payload in accounts_by_id.items():
        try:
            total_values[account_id] = await _live_total_value(crypto_url, payload)
        except Exception:
            log.warning("live valuation failed for account %s", account_id, exc_info=True)
            total_values[account_id] = None

    # Every market with a non-empty symbol is indexed. The previous condition
    # (`symbol in market_symbols or symbol`) reduced to exactly this check.
    markets_by_symbol: dict[str, dict] = {}
    if isinstance(market_payload, list):
        for market in market_payload:
            if not isinstance(market, dict):
                continue
            symbol = str(market.get("symbol") or market.get("market_symbol") or "").strip().lower()
            if symbol:
                markets_by_symbol[symbol] = market
    return accounts_by_id, markets_by_symbol, total_values
async def _live_total_value(crypto_url: str, account_info: dict) -> float | None:
    """Sum an account's balances, pricing every non-``usd`` asset via ``get_price``.

    Entries that are not dicts, lack an asset code or a ``total``, or whose
    ``total`` is not convertible to float are ignored. An asset whose price
    cannot be read contributes zero.

    Returns:
        The summed value, or ``None`` when ``balances`` is not a list or no
        entry was usable.
    """
    holdings = account_info.get("balances")
    if not isinstance(holdings, list):
        return None

    running_total = 0.0
    counted_any = False
    for entry in holdings:
        if not isinstance(entry, dict):
            continue
        code = str(entry.get("asset_code") or entry.get("asset") or "").strip().lower()
        raw_amount = entry.get("total")
        if not code or raw_amount is None:
            continue
        try:
            value = float(raw_amount)
        except Exception:
            continue
        counted_any = True
        if code != "usd":
            quote = await get_price(crypto_url, code)
            try:
                unit_price = float(quote.get("price"))
            except Exception:
                unit_price = 0.0
            value = value * unit_price
        running_total += value

    if not counted_any:
        return None
    return running_total
def _balance_float(value: object) -> float | None:
    """Return ``value`` as a float, or ``None`` when it is missing or not numeric."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _compact_balances(payload: object) -> str:
    """Render up to five balances as a single ``" | "``-separated summary line.

    Each segment is ``ASSET total``, followed by ``(avail X)`` when the
    available amount differs numerically from the total, and ``≈ $value``
    when ``value_usd`` is an int or float. An entry whose total is missing or
    not numeric is shown by asset code only instead of raising.

    Args:
        payload: Expected to be a list of balance dicts; anything else yields ``"-"``.

    Returns:
        The summary string, or ``"-"`` when there is nothing to show.
    """
    if not isinstance(payload, list):
        return "-"

    parts: list[str] = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        asset = str(item.get("asset_code") or item.get("asset") or "").upper().strip()
        if not asset:
            continue

        total = _balance_float(item.get("total"))
        available = _balance_float(item.get("available"))
        segment = asset if total is None else f"{asset} {total:.8g}"
        # Compare as numbers so "10" and 10 are not reported as different.
        if total is not None and available is not None and available != total:
            segment += f" (avail {available:.8g})"

        value_usd = item.get("value_usd")
        if isinstance(value_usd, (int, float)):
            segment += f" ≈ ${float(value_usd):,.2f}"

        parts.append(segment)
        if len(parts) == 5:
            # Only the first five usable entries are ever displayed.
            break
    return " | ".join(parts) or "-"
def _resolve_regime_symbol(concern: dict) -> str | None:
    """Pick the asset symbol used for regime lookups for a concern.

    An explicit ``base_currency`` wins. Otherwise the ``market_symbol`` is
    upper-cased, stripped of ``/`` and ``-``, and the first matching quote
    suffix (checked in the order USDT, USDC, USD, EUR, BTC, ETH) is removed,
    provided something remains in front of it.

    Returns:
        The upper-cased symbol, or ``None`` when neither field yields one.
    """
    explicit_base = str(concern.get("base_currency") or "").strip().upper()
    if explicit_base:
        return explicit_base

    pair = str(concern.get("market_symbol") or "").strip().upper()
    pair = pair.translate(str.maketrans("", "", "/-"))
    quote_suffixes = ("USDT", "USDC", "USD", "EUR", "BTC", "ETH")
    matched = next((q for q in quote_suffixes if len(pair) > len(q) and pair.endswith(q)), None)
    if matched is not None:
        return pair[: len(pair) - len(matched)]
    return pair or None
@app.get("/dashboard/data")
def dashboard_data() -> JSONResponse:
    # Assemble the dashboard JSON: concerns decorated with live account and
    # market data, recent regime samples and histories, and the latest cycles,
    # states, narratives, decisions and Argus observations from the store.
    # Remote enrichment is best-effort: failures are logged and the response
    # falls back to whatever the store alone provides.
    log = logging.getLogger(__name__)
    cfg = load_config()
    concerns = list_concerns()

    accounts_by_id: dict[str, dict] = {}
    markets_by_symbol: dict[str, dict] = {}
    total_values: dict[str, float | None] = {}
    try:
        accounts_by_id, markets_by_symbol, total_values = anyio.run(_load_exec_enrichment, cfg.exec_url, cfg.crypto_url, concerns)
    except Exception:
        log.warning("dashboard: account/market enrichment unavailable", exc_info=True)

    strategy_inventory: list[dict] = []
    strategy_inventory_available = True
    try:
        strategy_inventory = anyio.run(list_strategies, cfg.trader_url)
    except Exception:
        log.warning("dashboard: strategy inventory unavailable", exc_info=True)
        strategy_inventory = []
        strategy_inventory_available = False

    # (account_id, market_symbol) pairs that currently have a strategy.
    live_scopes = {
        (str(strategy.get("account_id") or "").strip(), str(strategy.get("market_symbol") or "").strip().lower())
        for strategy in strategy_inventory
        if str(strategy.get("account_id") or "").strip() and str(strategy.get("market_symbol") or "").strip()
    }

    enriched = []
    concern_lookup: dict[str, dict] = {}
    for concern in concerns:
        account_id = str(concern.get("account_id") or "").strip()
        market_symbol = str(concern.get("market_symbol") or "").strip().lower()
        account_info = accounts_by_id.get(account_id, {})
        market_info = markets_by_symbol.get(market_symbol, {})
        balances = account_info.get("balances") or account_info.get("balance") or account_info.get("wallets") or []
        live_total = total_values.get(account_id)
        row = {
            **concern,
            "account_display": account_info.get("display_name") or account_id,
            "balances": balances,
            "balance_summary": _compact_balances(balances),
            "total_value_usd": live_total if live_total is not None else account_info.get("total_value_usd"),
            "market_display": market_info.get("name") or concern.get("market_symbol") or "",
            "market_description": market_info.get("description") or "",
            # Only flag a concern as orphaned when the inventory was actually
            # fetched; an unavailable trader must not mark everything orphaned.
            "orphaned": strategy_inventory_available and (account_id, market_symbol) not in live_scopes,
        }
        enriched.append(row)
        concern_lookup[str(concern.get("id") or "")] = row

    # Group recent samples per concern/timeframe, keeping at most 24 each.
    histories_by_key: dict[str, list[dict]] = {}
    for sample in recent_regime_samples(1000):
        concern_id = str(sample.get("concern_id") or "")
        timeframe = str(sample.get("timeframe") or "")
        bucket = histories_by_key.setdefault(f"{concern_id}::{timeframe}", [])
        if len(bucket) < 24:
            bucket.append(sample)

    regimes = []
    for sample in latest_regime_samples(20):
        concern_meta = concern_lookup.get(str(sample.get("concern_id") or ""), {})
        regimes.append({
            **sample,
            "account_display": concern_meta.get("account_display"),
            "market_display": concern_meta.get("market_display"),
            "market_symbol": concern_meta.get("market_symbol"),
        })

    return JSONResponse({
        "latest_cycle": latest_cycle(),
        "cycles": latest_cycles(10),
        "argus_observations": latest_observations(20, source="argus-mcp"),
        "concerns": enriched,
        "regime_samples": regimes,
        "regime_histories": histories_by_key,
        "state_samples": latest_states(20),
        "state_history": latest_states(100),
        "narrative_samples": latest_narratives(20),
        "decision_samples": latest_decisions(20),
        "decision_history": latest_decisions(100),
    })
|