| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- from __future__ import annotations
- import asyncio
- import json
- from datetime import datetime, timezone
- import websockets
- from .bitstamp import BitstampClient
- from .repo import get_account, get_account_secrets
- from .storage import get_connection
- WS_URL = "wss://ws.bitstamp.net"
- WS_RECONNECT_SECONDS = 5
- WS_HEARTBEAT_SECONDS = 15
- async def private_ws_main(stop_event: asyncio.Event) -> None:
- while not stop_event.is_set():
- try:
- await _run_once(stop_event)
- except asyncio.CancelledError:
- raise
- except Exception:
- await asyncio.sleep(WS_RECONNECT_SECONDS)
- async def _run_once(stop_event: asyncio.Event) -> None:
- accounts = [a for a in __import__("exec_mcp.repo", fromlist=["list_accounts"]).list_accounts(enabled_only=True) if a["venue"] == "bitstamp"]
- if not accounts:
- await asyncio.sleep(WS_RECONNECT_SECONDS)
- return
- async with websockets.connect(WS_URL, ping_interval=None) as ws:
- for account in accounts:
- token_info = _get_token(account["id"])
- await ws.send(json.dumps({"event": "bts:subscribe", "data": {"channel": f"private-my_orders_{account['venue_account_ref']}-{token_info['user_id']}", "auth": token_info["token"]}}))
- while not stop_event.is_set():
- try:
- message = await asyncio.wait_for(ws.recv(), timeout=WS_HEARTBEAT_SECONDS)
- except asyncio.TimeoutError:
- await ws.send(json.dumps({"event": "bts:heartbeat"}))
- continue
- _handle_message(json.loads(message))
- def _get_token(account_id: str) -> dict:
- account = get_account(account_id)
- secrets = get_account_secrets(account_id)
- client = BitstampClient(account["venue_account_ref"], secrets["api_key"], secrets["api_secret"])
- return client.websocket_token()
- def _handle_message(payload: dict) -> None:
- event = payload.get("event")
- if not event:
- return
- data = payload.get("data") or {}
- order_id = data.get("id") or data.get("order_id")
- if not order_id:
- return
- captured_at = datetime.now(timezone.utc).isoformat()
- with get_connection() as conn:
- conn.execute(
- "UPDATE order_records SET status = ?, payload_json = ?, updated_at = ? WHERE bitstamp_order_id = ?",
- (str(event), json.dumps(payload), captured_at, str(order_id)),
- )
- conn.commit()
|