from __future__ import annotations import asyncio import json from datetime import datetime, timezone import websockets from . import repo WS_URL = "wss://ws.bitstamp.net" WS_RECONNECT_SECONDS = 5 WS_HEARTBEAT_SECONDS = 15 QUOTE_CURRENCY = "usd" async def ws_main(stop_event: asyncio.Event) -> None: while not stop_event.is_set(): try: await _run_once(stop_event) except asyncio.CancelledError: raise except Exception: await asyncio.sleep(WS_RECONNECT_SECONDS) def get_watched_markets() -> list[str]: accounts = repo.list_accounts(enabled_only=True) assets = set() for account in accounts: account_id = account["id"] # Prefer normalized balances if present; fall back to raw payload later. # This stays exchange-specific to Bitstamp. info = None try: from .services_bitstamp import fetch_account_balance info = fetch_account_balance(account_id) except Exception: continue for item in info.get("balances", []): asset = str(item.get("asset_code", "")).lower() if asset and asset != QUOTE_CURRENCY: assets.add(asset) return sorted(f"{asset}{QUOTE_CURRENCY}" for asset in assets) async def _run_once(stop_event: asyncio.Event) -> None: async with websockets.connect(WS_URL, ping_interval=None) as ws: markets = get_watched_markets() for market in markets: await ws.send(json.dumps({"event": "bts:subscribe", "data": {"channel": f"live_trades_{market}"}})) last_heartbeat = asyncio.get_event_loop().time() while not stop_event.is_set(): try: message = await asyncio.wait_for(ws.recv(), timeout=WS_HEARTBEAT_SECONDS) except asyncio.TimeoutError: await ws.send(json.dumps({"event": "bts:heartbeat"})) last_heartbeat = asyncio.get_event_loop().time() continue payload = json.loads(message) _handle_message(payload) def _handle_message(payload: dict) -> None: event = payload.get("event") data = payload.get("data") or {} if event != "trade": return channel = str(payload.get("channel", "")) market = channel.replace("live_trades_", "") price = data.get("price") if not market or price is None: return captured_at = datetime.now(timezone.utc).isoformat() from .storage import get_connection with get_connection() as conn: conn.execute( """ INSERT INTO bitstamp_live_prices (market, price, payload_json, captured_at) VALUES (?, ?, ?, ?) ON CONFLICT(market) DO UPDATE SET price=excluded.price, payload_json=excluded.payload_json, captured_at=excluded.captured_at """, (market, str(price), json.dumps(payload), captured_at), ) conn.commit()