from __future__ import annotations import json import os import time import threading from datetime import datetime, timezone, timedelta from decimal import Decimal, ROUND_DOWN from uuid import uuid4 from fastapi import HTTPException from .bitstamp import BitstampClient, BitstampError from .services_bitstamp import get_bitstamp_client, clear_bitstamp_trading_client from .bitstamp_metadata import load_market_by_symbol from .storage import get_connection OPEN_ORDER_STATUSES = {"open", "new", "partially_filled"} _CANCEL_BREAKER_LOCK = threading.Lock() _CANCEL_BREAKER_NEXT_ALLOWED: dict[str, float] = {} _CANCEL_BREAKER_SECONDS = 3.0 def _cancel_breaker_is_open(account_id: str) -> bool: with _CANCEL_BREAKER_LOCK: return _CANCEL_BREAKER_NEXT_ALLOWED.get(account_id, 0.0) > time.monotonic() def _cancel_breaker_trip(account_id: str) -> None: with _CANCEL_BREAKER_LOCK: _CANCEL_BREAKER_NEXT_ALLOWED[account_id] = time.monotonic() + _CANCEL_BREAKER_SECONDS def _looks_like_auth_failure(detail: str) -> bool: text = str(detail or "").lower() return "403" in text and ("authentication failed" in text or "nonce" in text or "timestamp" in text) def _bitstamp_call_delay_seconds() -> float: try: return max(int(os.getenv("BITSTAMP_CALL_DELAY_MS", "250")) / 1000.0, 0.0) except Exception: return 0.25 def _utc_now() -> str: return datetime.now(timezone.utc).isoformat() def _get_client(account_id: str) -> BitstampClient: return BitstampClientWrapper(account_id) class BitstampClientWrapper: def __init__(self, account_id: str): self.trading = get_bitstamp_client(account_id).trading def _invalidate_client(account_id: str) -> None: clear_bitstamp_trading_client(account_id) def _format_decimal(value, decimals: int) -> str: quant = Decimal("1").scaleb(-decimals) return str(Decimal(str(value)).quantize(quant, rounding=ROUND_DOWN)) def _normalize_client_id(client_id: str | None) -> str | None: if client_id is None: return None if not isinstance(client_id, str): raise HTTPException(status_code=400, detail="client_id must be a string") client_id = client_id.strip() if not client_id: raise HTTPException(status_code=400, detail="client_id must not be empty") return client_id def _normalize_status(status) -> str: return str(status or "unknown").strip().lower().replace(" ", "_") def _set_local_order_status(*, bitstamp_order_id: str, status: str) -> None: with get_connection() as conn: conn.execute( "UPDATE order_records SET status = ?, updated_at = ? WHERE bitstamp_order_id = ?", (_normalize_status(status), _utc_now(), bitstamp_order_id), ) conn.commit() def _validate_order_shape(market: str, side: str, order_type: str, amount, price) -> tuple[str, str | None, dict]: meta = load_market_by_symbol(market) if meta is None: raise HTTPException(status_code=400, detail=f"unknown market {market}") base_decimals = int(meta.get("base_decimals", 8)) counter_decimals = int(meta.get("counter_decimals", 2)) minimum_order_value = Decimal(str(meta.get("minimum_order_value", "0"))) amount_dec = Decimal(str(amount)) amount_fmt = _format_decimal(amount, base_decimals) if side == "buy" and order_type == "market": if Decimal(amount_fmt) < minimum_order_value: raise HTTPException(status_code=400, detail=f"Minimum order size is {minimum_order_value} {meta.get('counter_currency', 'USD')}.") if price is not None: price_fmt = _format_decimal(price, counter_decimals) if Decimal(price_fmt) <= 0: raise HTTPException(status_code=400, detail="price must be positive") else: price_fmt = None if amount_dec <= 0: raise HTTPException(status_code=400, detail="amount must be positive") return amount_fmt, price_fmt, meta def place_order(*, account_id: str, market: str, side: str, order_type: str, amount, price=None, expire_time: int | None = None, client_id: str | None = None, client_order_id: str | None = None) -> dict: client = _get_client(account_id) client_id = _normalize_client_id(client_id) side = side.lower() order_type = order_type.lower() market = market.lower() if len(market) < 6: raise HTTPException(status_code=400, detail="market must look like xrpusd") base = market[:-3] quote = market[-3:] expire_timestamp = None if expire_time is not None: expire_timestamp = int((datetime.now(timezone.utc) + timedelta(seconds=expire_time)).timestamp() * 1000) amount, price, _meta = _validate_order_shape(market, side, order_type, amount, price) try: if order_type not in {"market", "limit"}: raise HTTPException(status_code=400, detail="invalid order_type") if side == "buy": if expire_timestamp is None: result = client.trading.buy_order(amount=amount, price=price or "0", base=base, quote=quote) else: result = client.trading.buy_gtd_order(amount=amount, price=price or "0", base=base, quote=quote, expire_time=expire_timestamp) elif side == "sell": if expire_timestamp is None: result = client.trading.sell_order(amount=amount, price=price or "0", base=base, quote=quote) else: result = client.trading.sell_gtd_order(amount=amount, price=price or "0", base=base, quote=quote, expire_time=expire_timestamp) else: raise HTTPException(status_code=400, detail="invalid side") except BitstampError as exc: detail = str(exc) if _looks_like_auth_failure(detail): _invalidate_client(account_id) raise HTTPException(status_code=400, detail=detail) from exc bitstamp_order_id = str(result.get("id") or result.get("order_id") or "") if not bitstamp_order_id: return {"ok": False, "error": "missing Bitstamp order id", "details": {"raw": result}} record_id = str(uuid4()) now = _utc_now() with get_connection() as conn: conn.execute( """ INSERT INTO order_records (id, account_id, market, side, order_type, amount, price, expire_time, status, bitstamp_order_id, client_id, client_order_id, raw_json, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (record_id, account_id, market, side, order_type, amount, price, expire_time, _normalize_status(result.get("status", "open")), bitstamp_order_id, client_id, client_order_id or result.get("client_order_id"), json.dumps(result), now, now), ) conn.commit() return {"ok": True, "bitstamp_order_id": bitstamp_order_id, "record_id": record_id, "status": str(result.get("status", "open")), "raw": result} def get_open_orders(*, account_id: str, client_id: str | None = None) -> dict: client_id = _normalize_client_id(client_id) client = _get_client(account_id) with get_connection() as conn: if client_id is None: rows = conn.execute( """ SELECT id, account_id, market, side, order_type, amount, price, expire_time, status, bitstamp_order_id, client_id, client_order_id, raw_json, created_at, updated_at FROM order_records WHERE account_id = ? AND lower(status) IN ('open', 'new', 'partially_filled') ORDER BY created_at DESC """, (account_id,), ).fetchall() else: rows = conn.execute( """ SELECT id, account_id, market, side, order_type, amount, price, expire_time, status, bitstamp_order_id, client_id, client_order_id, raw_json, created_at, updated_at FROM order_records WHERE account_id = ? AND client_id = ? AND lower(status) IN ('open', 'new', 'partially_filled') ORDER BY created_at DESC """, (account_id, client_id), ).fetchall() order_status_v2 = getattr(getattr(client, "trading", None), "order_status_v2", None) if order_status_v2 is None: return {"ok": True, "client_id": client_id, "orders": [dict(row) for row in rows]} orders = [] delay = _bitstamp_call_delay_seconds() for row in rows: order = dict(row) bitstamp_order_id = order.get("bitstamp_order_id") if not bitstamp_order_id: continue try: result = order_status_v2(order_id=str(bitstamp_order_id), omit_transactions=True) status = _normalize_status(result.get("status", "unknown")) if status not in OPEN_ORDER_STATUSES: _set_local_order_status(bitstamp_order_id=str(bitstamp_order_id), status=status) continue order["status"] = status order["raw_json"] = json.dumps(result) orders.append(order) except Exception as exc: msg = str(exc) if "not found" in msg.lower(): _set_local_order_status(bitstamp_order_id=str(bitstamp_order_id), status="missing") continue # Best effort: keep the local record when the exchange cannot be queried. orders.append(order) if delay > 0: time.sleep(delay) return {"ok": True, "client_id": client_id, "orders": orders} def cancel_all_orders(*, account_id: str, client_id: str | None = None) -> dict: client_id = _normalize_client_id(client_id) orders = get_open_orders(account_id=account_id, client_id=client_id)["orders"] results = [] delay = _bitstamp_call_delay_seconds() for order in orders: if _cancel_breaker_is_open(account_id): results.append({"ok": False, "order_id": order.get("bitstamp_order_id"), "error": "cancel breaker active", "status": "deferred"}) continue bitstamp_order_id = order.get("bitstamp_order_id") if not bitstamp_order_id: results.append({"order_id": None, "ok": False, "error": "missing bitstamp_order_id"}) continue try: results.append(cancel_order(account_id=account_id, order_id=bitstamp_order_id)) except HTTPException as exc: detail = str(exc.detail) if _looks_like_auth_failure(detail): _cancel_breaker_trip(account_id) results.append({"ok": False, "order_id": bitstamp_order_id, "error": detail, "status": "deferred"}) break if "not found" in detail.lower(): _set_local_order_status(bitstamp_order_id=bitstamp_order_id, status="missing") results.append({"ok": False, "order_id": bitstamp_order_id, "error": detail, "status": "missing"}) continue raise if delay > 0: time.sleep(delay) return {"ok": True, "client_id": client_id, "cancelled": results, "count": len(results)} def query_order(*, account_id: str, order_id, client_order_id: str | None = None, omit_transactions: bool | None = None) -> dict: order_id = str(order_id) client = _get_client(account_id) try: result = client.trading.order_status_v2(order_id=order_id, client_order_id=client_order_id, omit_transactions=omit_transactions) except BitstampError as exc: detail = str(exc) if _looks_like_auth_failure(detail): _invalidate_client(account_id) raise HTTPException(status_code=400, detail=detail) from exc with get_connection() as conn: conn.execute( "UPDATE order_records SET status = ?, raw_json = ?, updated_at = ? WHERE bitstamp_order_id = ?", (_normalize_status(result.get("status", "unknown")), json.dumps(result), _utc_now(), order_id), ) conn.commit() return {"ok": True, "order_id": order_id, "raw": result} def cancel_order(*, account_id: str, order_id) -> dict: order_id = str(order_id) if _cancel_breaker_is_open(account_id): raise HTTPException(status_code=503, detail="cancel breaker active, retry later") client = _get_client(account_id) try: result = client.trading.cancel_order(order_id=order_id, version=2) except BitstampError as exc: detail = str(exc) if _looks_like_auth_failure(detail): _cancel_breaker_trip(account_id) _invalidate_client(account_id) raise HTTPException(status_code=400, detail=detail) from exc status = "cancelled" if result else "cancel_failed" with get_connection() as conn: conn.execute( "UPDATE order_records SET status = ?, updated_at = ? WHERE bitstamp_order_id = ?", (status, _utc_now(), order_id), ) conn.commit() return {"ok": bool(result), "order_id": order_id, "raw": result}