bitstamp_private_ws.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from __future__ import annotations
  2. import asyncio
  3. import json
  4. from datetime import datetime, timezone
  5. import websockets
  6. from .bitstamp import BitstampClient
  7. from .repo import get_account, get_account_secrets
  8. from .storage import get_connection
# Endpoint handed to websockets.connect() for every session.
WS_URL = "wss://ws.bitstamp.net"
# Seconds to wait before retrying after a failed session, and also the idle
# delay used when no enabled Bitstamp account is found.
WS_RECONNECT_SECONDS = 5
# Receive timeout in seconds; when no message arrives within this window a
# "bts:heartbeat" event is sent on the socket.
WS_HEARTBEAT_SECONDS = 15
  12. async def private_ws_main(stop_event: asyncio.Event) -> None:
  13. while not stop_event.is_set():
  14. try:
  15. await _run_once(stop_event)
  16. except asyncio.CancelledError:
  17. raise
  18. except Exception:
  19. await asyncio.sleep(WS_RECONNECT_SECONDS)
  20. async def _run_once(stop_event: asyncio.Event) -> None:
  21. accounts = [a for a in __import__("exec_mcp.repo", fromlist=["list_accounts"]).list_accounts(enabled_only=True) if a["venue"] == "bitstamp"]
  22. if not accounts:
  23. await asyncio.sleep(WS_RECONNECT_SECONDS)
  24. return
  25. async with websockets.connect(WS_URL, ping_interval=None) as ws:
  26. for account in accounts:
  27. token_info = _get_token(account["id"])
  28. await ws.send(json.dumps({"event": "bts:subscribe", "data": {"channel": f"private-my_orders_{account['venue_account_ref']}-{token_info['user_id']}", "auth": token_info["token"]}}))
  29. while not stop_event.is_set():
  30. try:
  31. message = await asyncio.wait_for(ws.recv(), timeout=WS_HEARTBEAT_SECONDS)
  32. except asyncio.TimeoutError:
  33. await ws.send(json.dumps({"event": "bts:heartbeat"}))
  34. continue
  35. _handle_message(json.loads(message))
  36. def _get_token(account_id: str) -> dict:
  37. account = get_account(account_id)
  38. secrets = get_account_secrets(account_id)
  39. client = BitstampClient(account["venue_account_ref"], secrets["api_key"], secrets["api_secret"])
  40. return client.websocket_token()
  41. def _handle_message(payload: dict) -> None:
  42. event = payload.get("event")
  43. if not event:
  44. return
  45. data = payload.get("data") or {}
  46. order_id = data.get("id") or data.get("order_id")
  47. if not order_id:
  48. return
  49. captured_at = datetime.now(timezone.utc).isoformat()
  50. with get_connection() as conn:
  51. conn.execute(
  52. "UPDATE order_records SET status = ?, payload_json = ?, updated_at = ? WHERE bitstamp_order_id = ?",
  53. (str(event), json.dumps(payload), captured_at, str(order_id)),
  54. )
  55. conn.commit()