Procházet zdrojové kódy

Add Bitstamp order and websocket flow

Lukas Goldschmidt před 1 měsícem
rodič
revize
804ca8ec2d

+ 71 - 54
DB_SCHEME.md

@@ -1,54 +1,71 @@
-# exec-mcp database scheme (agreed reference)
-
-## Clean generic scheme
-
-### accounts
-
-- `id` → internal primary key, hidden
-- `display_name` → arbitrary label shown in UI
-- `venue` → exchange name, e.g. `bitstamp`
-- `venue_account_ref` → exchange-side account id, e.g. Bitstamp user id
-- `description` → optional note
-- `enabled` → boolean
-- `metadata_json` → optional extra metadata
-- `created_at`
-- `updated_at`
-
-### account_secrets
-
-- `account_id` → FK to `accounts.id`
-- `api_key`
-- `api_secret`
-- `created_at`
-- `updated_at`
-
-### balance_snapshots
-
-- `id` → internal primary key
-- `account_id` → FK to `accounts.id`
-- `asset_code` → e.g. `BTC`, `EUR`
-- `balance_value`
-- `captured_at`
-
-### order_records
-
-- `id` → internal primary key
-- `account_id` → FK to `accounts.id`
-- `instrument` → e.g. `BTC/EUR`
-- `side`
-- `order_kind`
-- `quantity`
-- `price`
-- `status`
-- `raw_json`
-- `created_at`
-- `updated_at`
-
-## Key rules
-
-- UI never shows `accounts.id`.
-- In communication, distinguish clearly:
-  - **`id`** = internal id used for operations.
-  - **`venue_account_ref`** = external exchange account reference.
-- `venue_account_ref` is **not unique** and must **not** be used for operations.
-- Dashboard operations (update/delete and similar mutations) must use internal `id` only.
+# exec-mcp DB Scheme
+
+Canonical and authoritative.
+
+## accounts
+- id
+- display_name
+- venue
+- venue_account_ref
+- description
+- enabled
+- metadata_json
+- created_at
+- updated_at
+
+## account_secrets
+- account_id
+- api_key
+- api_secret
+- created_at
+- updated_at
+
+## balance_snapshots
+- id
+- account_id
+- asset_code
+- balance
+- balance_value
+- value_currency
+- captured_at
+
+## order_records
+Single schema only:
+- id
+- account_id
+- market
+- side
+- order_type
+- amount
+- price
+- expire_time
+- status
+- bitstamp_order_id
+- client_order_id
+- raw_json
+- created_at
+- updated_at
+
+## api_cache
+- cache_key
+- payload_json
+- fetched_at
+- expires_at
+
+## bitstamp_metadata
+- kind
+- item_key
+- payload_json
+
+## bitstamp_live_prices
+- market
+- price
+- payload_json
+- captured_at
+
+## bitstamp_fx_rates
+- pair
+- buy
+- sell
+- payload_json
+- captured_at

+ 49 - 0
ORDER_PROTOCOL.md

@@ -0,0 +1,49 @@
+# Order Protocol
+
+## place_order request JSON
+
+```json
+{
+  "account_id": "qndd8o9ppop6",
+  "market": "xrpusd",
+  "side": "buy",
+  "order_type": "limit",
+  "amount": "100.0",
+  "price": "0.52",
+  "expire_time": 3600,
+  "client_order_id": "optional-client-id"
+}
+```
+
+## place_order success response JSON
+
+```json
+{
+  "ok": true,
+  "bitstamp_order_id": "1234567890",
+  "record_id": "local-order-id",
+  "status": "open",
+  "raw": {}
+}
+```
+
+## place_order failure response JSON
+
+```json
+{
+  "ok": false,
+  "error": "reason from exchange or validation",
+  "details": {}
+}
+```
+
+## Rules
+
+- `account_id` is the internal account id only.
+- `market` is a Bitstamp pair such as `xrpusd`.
+- `amount` is a string in base currency units.
+- `side` is `buy` or `sell`.
+- `order_type` is `market` or `limit`.
+- `price` is required for `limit` orders.
+- `expire_time` is optional, in seconds.
+- `client_order_id` is optional.

