bitstamp_ws.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. from __future__ import annotations
  2. import asyncio
  3. import json
  4. from datetime import datetime, timezone
  5. import websockets
  6. from . import repo
# Websocket endpoint that _run_once connects to.
WS_URL = "wss://ws.bitstamp.net"
# Delay before ws_main starts a new session after a failed one.
WS_RECONNECT_SECONDS = 5
# How long _run_once waits for a frame before sending a "bts:heartbeat".
WS_HEARTBEAT_SECONDS = 15
# Appended to each asset code to form a market name, and excluded from the
# watched assets themselves (see get_watched_markets).
QUOTE_CURRENCY = "usd"
  11. async def ws_main(stop_event: asyncio.Event) -> None:
  12. while not stop_event.is_set():
  13. try:
  14. await _run_once(stop_event)
  15. except asyncio.CancelledError:
  16. raise
  17. except Exception:
  18. await asyncio.sleep(WS_RECONNECT_SECONDS)
  19. def get_watched_markets() -> list[str]:
  20. accounts = repo.list_accounts(enabled_only=True)
  21. assets = set()
  22. for account in accounts:
  23. account_id = account["id"]
  24. # Prefer normalized balances if present; fall back to raw payload later.
  25. # This stays exchange-specific to Bitstamp.
  26. info = None
  27. try:
  28. from .services_bitstamp import fetch_account_balance
  29. info = fetch_account_balance(account_id)
  30. except Exception:
  31. continue
  32. for item in info.get("balances", []):
  33. asset = str(item.get("asset_code", "")).lower()
  34. if asset and asset != QUOTE_CURRENCY:
  35. assets.add(asset)
  36. return sorted(f"{asset}{QUOTE_CURRENCY}" for asset in assets)
  37. async def _run_once(stop_event: asyncio.Event) -> None:
  38. async with websockets.connect(WS_URL, ping_interval=None) as ws:
  39. markets = get_watched_markets()
  40. for market in markets:
  41. await ws.send(json.dumps({"event": "bts:subscribe", "data": {"channel": f"live_trades_{market}"}}))
  42. last_heartbeat = asyncio.get_event_loop().time()
  43. while not stop_event.is_set():
  44. try:
  45. message = await asyncio.wait_for(ws.recv(), timeout=WS_HEARTBEAT_SECONDS)
  46. except asyncio.TimeoutError:
  47. await ws.send(json.dumps({"event": "bts:heartbeat"}))
  48. last_heartbeat = asyncio.get_event_loop().time()
  49. continue
  50. payload = json.loads(message)
  51. _handle_message(payload)
  52. def _handle_message(payload: dict) -> None:
  53. event = payload.get("event")
  54. data = payload.get("data") or {}
  55. if event != "trade":
  56. return
  57. channel = str(payload.get("channel", ""))
  58. market = channel.replace("live_trades_", "")
  59. price = data.get("price")
  60. if not market or price is None:
  61. return
  62. captured_at = datetime.now(timezone.utc).isoformat()
  63. from .storage import get_connection
  64. with get_connection() as conn:
  65. conn.execute(
  66. """
  67. INSERT INTO bitstamp_live_prices (market, price, payload_json, captured_at)
  68. VALUES (?, ?, ?, ?)
  69. ON CONFLICT(market) DO UPDATE SET
  70. price=excluded.price,
  71. payload_json=excluded.payload_json,
  72. captured_at=excluded.captured_at
  73. """,
  74. (market, str(price), json.dumps(payload), captured_at),
  75. )
  76. conn.commit()