| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- from __future__ import annotations
- import sqlite3
- from datetime import datetime, timezone, timedelta
- from pathlib import Path
- from typing import Any
- SCHEMA = """
- CREATE TABLE IF NOT EXISTS candles (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- symbol TEXT NOT NULL,
- timeframe TEXT NOT NULL,
- open REAL NOT NULL,
- high REAL NOT NULL,
- low REAL NOT NULL,
- close REAL NOT NULL,
- start_ts INTEGER NOT NULL,
- end_ts INTEGER NOT NULL,
- UNIQUE(symbol, timeframe, start_ts)
- );
- """
- def connect(db_path: str | Path) -> sqlite3.Connection:
- conn = sqlite3.connect(str(db_path))
- conn.row_factory = sqlite3.Row
- return conn
- def init_db(db_path: str | Path) -> None:
- path = Path(db_path)
- path.parent.mkdir(parents=True, exist_ok=True)
- with connect(path) as conn:
- conn.executescript(SCHEMA)
- conn.execute(
- "CREATE UNIQUE INDEX IF NOT EXISTS idx_candles_symbol_timeframe_start_ts ON candles(symbol, timeframe, start_ts)"
- )
- conn.commit()
- def upsert_candle(db_path: str | Path, candle: dict[str, Any]) -> None:
- with connect(db_path) as conn:
- conn.execute(
- """
- INSERT INTO candles(symbol, timeframe, open, high, low, close, start_ts, end_ts)
- VALUES(?, ?, ?, ?, ?, ?, ?, ?)
- ON CONFLICT(symbol, timeframe, start_ts) DO UPDATE SET
- open=excluded.open,
- high=excluded.high,
- low=excluded.low,
- close=excluded.close,
- end_ts=excluded.end_ts
- """,
- (
- candle["symbol"],
- candle["timeframe"],
- candle["open"],
- candle["high"],
- candle["low"],
- candle["close"],
- candle["start_ts"],
- candle["end_ts"],
- ),
- )
- conn.commit()
- def latest_candles(db_path: str | Path, symbol: str, timeframe: str, limit: int = 100) -> list[dict[str, Any]]:
- with connect(db_path) as conn:
- rows = conn.execute(
- """
- SELECT symbol, timeframe, open, high, low, close, start_ts, end_ts
- FROM candles
- WHERE symbol = ? AND timeframe = ?
- ORDER BY start_ts DESC
- LIMIT ?
- """,
- (symbol, timeframe, limit),
- ).fetchall()
- return [dict(row) for row in reversed(rows)]
- def last_candle(db_path: str | Path, symbol: str, timeframe: str) -> dict[str, Any] | None:
- with connect(db_path) as conn:
- row = conn.execute(
- """
- SELECT symbol, timeframe, open, high, low, close, start_ts, end_ts
- FROM candles
- WHERE symbol = ? AND timeframe = ?
- ORDER BY start_ts DESC
- LIMIT 1
- """,
- (symbol, timeframe),
- ).fetchone()
- return dict(row) if row else None
- def stats(db_path: str | Path) -> dict[str, Any]:
- with connect(db_path) as conn:
- candles = conn.execute("SELECT COUNT(*) AS n FROM candles").fetchone()["n"]
- return {"candles": candles}
- def prune_candles_older_than(db_path: str | Path, days: int) -> int:
- if days <= 0:
- return 0
- cutoff = int((datetime.now(timezone.utc) - timedelta(days=days)).timestamp() * 1000)
- with connect(db_path) as conn:
- cursor = conn.execute("DELETE FROM candles WHERE end_ts < ?", (cutoff,))
- conn.commit()
- return int(cursor.rowcount or 0)
|