+ 64 - 8
src/exec_mcp/bitstamp.py

@@ -1,7 +1,16 @@
 from __future__ import annotations
 
+import inspect
+import json
 from dataclasses import dataclass
 
+try:
+    from bitstamp.client import BitstampError, Public, Trading
+except ModuleNotFoundError:  # optional in tests
+    BitstampError = Exception  # type: ignore
+    Public = object  # type: ignore
+    Trading = object  # type: ignore
+
 
 @dataclass(slots=True)
 class AccountInfo:
@@ -13,12 +22,59 @@ class AccountInfo:
     metadata: dict | None = None
 
 
-class BitstampAdapter:
-    exchange = "bitstamp"
+class LG_Trading(Trading):
+    def __init__(self, username, key, secret, *args, **kwargs):
+        super(LG_Trading, self).__init__(username=username, key=key, secret=secret, *args, **kwargs)
+
+    def order_status_v2(self, order_id, client_order_id=None, omit_transactions=None):
+        data = {'id': order_id}
+        if client_order_id is not None:
+            data['client_order_id'] = client_order_id
+        if omit_transactions is not None:
+            data['omit_transactions'] = omit_transactions
+        return self._post("order_status/", data=data, return_json=True, version=2)
+
+    def sell_gtd_order(self, amount, price, base="btc", quote="usd", limit_price=None, ioc_order=False, gtd_order=True, expire_time=None):
+        data = {'amount': amount, 'price': price}
+        if limit_price is not None:
+            data['limit_price'] = limit_price
+        if ioc_order is True:
+            data['ioc_order'] = True
+        if gtd_order is True:
+            data['gtd_order'] = True
+        if expire_time is not None:
+            data['expire_time'] = expire_time
+        url = self._construct_url("sell/", base, quote)
+        return self._post(url, data=data, return_json=True, version=2)
+
+    def buy_gtd_order(self, amount, price, base="btc", quote="usd", limit_price=None, ioc_order=False, gtd_order=True, expire_time=None):
+        data = {'amount': amount, 'price': price}
+        if limit_price is not None:
+            data['limit_price'] = limit_price
+        if ioc_order is True:
+            data['ioc_order'] = True
+        if gtd_order is True:
+            data['gtd_order'] = True
+        if expire_time is not None:
+            data['expire_time'] = expire_time
+        url = self._construct_url("buy/", base, quote)
+        return self._post(url, data=data, return_json=True, version=2)
+
+    def fees_trading(self, base=False, quote=False):
+        url = self._construct_url("fees/trading/", base, quote)
+        return self._post(url, return_json=True, version=2)
+
+
+class BitstampClient:
+    def __init__(self, username: str, api_key: str, api_secret: str):
+        self.trading = LG_Trading(username=username, key=api_key, secret=api_secret)
+
+    def get_method_docs(self):
+        method_docs = {}
+        for name, method in inspect.getmembers(self.trading, predicate=inspect.ismethod):
+            if name != "get_method_docs":
+                method_docs[name] = method.__doc__
+        return json.dumps(method_docs, indent=4)
 
-    def get_account_info(self, account_id: str) -> AccountInfo:
-        return AccountInfo(
-            account_id=account_id,
-            exchange=self.exchange,
-            metadata={"note": "bitstamp adapter stub"},
-        )
+    def websocket_token(self) -> dict:
+        return self.trading._post("websockets_token/", return_json=True, version=2)

+ 8 - 0
src/exec_mcp/bitstamp_metadata.py

@@ -40,6 +40,14 @@ def load_metadata(kind: str) -> list[dict]:
     return [__import__("json").loads(row["payload_json"]) for row in rows]
 
 
