| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- from __future__ import annotations
- import asyncio
- import json
- from datetime import datetime, timezone
- import websockets
- from . import repo
# Bitstamp WebSocket endpoint that _run_once connects to.
WS_URL = "wss://ws.bitstamp.net"
# Seconds ws_main waits after a failed session before starting a new one.
WS_RECONNECT_SECONDS = 5
# Receive timeout in seconds; when it elapses with no frame, _run_once sends
# a "bts:heartbeat" event.
WS_HEARTBEAT_SECONDS = 15
# Lower-case quote currency: appended to each asset code to build a market
# symbol, and itself excluded from the watched assets.
QUOTE_CURRENCY = "usd"
async def ws_main(stop_event: asyncio.Event) -> None:
    """Keep the Bitstamp live-trades WebSocket session running until stopped.

    Runs ``_run_once`` in a loop. When a session ends with any exception
    other than cancellation, the failure is logged with its traceback and a
    new session is started after ``WS_RECONNECT_SECONDS``.

    Args:
        stop_event: Set by the caller to request shutdown. It is checked
            between sessions and also cuts the reconnect delay short.

    Raises:
        asyncio.CancelledError: Re-raised unchanged so that cancelling the
            task still works.
    """
    import logging  # function-scope import, as the file does elsewhere

    logger = logging.getLogger(__name__)
    while not stop_event.is_set():
        try:
            await _run_once(stop_event)
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception(
                "Bitstamp websocket session failed; reconnecting in %s seconds",
                WS_RECONNECT_SECONDS,
            )
            try:
                # Wait on the stop event instead of sleeping blindly, so a
                # shutdown request does not have to sit out the full delay.
                await asyncio.wait_for(
                    stop_event.wait(), timeout=WS_RECONNECT_SECONDS
                )
            except asyncio.TimeoutError:
                # Delay elapsed without a stop request: reconnect.
                pass
def get_watched_markets() -> list[str]:
    """Return the Bitstamp market symbols to subscribe to.

    Collects the asset codes reported in the balances of every enabled
    account and appends ``QUOTE_CURRENCY`` to each one (with the module's
    ``"usd"`` quote currency, ``"btc"`` becomes ``"btcusd"``).

    Balance lookup is best-effort: an account whose balance cannot be
    fetched is logged and skipped rather than aborting the whole scan.

    Returns:
        Sorted, de-duplicated market symbols. The quote currency itself and
        balance entries without a usable asset code are left out.
    """
    import logging  # function-scope import, as the file does elsewhere

    logger = logging.getLogger(__name__)
    assets: set[str] = set()
    for account in repo.list_accounts(enabled_only=True):
        account_id = account["id"]
        try:
            # Imported lazily and inside the guard so an import failure is
            # handled like any other balance-lookup failure.
            from .services_bitstamp import fetch_account_balance

            info = fetch_account_balance(account_id)
        except Exception:
            logger.warning(
                "Could not fetch Bitstamp balance for account %s",
                account_id,
                exc_info=True,
            )
            continue

        # A missing result or a null "balances" value means "no holdings".
        balances = (info or {}).get("balances") or []
        for item in balances:
            code = item.get("asset_code")
            if not code:
                # str(None) would otherwise produce a bogus "none" market.
                continue
            asset = str(code).lower()
            if asset != QUOTE_CURRENCY:
                assets.add(asset)

    return sorted(f"{asset}{QUOTE_CURRENCY}" for asset in assets)
async def _run_once(stop_event: asyncio.Event) -> None:
    """Run a single WebSocket session: connect, subscribe, process frames.

    Connects to ``WS_URL``, subscribes to the ``live_trades_<market>``
    channel of every market returned by ``get_watched_markets``, then hands
    each received JSON object to ``_handle_message`` until ``stop_event`` is
    set. When no frame arrives within ``WS_HEARTBEAT_SECONDS`` a
    ``bts:heartbeat`` event is sent instead.

    Frames that are not valid JSON, or that do not decode to an object, are
    logged and skipped so one bad frame does not tear down the session.

    Args:
        stop_event: Checked before every receive; once it is set the loop
            ends and the connection is closed on leaving the ``async with``.

    Raises:
        Exception: Connection, send/receive and message-handling errors are
            not caught here and propagate to the caller.
    """
    import logging  # function-scope import, as the file does elsewhere

    logger = logging.getLogger(__name__)
    # ping_interval=None switches off the websockets library's own keepalive
    # pings; the bts:heartbeat below is the only keepalive this code sends.
    async with websockets.connect(WS_URL, ping_interval=None) as ws:
        for market in get_watched_markets():
            subscribe = {
                "event": "bts:subscribe",
                "data": {"channel": f"live_trades_{market}"},
            }
            await ws.send(json.dumps(subscribe))

        while not stop_event.is_set():
            try:
                message = await asyncio.wait_for(
                    ws.recv(), timeout=WS_HEARTBEAT_SECONDS
                )
            except asyncio.TimeoutError:
                # Idle connection: send a heartbeat and keep waiting.
                await ws.send(json.dumps({"event": "bts:heartbeat"}))
                continue

            try:
                payload = json.loads(message)
            except ValueError:
                # JSONDecodeError and UnicodeDecodeError both derive from
                # ValueError.
                logger.warning("Ignoring websocket frame that is not valid JSON")
                continue
            if not isinstance(payload, dict):
                logger.warning("Ignoring websocket frame that is not a JSON object")
                continue
            _handle_message(payload)
def _handle_message(payload: dict) -> None:
    """Store the price of a Bitstamp ``trade`` event as the market's latest.

    A message is ignored unless it is a ``trade`` event on a
    ``live_trades_<market>`` channel whose ``data`` object carries a
    ``price``. For an accepted message, the row for that market in
    ``bitstamp_live_prices`` is inserted or, on a conflict on ``market``,
    updated with the new price, the full payload as JSON, and a UTC
    capture timestamp in ISO-8601 form.

    Args:
        payload: One decoded WebSocket message.
    """
    if payload.get("event") != "trade":
        return

    # Accept only live-trades channels and strip just the leading prefix, so
    # another channel name (or a null channel) is never stored as a market.
    prefix = "live_trades_"
    channel = str(payload.get("channel", ""))
    if not channel.startswith(prefix):
        return
    market = channel[len(prefix):]

    data = payload.get("data") or {}
    if not isinstance(data, dict):
        return
    price = data.get("price")
    if not market or price is None:
        return

    captured_at = datetime.now(timezone.utc).isoformat()

    # Imported lazily so messages that are ignored never touch storage.
    from .storage import get_connection

    with get_connection() as conn:
        conn.execute(
            """
            INSERT INTO bitstamp_live_prices (market, price, payload_json, captured_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(market) DO UPDATE SET
                price=excluded.price,
                payload_json=excluded.payload_json,
                captured_at=excluded.captured_at
            """,
            # The price is stored as text, exactly as received.
            (market, str(price), json.dumps(payload), captured_at),
        )
        conn.commit()
|