from __future__ import annotations import json import sqlite3 from dataclasses import dataclass from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Any @dataclass class ClusterRow: cluster_id: str topic: str payload: dict updated_at: datetime class SQLiteClusterStore: def __init__(self, db_path: str | Path): self.db_path = str(db_path) self._init_db() def _conn(self) -> sqlite3.Connection: return sqlite3.connect(self.db_path) def _init_db(self) -> None: Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) with self._conn() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS clusters ( cluster_id TEXT PRIMARY KEY, topic TEXT NOT NULL, payload TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_clusters_topic ON clusters(topic)" ) def upsert_clusters(self, clusters: list[dict], topic: str) -> None: now = datetime.now(timezone.utc) with self._conn() as conn: for c in clusters: cluster_id = c["cluster_id"] payload = json.dumps(c, ensure_ascii=False) conn.execute( "INSERT INTO clusters(cluster_id, topic, payload, updated_at) VALUES(?,?,?,?) " "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, payload=excluded.payload, updated_at=excluded.updated_at", (cluster_id, topic, payload, now.isoformat()), ) def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]: cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours) cutoff_iso = cutoff.isoformat() with self._conn() as conn: cur = conn.execute( "SELECT payload FROM clusters WHERE topic=? AND updated_at >= ? ORDER BY updated_at DESC LIMIT ?", (topic, cutoff_iso, int(limit)), ) rows = [json.loads(r[0]) for r in cur.fetchall()] return rows