# SQLite-backed storage for OHLC candles.
#
# Every public helper takes a database path, opens a short-lived connection,
# performs one unit of work and closes the connection again.
from __future__ import annotations

import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Iterator

SCHEMA = """
CREATE TABLE IF NOT EXISTS candles (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    symbol TEXT NOT NULL,
    timeframe TEXT NOT NULL,
    open REAL NOT NULL,
    high REAL NOT NULL,
    low REAL NOT NULL,
    close REAL NOT NULL,
    start_ts INTEGER NOT NULL,
    end_ts INTEGER NOT NULL,
    UNIQUE(symbol, timeframe, start_ts)
);
"""


def connect(db_path: str | Path) -> sqlite3.Connection:
    """Open a connection to *db_path* whose rows support access by column name.

    The caller owns the returned connection and is responsible for closing it.
    """
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    return conn


@contextmanager
def _session(db_path: str | Path) -> Iterator[sqlite3.Connection]:
    """Yield a connection, then commit (or roll back on error) and close it.

    ``with sqlite3.Connection`` only manages the transaction and leaves the
    connection open, so the close has to be done explicitly here.
    """
    conn = connect(db_path)
    try:
        with conn:  # commit on success, rollback if the body raises
            yield conn
    finally:
        conn.close()


def init_db(db_path: str | Path) -> None:
    """Create the database file's parent directory, the table and its index.

    Safe to call repeatedly: every statement uses ``IF NOT EXISTS``.
    """
    path = Path(db_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with _session(path) as conn:
        conn.executescript(SCHEMA)
        # Same columns as the table-level UNIQUE constraint.
        # NOTE(review): presumably kept so that databases created before the
        # constraint existed still satisfy the ON CONFLICT target -- confirm.
        conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS idx_candles_symbol_timeframe_start_ts "
            "ON candles(symbol, timeframe, start_ts)"
        )


def upsert_candle(db_path: str | Path, candle: dict[str, Any]) -> None:
    """Insert *candle*, or overwrite the stored one with the same key.

    The key is ``(symbol, timeframe, start_ts)``. On conflict the price
    fields and ``end_ts`` are replaced with the new values.

    Raises:
        KeyError: if *candle* lacks any of the eight expected keys.
    """
    # Build the parameter tuple first so a malformed candle fails before a
    # connection is opened.
    params = (
        candle["symbol"],
        candle["timeframe"],
        candle["open"],
        candle["high"],
        candle["low"],
        candle["close"],
        candle["start_ts"],
        candle["end_ts"],
    )
    with _session(db_path) as conn:
        conn.execute(
            """
            INSERT INTO candles(symbol, timeframe, open, high, low, close, start_ts, end_ts)
            VALUES(?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(symbol, timeframe, start_ts) DO UPDATE SET
                open=excluded.open,
                high=excluded.high,
                low=excluded.low,
                close=excluded.close,
                end_ts=excluded.end_ts
            """,
            params,
        )


def latest_candles(
    db_path: str | Path, symbol: str, timeframe: str, limit: int = 100
) -> list[dict[str, Any]]:
    """Return up to *limit* most recent candles for a symbol/timeframe.

    The newest rows are selected, then returned in ascending ``start_ts``
    order (oldest first).
    """
    with _session(db_path) as conn:
        rows = conn.execute(
            """
            SELECT symbol, timeframe, open, high, low, close, start_ts, end_ts
            FROM candles
            WHERE symbol = ? AND timeframe = ?
            ORDER BY start_ts DESC
            LIMIT ?
            """,
            (symbol, timeframe, limit),
        ).fetchall()
    # The query yields newest-first; flip it so callers get chronological order.
    return [dict(row) for row in reversed(rows)]


def last_candle(db_path: str | Path, symbol: str, timeframe: str) -> dict[str, Any] | None:
    """Return the candle with the greatest ``start_ts``, or ``None`` if absent."""
    with _session(db_path) as conn:
        row = conn.execute(
            """
            SELECT symbol, timeframe, open, high, low, close, start_ts, end_ts
            FROM candles
            WHERE symbol = ? AND timeframe = ?
            ORDER BY start_ts DESC
            LIMIT 1
            """,
            (symbol, timeframe),
        ).fetchone()
    return dict(row) if row else None


def stats(db_path: str | Path) -> dict[str, Any]:
    """Return ``{"candles": <total number of stored rows>}``."""
    with _session(db_path) as conn:
        total = conn.execute("SELECT COUNT(*) AS n FROM candles").fetchone()["n"]
    return {"candles": total}


def prune_candles_older_than(db_path: str | Path, days: int) -> int:
    """Delete candles whose ``end_ts`` is more than *days* days in the past.

    The cutoff is computed as a UTC epoch timestamp in milliseconds, so
    ``end_ts`` is expected to be stored in milliseconds as well.

    Returns:
        The number of rows deleted; ``0`` when *days* is not positive.
    """
    if days <= 0:
        return 0
    cutoff = int((datetime.now(timezone.utc) - timedelta(days=days)).timestamp() * 1000)
    with _session(db_path) as conn:
        cursor = conn.execute("DELETE FROM candles WHERE end_ts < ?", (cutoff,))
        # Read the count while the connection is still open.
        deleted = int(cursor.rowcount or 0)
    return deleted