from __future__ import annotations import asyncio import json from datetime import datetime, timezone import websockets from .bitstamp import BitstampClient from .repo import get_account, get_account_secrets from .storage import get_connection WS_URL = "wss://ws.bitstamp.net" WS_RECONNECT_SECONDS = 5 WS_HEARTBEAT_SECONDS = 15 async def private_ws_main(stop_event: asyncio.Event) -> None: while not stop_event.is_set(): try: await _run_once(stop_event) except asyncio.CancelledError: raise except Exception: await asyncio.sleep(WS_RECONNECT_SECONDS) async def _run_once(stop_event: asyncio.Event) -> None: accounts = [a for a in __import__("exec_mcp.repo", fromlist=["list_accounts"]).list_accounts(enabled_only=True) if a["venue"] == "bitstamp"] if not accounts: await asyncio.sleep(WS_RECONNECT_SECONDS) return async with websockets.connect(WS_URL, ping_interval=None) as ws: for account in accounts: token_info = _get_token(account["id"]) await ws.send(json.dumps({"event": "bts:subscribe", "data": {"channel": f"private-my_orders_{account['venue_account_ref']}-{token_info['user_id']}", "auth": token_info["token"]}})) while not stop_event.is_set(): try: message = await asyncio.wait_for(ws.recv(), timeout=WS_HEARTBEAT_SECONDS) except asyncio.TimeoutError: await ws.send(json.dumps({"event": "bts:heartbeat"})) continue _handle_message(json.loads(message)) def _get_token(account_id: str) -> dict: account = get_account(account_id) secrets = get_account_secrets(account_id) client = BitstampClient(account["venue_account_ref"], secrets["api_key"], secrets["api_secret"]) return client.websocket_token() def _handle_message(payload: dict) -> None: event = payload.get("event") if not event: return data = payload.get("data") or {} order_id = data.get("id") or data.get("order_id") if not order_id: return captured_at = datetime.now(timezone.utc).isoformat() with get_connection() as conn: conn.execute( "UPDATE order_records SET status = ?, payload_json = ?, updated_at = ? WHERE bitstamp_order_id = ?", (str(event), json.dumps(payload), captured_at, str(order_id)), ) conn.commit()