services_orders.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. from __future__ import annotations
  2. import json
  3. import os
  4. import time
  5. import threading
  6. from datetime import datetime, timezone, timedelta
  7. from decimal import Decimal, ROUND_DOWN
  8. from uuid import uuid4
  9. from fastapi import HTTPException
  10. from .bitstamp import BitstampClient, BitstampError
  11. from .services_bitstamp import get_bitstamp_client, clear_bitstamp_trading_client, invalidate_account_cache
  12. from .bitstamp_metadata import load_market_by_symbol
  13. from .storage import get_connection
  # Normalized status strings that get_open_orders() treats as "still open".
  OPEN_ORDER_STATUSES = {"open", "new", "partially_filled"}
  # Cancel circuit breaker state: guards the per-account map below.
  _CANCEL_BREAKER_LOCK = threading.Lock()
  # account_id -> time.monotonic() value before which cancel calls are refused.
  _CANCEL_BREAKER_NEXT_ALLOWED: dict[str, float] = {}
  # How long (seconds) the breaker stays open after it is tripped.
  _CANCEL_BREAKER_SECONDS = 3.0
  def _cancel_breaker_is_open(account_id: str) -> bool:
      """Return True while cancel calls for *account_id* are still paused.

      The breaker is considered open when the stored "next allowed" monotonic
      timestamp for the account lies in the future. Accounts that never
      tripped the breaker default to 0.0 and are therefore never blocked.
      """
      with _CANCEL_BREAKER_LOCK:
          resume_at = _CANCEL_BREAKER_NEXT_ALLOWED.get(account_id, 0.0)
          still_blocked = resume_at > time.monotonic()
      return still_blocked
  def _cancel_breaker_trip(account_id: str) -> None:
      """Open the cancel breaker for *account_id*.

      Records a monotonic deadline ``_CANCEL_BREAKER_SECONDS`` from now; until
      then ``_cancel_breaker_is_open`` reports the account as blocked.
      """
      with _CANCEL_BREAKER_LOCK:
          resume_at = time.monotonic() + _CANCEL_BREAKER_SECONDS
          _CANCEL_BREAKER_NEXT_ALLOWED[account_id] = resume_at
  def _looks_like_auth_failure(detail: str) -> bool:
      """Heuristically decide whether an error message is an auth failure.

      The check is case-insensitive: the text must mention ``403`` and at
      least one of "authentication failed", "nonce" or "timestamp".
      Falsy input (``None``, empty string) is never an auth failure.
      """
      message = str(detail or "").lower()
      if "403" not in message:
          return False
      markers = ("authentication failed", "nonce", "timestamp")
      return any(marker in message for marker in markers)
  def _bitstamp_call_delay_seconds() -> float:
      """Return the pause, in seconds, to insert between exchange calls.

      Reads ``BITSTAMP_CALL_DELAY_MS`` (integer milliseconds, default 250).
      Negative values are clamped to 0.0. A value that is not an integer
      falls back to the 0.25 s default.
      """
      raw_millis = os.getenv("BITSTAMP_CALL_DELAY_MS", "250")
      try:
          millis = int(raw_millis)
      except ValueError:
          # int() on a string only fails with ValueError; anything else is a
          # genuine bug and should not be silently masked.
          return 0.25
      return max(millis / 1000.0, 0.0)
  def _utc_now() -> str:
      """Return the current UTC time as a timezone-aware ISO-8601 string."""
      current = datetime.now(tz=timezone.utc)
      return current.isoformat()
  def _get_client(account_id: str) -> "BitstampClientWrapper":
      """Build the exchange client wrapper used by the order functions.

      The returned object is a ``BitstampClientWrapper`` (not a raw
      ``BitstampClient``); callers in this module only use its ``trading``
      attribute.
      """
      return BitstampClientWrapper(account_id)
  class BitstampClientWrapper:
      """Thin holder exposing the ``trading`` API of an account's client."""

      def __init__(self, account_id: str):
          """Look up the client for *account_id* and keep its ``trading`` API."""
          account_client = get_bitstamp_client(account_id)
          self.trading = account_client.trading
  def _invalidate_client(account_id: str) -> None:
      """Drop the trading client registered for *account_id*.

      Delegates to ``clear_bitstamp_trading_client``; this module calls it
      after an exchange error that looks like an authentication failure.
      """
      clear_bitstamp_trading_client(account_id)
      return None
  def _format_decimal(value, decimals: int) -> str:
      """Truncate *value* to *decimals* places and return a fixed-point string.

      The value is converted through ``str`` so floats keep their printed
      representation, then rounded toward zero (``ROUND_DOWN``).

      The result is rendered with the ``"f"`` format instead of ``str()``:
      ``str(Decimal)`` switches to scientific notation for small magnitudes
      (``"1E-8"``, ``"0E-8"``), which is not a fixed-point string.
      """
      step = Decimal("1").scaleb(-decimals)
      truncated = Decimal(str(value)).quantize(step, rounding=ROUND_DOWN)
      return format(truncated, "f")
  def _normalize_client_id(client_id: str | None) -> str | None:
      """Validate and trim an optional client identifier.

      Returns ``None`` unchanged, otherwise the identifier with surrounding
      whitespace removed.

      Raises:
          HTTPException: 400 when the value is not a string or is blank.
      """
      if client_id is None:
          return None
      if not isinstance(client_id, str):
          raise HTTPException(status_code=400, detail="client_id must be a string")
      trimmed = client_id.strip()
      if trimmed:
          return trimmed
      raise HTTPException(status_code=400, detail="client_id must not be empty")
  def _normalize_status(status) -> str:
      """Convert an order status into lowercase snake_case.

      Falsy input becomes ``"unknown"``. Surrounding whitespace is removed and
      every remaining space is replaced by an underscore.
      """
      text = str(status or "unknown")
      lowered = text.strip().lower()
      return "_".join(lowered.split(" "))
  def _set_local_order_status(*, bitstamp_order_id: str, status: str) -> None:
      """Persist a new status for the local record of an exchange order.

      The status is normalized before it is stored and ``updated_at`` is set
      to the current UTC time. Rows are matched on ``bitstamp_order_id``.
      """
      statement = "UPDATE order_records SET status = ?, updated_at = ? WHERE bitstamp_order_id = ?"
      with get_connection() as conn:
          values = (_normalize_status(status), _utc_now(), bitstamp_order_id)
          conn.execute(statement, values)
          conn.commit()
  def _validate_order_shape(market: str, side: str, order_type: str, amount, price) -> tuple[str, str | None, dict]:
      """Validate an order request and format its numbers for the exchange.

      Args:
          market: Market symbol looked up via ``load_market_by_symbol``.
          side: ``"buy"`` or ``"sell"`` (only used for the minimum-size rule).
          order_type: ``"market"`` or ``"limit"`` (same).
          amount: Order size; anything ``Decimal(str(...))`` can parse.
          price: Optional price; ``None`` means "no price supplied".

      Returns:
          ``(amount, price, meta)`` where amount/price are fixed-point strings
          truncated to the market's ``base_decimals`` / ``counter_decimals``
          (price is ``None`` when not supplied) and ``meta`` is the market
          metadata mapping.

      Raises:
          HTTPException: 400 for an unknown market, a non-numeric or
              non-finite amount/price, a market buy below the minimum order
              value, a non-positive price, or an amount that is not positive
              after truncation.
      """
      # Local import: InvalidOperation is not part of the module-level imports.
      from decimal import InvalidOperation

      def _to_fixed(value, decimals: int, field: str) -> str:
          # Turn malformed numbers into a client error instead of letting
          # decimal.InvalidOperation escape as a server error.
          try:
              parsed = Decimal(str(value))
              if not parsed.is_finite():
                  raise InvalidOperation(f"{field} is not finite")
              return _format_decimal(parsed, decimals)
          except InvalidOperation as exc:
              raise HTTPException(status_code=400, detail=f"{field} must be a finite number") from exc

      meta = load_market_by_symbol(market)
      if meta is None:
          raise HTTPException(status_code=400, detail=f"unknown market {market}")

      base_decimals = int(meta.get("base_decimals", 8))
      counter_decimals = int(meta.get("counter_decimals", 2))
      minimum_order_value = Decimal(str(meta.get("minimum_order_value", "0")))

      amount_fmt = _to_fixed(amount, base_decimals, "amount")
      if side == "buy" and order_type == "market":
          if Decimal(amount_fmt) < minimum_order_value:
              raise HTTPException(status_code=400, detail=f"Minimum order size is {minimum_order_value} {meta.get('counter_currency', 'USD')}.")

      price_fmt = None
      if price is not None:
          price_fmt = _to_fixed(price, counter_decimals, "price")
          if Decimal(price_fmt) <= 0:
              raise HTTPException(status_code=400, detail="price must be positive")

      # Check the truncated amount: a tiny positive input can round down to
      # zero, and that zero is what would actually be sent to the exchange.
      if Decimal(amount_fmt) <= 0:
          raise HTTPException(status_code=400, detail="amount must be positive")
      return amount_fmt, price_fmt, meta
  def place_order(
      *,
      account_id: str,
      market: str,
      side: str,
      order_type: str,
      amount,
      price=None,
      expire_time: int | None = None,
      client_id: str | None = None,
      client_order_id: str | None = None,
  ) -> dict:
      """Submit an order to the exchange and record it locally.

      Args:
          account_id: Account whose trading client is used.
          market: Market symbol such as ``xrpusd`` (case-insensitive).
          side: ``buy`` or ``sell`` (case-insensitive).
          order_type: ``market`` or ``limit`` (case-insensitive).
          amount: Order size, validated by ``_validate_order_shape``.
          price: Optional price; ``"0"`` is sent when none is given.
          expire_time: Optional lifetime in seconds from now. When set, the
              good-till-date call is used with an epoch-milliseconds deadline.
          client_id: Optional local owner tag stored with the record.
          client_order_id: Optional id stored with the record. It is not
              forwarded to the exchange by this function.

      Returns:
          ``{"ok": True, "bitstamp_order_id", "record_id", "status", "raw"}``
          on success, or ``{"ok": False, "error", "details"}`` when the
          exchange response carries no order id.

      Raises:
          HTTPException: 400 for invalid input or an exchange error.
      """
      client = _get_client(account_id)
      client_id = _normalize_client_id(client_id)
      side = side.lower()
      order_type = order_type.lower()
      market = market.lower()
      if len(market) < 6:
          raise HTTPException(status_code=400, detail="market must look like xrpusd")

      expire_timestamp = None
      if expire_time is not None:
          deadline = datetime.now(timezone.utc) + timedelta(seconds=expire_time)
          expire_timestamp = int(deadline.timestamp() * 1000)

      amount, price, meta = _validate_order_shape(market, side, order_type, amount, price)
      if order_type not in {"market", "limit"}:
          raise HTTPException(status_code=400, detail="invalid order_type")
      if side not in {"buy", "sell"}:
          raise HTTPException(status_code=400, detail="invalid side")

      # Split the symbol into base/quote. A fixed 3-character quote mis-splits
      # markets whose quote currency is longer (e.g. "btcusdt"), so prefer the
      # metadata's counter currency when it really is the symbol's suffix.
      # NOTE(review): assumes meta["counter_currency"] holds the quote code.
      quote = market[-3:]
      counter = str(meta.get("counter_currency") or "").strip().lower()
      if counter and len(counter) < len(market) and market.endswith(counter):
          quote = counter
      base = market[: -len(quote)]

      order_kwargs = {"amount": amount, "price": price or "0", "base": base, "quote": quote}
      try:
          trading = client.trading
          if expire_timestamp is None:
              submit = trading.buy_order if side == "buy" else trading.sell_order
          else:
              submit = trading.buy_gtd_order if side == "buy" else trading.sell_gtd_order
              order_kwargs["expire_time"] = expire_timestamp
          result = submit(**order_kwargs)
      except BitstampError as exc:
          detail = str(exc)
          if _looks_like_auth_failure(detail):
              # Force a fresh trading client on the next call.
              _invalidate_client(account_id)
          raise HTTPException(status_code=400, detail=detail) from exc

      bitstamp_order_id = str(result.get("id") or result.get("order_id") or "")
      if not bitstamp_order_id:
          return {"ok": False, "error": "missing Bitstamp order id", "details": {"raw": result}}

      record_id = str(uuid4())
      now = _utc_now()
      with get_connection() as conn:
          conn.execute(
              """
              INSERT INTO order_records
              (id, account_id, market, side, order_type, amount, price, expire_time, status, bitstamp_order_id, client_id, client_order_id, raw_json, created_at, updated_at)
              VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              (
                  record_id,
                  account_id,
                  market,
                  side,
                  order_type,
                  amount,
                  price,
                  expire_time,
                  _normalize_status(result.get("status", "open")),
                  bitstamp_order_id,
                  client_id,
                  client_order_id or result.get("client_order_id"),
                  json.dumps(result),
                  now,
                  now,
              ),
          )
          conn.commit()
      invalidate_account_cache(account_id)
      return {
          "ok": True,
          "bitstamp_order_id": bitstamp_order_id,
          "record_id": record_id,
          "status": str(result.get("status", "open")),
          "raw": result,
      }
  def get_open_orders(*, account_id: str, client_id: str | None = None) -> dict:
      """List locally recorded open orders, refreshed against the exchange.

      Local rows with an open status are loaded (optionally filtered by
      *client_id*). When the trading client offers ``order_status_v2`` each
      row is re-checked: orders that are no longer open, or that the exchange
      reports as not found, get their local status updated and are left out
      of the result. Rows without an exchange order id are skipped. If the
      exchange cannot be queried for a row, the local record is returned
      unchanged (best effort).

      Returns:
          ``{"ok": True, "client_id": ..., "orders": [...]}``.

      Raises:
          HTTPException: 400 when *client_id* is invalid.
      """
      client_id = _normalize_client_id(client_id)
      client = _get_client(account_id)

      # One statement for both variants; only fixed fragments are joined and
      # every value is still bound as a parameter.
      conditions = ["account_id = ?"]
      params = [account_id]
      if client_id is not None:
          conditions.append("client_id = ?")
          params.append(client_id)
      conditions.append("lower(status) IN ('open', 'new', 'partially_filled')")
      query = (
          "SELECT id, account_id, market, side, order_type, amount, price, expire_time, status, "
          "bitstamp_order_id, client_id, client_order_id, raw_json, created_at, updated_at "
          "FROM order_records WHERE " + " AND ".join(conditions) + " ORDER BY created_at DESC"
      )
      with get_connection() as conn:
          rows = conn.execute(query, tuple(params)).fetchall()

      order_status_v2 = getattr(getattr(client, "trading", None), "order_status_v2", None)
      if order_status_v2 is None:
          return {"ok": True, "client_id": client_id, "orders": [dict(row) for row in rows]}

      orders = []
      delay = _bitstamp_call_delay_seconds()
      for row in rows:
          order = dict(row)
          bitstamp_order_id = order.get("bitstamp_order_id")
          if not bitstamp_order_id:
              continue
          keep = True
          try:
              result = order_status_v2(order_id=str(bitstamp_order_id), omit_transactions=True)
              status = _normalize_status(result.get("status", "unknown"))
              if status in OPEN_ORDER_STATUSES:
                  order["status"] = status
                  order["raw_json"] = json.dumps(result)
              else:
                  _set_local_order_status(bitstamp_order_id=str(bitstamp_order_id), status=status)
                  invalidate_account_cache(account_id)
                  keep = False
          except Exception as exc:
              # Deliberately broad: any failure other than "not found" keeps
              # the local record so the listing stays usable.
              if "not found" in str(exc).lower():
                  _set_local_order_status(bitstamp_order_id=str(bitstamp_order_id), status="missing")
                  invalidate_account_cache(account_id)
                  keep = False
          if keep:
              orders.append(order)
          # Throttle after every exchange call, including the ones that
          # dropped the order; otherwise a run of closed orders is queried
          # back-to-back with no pause.
          if delay > 0:
              time.sleep(delay)
      return {"ok": True, "client_id": client_id, "orders": orders}
  def cancel_all_orders(*, account_id: str, client_id: str | None = None) -> dict:
      """Cancel every open order of an account, one exchange call at a time.

      Open orders come from ``get_open_orders``. For each order:

      * while the cancel breaker is open the order is reported as "deferred";
      * an authentication-like failure trips the breaker and stops the loop;
      * a "not found" failure marks the local record as "missing";
      * any other ``HTTPException`` propagates to the caller.

      Returns:
          ``{"ok": True, "client_id", "cancelled": [per-order results],
          "count": len(results)}``. ``ok`` is always True; inspect the
          per-order entries for individual outcomes.

      Raises:
          HTTPException: for invalid *client_id* or an unhandled cancel error.
      """
      client_id = _normalize_client_id(client_id)
      orders = get_open_orders(account_id=account_id, client_id=client_id)["orders"]
      results = []
      delay = _bitstamp_call_delay_seconds()
      for order in orders:
          bitstamp_order_id = order.get("bitstamp_order_id")
          if _cancel_breaker_is_open(account_id):
              results.append({"ok": False, "order_id": bitstamp_order_id, "error": "cancel breaker active", "status": "deferred"})
              continue
          if not bitstamp_order_id:
              results.append({"order_id": None, "ok": False, "error": "missing bitstamp_order_id"})
              continue
          try:
              results.append(cancel_order(account_id=account_id, order_id=bitstamp_order_id))
          except HTTPException as exc:
              detail = str(exc.detail)
              if _looks_like_auth_failure(detail):
                  _cancel_breaker_trip(account_id)
                  results.append({"ok": False, "order_id": bitstamp_order_id, "error": detail, "status": "deferred"})
                  break
              if "not found" not in detail.lower():
                  raise
              _set_local_order_status(bitstamp_order_id=bitstamp_order_id, status="missing")
              invalidate_account_cache(account_id)
              results.append({"ok": False, "order_id": bitstamp_order_id, "error": detail, "status": "missing"})
          # Throttle after every cancel attempt that reached the exchange,
          # including "not found" ones, so consecutive calls are always spaced.
          if delay > 0:
              time.sleep(delay)
      invalidate_account_cache(account_id)
      return {"ok": True, "client_id": client_id, "cancelled": results, "count": len(results)}
  def query_order(
      *,
      account_id: str,
      order_id,
      client_order_id: str | None = None,
      omit_transactions: bool | None = None,
  ) -> dict:
      """Fetch an order's status from the exchange and mirror it locally.

      The local record matching ``bitstamp_order_id`` gets the normalized
      status, the raw JSON payload and a fresh ``updated_at``.

      Returns:
          ``{"ok": True, "order_id": <str>, "raw": <exchange response>}``.

      Raises:
          HTTPException: 400 carrying the exchange error text. The trading
              client is invalidated first when the error looks like an
              authentication failure.
      """
      order_id = str(order_id)
      client = _get_client(account_id)
      try:
          result = client.trading.order_status_v2(
              order_id=order_id,
              client_order_id=client_order_id,
              omit_transactions=omit_transactions,
          )
      except BitstampError as exc:
          detail = str(exc)
          if _looks_like_auth_failure(detail):
              _invalidate_client(account_id)
          raise HTTPException(status_code=400, detail=detail) from exc

      statement = "UPDATE order_records SET status = ?, raw_json = ?, updated_at = ? WHERE bitstamp_order_id = ?"
      with get_connection() as conn:
          values = (
              _normalize_status(result.get("status", "unknown")),
              json.dumps(result),
              _utc_now(),
              order_id,
          )
          conn.execute(statement, values)
          conn.commit()
      invalidate_account_cache(account_id)
      return {"ok": True, "order_id": order_id, "raw": result}
  def cancel_order(*, account_id: str, order_id) -> dict:
      """Cancel a single order on the exchange and update its local record.

      A truthy exchange response stores the status ``"cancelled"``, a falsy
      one ``"cancel_failed"``.

      Returns:
          ``{"ok": bool(result), "order_id": <str>, "raw": <response>}``.

      Raises:
          HTTPException: 503 while the cancel breaker is open for the account;
              400 carrying the exchange error text otherwise. An
              authentication-like failure also trips the breaker and
              invalidates the trading client.
      """
      order_id = str(order_id)
      if _cancel_breaker_is_open(account_id):
          raise HTTPException(status_code=503, detail="cancel breaker active, retry later")

      client = _get_client(account_id)
      try:
          result = client.trading.cancel_order(order_id=order_id, version=2)
      except BitstampError as exc:
          detail = str(exc)
          if _looks_like_auth_failure(detail):
              _cancel_breaker_trip(account_id)
              _invalidate_client(account_id)
          raise HTTPException(status_code=400, detail=detail) from exc

      if result:
          status = "cancelled"
      else:
          status = "cancel_failed"
      statement = "UPDATE order_records SET status = ?, updated_at = ? WHERE bitstamp_order_id = ?"
      with get_connection() as conn:
          conn.execute(statement, (status, _utc_now(), order_id))
          conn.commit()
      invalidate_account_cache(account_id)
      return {"ok": bool(result), "order_id": order_id, "raw": result}