+def load_market_by_symbol(market_symbol: str) -> dict | None:
+    symbol = market_symbol.lower()
+    for item in load_metadata("markets"):
+        if str(item.get("market_symbol", "")).lower() == symbol:
+            return item
+    return None
+
+
 def refresh_metadata() -> dict:
     currencies = fetch_currencies()
     markets = fetch_markets()

+ 69 - 0
src/exec_mcp/bitstamp_private_ws.py

@@ -0,0 +1,69 @@
+from __future__ import annotations
+
+import asyncio
+import json
+from datetime import datetime, timezone
+
+import websockets
+
+from .bitstamp import BitstampClient
+from .repo import get_account, get_account_secrets
+from .storage import get_connection
+
+WS_URL = "wss://ws.bitstamp.net"
+WS_RECONNECT_SECONDS = 5
+WS_HEARTBEAT_SECONDS = 15
+
+
+async def private_ws_main(stop_event: asyncio.Event) -> None:
+    while not stop_event.is_set():
+        try:
+            await _run_once(stop_event)
+        except asyncio.CancelledError:
+            raise
+        except Exception:
+            await asyncio.sleep(WS_RECONNECT_SECONDS)
+
+
+async def _run_once(stop_event: asyncio.Event) -> None:
+    accounts = [a for a in __import__("exec_mcp.repo", fromlist=["list_accounts"]).list_accounts(enabled_only=True) if a["venue"] == "bitstamp"]
+    if not accounts:
+        await asyncio.sleep(WS_RECONNECT_SECONDS)
+        return
+
+    async with websockets.connect(WS_URL, ping_interval=None) as ws:
+        for account in accounts:
+            token_info = _get_token(account["id"])
+            await ws.send(json.dumps({"event": "bts:subscribe", "data": {"channel": f"private-my_orders_{account['venue_account_ref']}-{token_info['user_id']}", "auth": token_info["token"]}}))
+
+        while not stop_event.is_set():
+            try:
+                message = await asyncio.wait_for(ws.recv(), timeout=WS_HEARTBEAT_SECONDS)
+            except asyncio.TimeoutError:
+                await ws.send(json.dumps({"event": "bts:heartbeat"}))
+                continue
+            _handle_message(json.loads(message))
+
+
+def _get_token(account_id: str) -> dict:
+    account = get_account(account_id)
+    secrets = get_account_secrets(account_id)
+    client = BitstampClient(account["venue_account_ref"], secrets["api_key"], secrets["api_secret"])
+    return client.websocket_token()
+
+
+def _handle_message(payload: dict) -> None:
+    event = payload.get("event")
+    if not event:
+        return
+    data = payload.get("data") or {}
+    order_id = data.get("id") or data.get("order_id")
+    if not order_id:
+        return
+    captured_at = datetime.now(timezone.utc).isoformat()
+    with get_connection() as conn:
+        conn.execute(
+            "UPDATE order_records SET status = ?, payload_json = ?, updated_at = ? WHERE bitstamp_order_id = ?",
+            (str(event), json.dumps(payload), captured_at, str(order_id)),
+        )
+        conn.commit()

+ 19 - 0
src/exec_mcp/server.py

@@ -13,6 +13,8 @@ from .services_bitstamp import fetch_account_info as fetch_remote_account_info
 from .bitstamp_metadata import METADATA_REFRESH_SECONDS, refresh_metadata
 from .bitstamp_fx import FX_REFRESH_SECONDS, refresh_eur_usd
 from .bitstamp_ws import ws_main
+from .bitstamp_private_ws import private_ws_main
+from .services_orders import place_order as service_place_order, query_order as service_query_order, cancel_order as service_cancel_order
 from .storage import init_db
 
 mcp = FastMCP("exec-mcp")
@@ -47,6 +49,7 @@ async def lifespan(_: FastAPI):
     metadata_task = asyncio.create_task(_metadata_refresh_loop())
     fx_task = asyncio.create_task(_fx_refresh_loop())
     ws_task = asyncio.create_task(ws_main(stop_event))
+    private_ws_task = asyncio.create_task(private_ws_main(stop_event))
     try:
         yield
     finally:
@@ -54,6 +57,7 @@ async def lifespan(_: FastAPI):
         metadata_task.cancel()
         fx_task.cancel()
         ws_task.cancel()
+        private_ws_task.cancel()
 
 
 app = FastAPI(title="exec-mcp", lifespan=lifespan)
@@ -244,5 +248,20 @@ def get_account_info(account_id: str) -> dict:
     raise HTTPException(status_code=400, detail="unsupported venue")
 
 
+@mcp.tool()
+def place_order(account_id: str, market: str, side: str, order_type: str, amount, price=None, expire_time: int | None = None, client_order_id: str | None = None) -> dict:
+    return service_place_order(account_id=account_id, market=market, side=side, order_type=order_type, amount=amount, price=price, expire_time=expire_time, client_order_id=client_order_id)
+
+
+@mcp.tool()
+def query_order(account_id: str, order_id, client_order_id: str | None = None, omit_transactions: bool | None = None) -> dict:
+    return service_query_order(account_id=account_id, order_id=order_id, client_order_id=client_order_id, omit_transactions=omit_transactions)
+
+
+@mcp.tool()
+def cancel_order(account_id: str, order_id) -> dict:
+    return service_cancel_order(account_id=account_id, order_id=order_id)
+
+
 def main() -> None:
     init_db()

+ 146 - 0
src/exec_mcp/services_orders.py

@@ -0,0 +1,146 @@
+from __future__ import annotations
+
+import json
+from datetime import datetime, timezone, timedelta
+from decimal import Decimal, ROUND_DOWN
+from uuid import uuid4
+
+from fastapi import HTTPException
+
+from .bitstamp import BitstampClient, BitstampError
+from .bitstamp_metadata import load_market_by_symbol
+from .storage import get_connection
+
+
+def _utc_now() -> str:
+    return datetime.now(timezone.utc).isoformat()
+
+
+def _get_client(account_id: str) -> BitstampClient:
+    from .repo import get_account, get_account_secrets
+    account = get_account(account_id)
+    secrets = get_account_secrets(account_id)
+    return BitstampClient(
+        username=account["venue_account_ref"],
+        api_key=secrets["api_key"],
+        api_secret=secrets["api_secret"],
+    )
+
+
+def _format_decimal(value, decimals: int) -> str:
+    quant = Decimal("1").scaleb(-decimals)
+    return str(Decimal(str(value)).quantize(quant, rounding=ROUND_DOWN))
+
+
+def _validate_order_shape(market: str, side: str, order_type: str, amount, price) -> tuple[str, str | None, dict]:
+    meta = load_market_by_symbol(market)
+    if meta is None:
+        raise HTTPException(status_code=400, detail=f"unknown market {market}")
+
+    base_decimals = int(meta.get("base_decimals", 8))
+    counter_decimals = int(meta.get("counter_decimals", 2))
+    minimum_order_value = Decimal(str(meta.get("minimum_order_value", "0")))
+
+    amount_dec = Decimal(str(amount))
+    amount_fmt = _format_decimal(amount, base_decimals)
+
+    if side == "buy" and order_type == "market":
+        if Decimal(amount_fmt) < minimum_order_value:
+            raise HTTPException(status_code=400, detail=f"Minimum order size is {minimum_order_value} {meta.get('counter_currency', 'USD')}.")
+
+    if price is not None:
+        price_fmt = _format_decimal(price, counter_decimals)
+        if Decimal(price_fmt) <= 0:
+            raise HTTPException(status_code=400, detail="price must be positive")
+    else:
+        price_fmt = None
+
+    if amount_dec <= 0:
+        raise HTTPException(status_code=400, detail="amount must be positive")
+
+    return amount_fmt, price_fmt, meta
+
+
+def place_order(*, account_id: str, market: str, side: str, order_type: str, amount, price=None, expire_time: int | None = None, client_order_id: str | None = None) -> dict:
+    client = _get_client(account_id)
+    side = side.lower()
+    order_type = order_type.lower()
+    market = market.lower()
+    if len(market) < 6:
+        raise HTTPException(status_code=400, detail="market must look like xrpusd")
+    base = market[:-3]
+    quote = market[-3:]
+
+    expire_timestamp = None
+    if expire_time is not None:
+        expire_timestamp = int((datetime.now(timezone.utc) + timedelta(seconds=expire_time)).timestamp() * 1000)
+
+    amount, price, _meta = _validate_order_shape(market, side, order_type, amount, price)
+
+    try:
+        if order_type not in {"market", "limit"}:
+            raise HTTPException(status_code=400, detail="invalid order_type")
+        if side == "buy":
+            result = client.trading.buy_gtd_order(amount=amount, price=price or "0", base=base, quote=quote, expire_time=expire_timestamp)
+        elif side == "sell":
+            result = client.trading.sell_gtd_order(amount=amount, price=price or "0", base=base, quote=quote, expire_time=expire_timestamp)
+        else:
+            raise HTTPException(status_code=400, detail="invalid side")
+    except BitstampError as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+    bitstamp_order_id = str(result.get("id") or result.get("order_id") or "")
+    if not bitstamp_order_id:
+        return {"ok": False, "error": "missing Bitstamp order id", "details": {"raw": result}}
+
+    record_id = str(uuid4())
+    now = _utc_now()
+    with get_connection() as conn:
+        conn.execute(
+            """
+            INSERT INTO order_records
+            (id, account_id, market, side, order_type, amount, price, expire_time, status, bitstamp_order_id, client_order_id, raw_json, created_at, updated_at)
+            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            """,
+            (record_id, account_id, market, side, order_type, amount, price, expire_time, str(result.get("status", "open")), bitstamp_order_id, client_order_id or result.get("client_order_id"), json.dumps(result), now, now),
+        )
+        conn.commit()
+
+    return {"ok": True, "bitstamp_order_id": bitstamp_order_id, "record_id": record_id, "status": str(result.get("status", "open")), "raw": result}
+
+
+def query_order(*, account_id: str, order_id, client_order_id: str | None = None, omit_transactions: bool | None = None) -> dict:
+    order_id = str(order_id)
+    client = _get_client(account_id)
+    try:
+        result = client.trading.order_status_v2(order_id=order_id, client_order_id=client_order_id, omit_transactions=omit_transactions)
+    except BitstampError as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+    with get_connection() as conn:
+        conn.execute(
+            "UPDATE order_records SET status = ?, raw_json = ?, updated_at = ? WHERE bitstamp_order_id = ?",
+            (str(result.get("status", "unknown")), json.dumps(result), _utc_now(), order_id),
+        )
+        conn.commit()
+
+    return {"ok": True, "order_id": order_id, "raw": result}
+
+
+def cancel_order(*, account_id: str, order_id) -> dict:
+    order_id = str(order_id)
+    client = _get_client(account_id)
+    try:
+        result = client.trading.cancel_order(order_id=order_id, version=2)
+    except BitstampError as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+    status = "cancelled" if result else "cancel_failed"
+    with get_connection() as conn:
+        conn.execute(
+            "UPDATE order_records SET status = ?, updated_at = ? WHERE bitstamp_order_id = ?",
+            (status, _utc_now(), order_id),
+        )
+        conn.commit()
+
+    return {"ok": bool(result), "order_id": order_id, "raw": result}

+ 59 - 5
src/exec_mcp/storage.py

@@ -14,6 +14,56 @@ def get_connection() -> sqlite3.Connection:
     return conn
 
 
+def _migrate_order_records(conn: sqlite3.Connection) -> None:
+    columns = {row[1] for row in conn.execute("PRAGMA table_info(order_records)").fetchall()}
+    canonical = {"id", "account_id", "market", "side", "order_type", "amount", "price", "expire_time", "status", "bitstamp_order_id", "client_order_id", "raw_json", "created_at", "updated_at"}
+    if not columns:
+        return
+    if columns == canonical:
+        return
+
+    conn.execute("DROP TABLE IF EXISTS order_records_legacy")
+    conn.execute("ALTER TABLE order_records RENAME TO order_records_legacy")
+    conn.execute(
+        """
+        CREATE TABLE order_records (
+            id TEXT PRIMARY KEY,
+            account_id TEXT NOT NULL,
+            market TEXT NOT NULL,
+            side TEXT NOT NULL,
+            order_type TEXT NOT NULL,
+            amount TEXT NOT NULL,
+            price TEXT,
+            expire_time INTEGER,
+            status TEXT NOT NULL,
+            bitstamp_order_id TEXT,
+            client_order_id TEXT,
+            raw_json TEXT NOT NULL DEFAULT '{}',
+            created_at TEXT NOT NULL,
+            updated_at TEXT NOT NULL,
+            FOREIGN KEY(account_id) REFERENCES accounts(id) ON DELETE CASCADE
+        )
+        """
+    )
+    legacy_columns = {row[1] for row in conn.execute("PRAGMA table_info(order_records_legacy)").fetchall()}
+    market_col = "market" if "market" in legacy_columns else "instrument"
+    order_type_col = "order_type" if "order_type" in legacy_columns else "order_kind"
+    amount_col = "amount" if "amount" in legacy_columns else "quantity"
+    raw_col = "raw_json" if "raw_json" in legacy_columns else "payload_json"
+    side_expr = "COALESCE(side, '')" if "side" in legacy_columns else "''"
+    status_expr = "COALESCE(status, 'unknown')" if "status" in legacy_columns else "'unknown'"
+    created_expr = "created_at" if "created_at" in legacy_columns else "CURRENT_TIMESTAMP"
+    updated_expr = "updated_at" if "updated_at" in legacy_columns else "CURRENT_TIMESTAMP"
+    conn.execute(
+        f"""
+        INSERT INTO order_records (id, account_id, market, side, order_type, amount, price, expire_time, status, bitstamp_order_id, client_order_id, raw_json, created_at, updated_at)
+        SELECT id, account_id, {market_col}, {side_expr}, {order_type_col}, CAST({amount_col} AS TEXT), price, expire_time, {status_expr}, bitstamp_order_id, client_order_id, COALESCE({raw_col}, '{{}}'), {created_expr}, {updated_expr}
+        FROM order_records_legacy
+        """
+    )
+    conn.execute("DROP TABLE order_records_legacy")
+
+
 def init_db() -> None:
     with get_connection() as conn:
         # Non-destructive schema bootstrap. Keep this stable from here on.
@@ -54,15 +104,18 @@ def init_db() -> None:
             CREATE TABLE IF NOT EXISTS order_records (
                 id TEXT PRIMARY KEY,
                 account_id TEXT NOT NULL,
-                instrument TEXT NOT NULL,
+                market TEXT NOT NULL,
                 side TEXT NOT NULL,
-                order_kind TEXT NOT NULL,
-                quantity REAL NOT NULL,
-                price REAL,
+                order_type TEXT NOT NULL,
+                amount TEXT NOT NULL,
+                price TEXT,
+                expire_time INTEGER,
                 status TEXT NOT NULL,
+                bitstamp_order_id TEXT,
+                client_order_id TEXT,
+                raw_json TEXT NOT NULL,
                 created_at TEXT NOT NULL,
                 updated_at TEXT NOT NULL,
-                raw_json TEXT NOT NULL DEFAULT '{}',
                 FOREIGN KEY(account_id) REFERENCES accounts(id) ON DELETE CASCADE
             );
 
@@ -96,4 +149,5 @@ def init_db() -> None:
             );
             """
         )
+        _migrate_order_records(conn)
         conn.commit()