| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
from __future__ import annotations

import json
import sqlite3
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any
@dataclass
class ClusterRow:
    """Typed record with the same four fields as a row of the ``clusters`` table."""

    # Identifier of the cluster.
    cluster_id: str
    # Topic label the cluster is filed under.
    topic: str
    # Cluster content as a dictionary.
    payload: dict
    # Time of the last update.
    updated_at: datetime
class SQLiteClusterStore:
    """Persist cluster payloads in one SQLite table keyed by cluster id.

    Each cluster dict is stored as a JSON string next to its topic and the
    UTC time of its last write. Every operation opens its own connection
    through ``_conn`` and closes it before returning.
    """

    def __init__(self, db_path: str | Path):
        """Remember *db_path* (as a string) and make sure the schema exists."""
        self.db_path = str(db_path)
        self._init_db()

    def _conn(self) -> sqlite3.Connection:
        """Open a new connection to the database; the caller must close it."""
        return sqlite3.connect(self.db_path)

    def _init_db(self) -> None:
        """Create the parent directory, the ``clusters`` table and its topic index."""
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        # The connection's own context manager only commits or rolls back the
        # transaction; ``closing`` is what actually releases the connection.
        with closing(self._conn()) as conn, conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS clusters (
                    cluster_id TEXT PRIMARY KEY,
                    topic TEXT NOT NULL,
                    payload TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
                """
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_clusters_topic ON clusters(topic)"
            )

    def upsert_clusters(self, clusters: list[dict], topic: str) -> None:
        """Insert or overwrite every cluster in *clusters* under *topic*.

        Rows are keyed by ``cluster["cluster_id"]``. An existing row with the
        same id has its topic, payload and timestamp replaced. All rows of
        one call share a single UTC timestamp and are written in one
        transaction.

        Raises:
            KeyError: if a cluster has no ``"cluster_id"`` key; nothing from
                that call is stored.
        """
        stamp = datetime.now(timezone.utc).isoformat()
        # Build every row before touching the database so that a malformed
        # cluster fails the call without opening a connection.
        rows = [
            (c["cluster_id"], topic, json.dumps(c, ensure_ascii=False), stamp)
            for c in clusters
        ]
        if not rows:
            return
        with closing(self._conn()) as conn, conn:
            conn.executemany(
                "INSERT INTO clusters(cluster_id, topic, payload, updated_at) VALUES(?,?,?,?) "
                "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, "
                "payload=excluded.payload, updated_at=excluded.updated_at",
                rows,
            )

    def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
        """Return up to *limit* decoded payloads for *topic* written within the TTL.

        Only rows whose ``updated_at`` is no older than *ttl_hours* hours
        before the current UTC time are returned, most recently written
        first.
        """
        # ``updated_at`` is a TEXT column, so the cutoff is passed and
        # compared as an ISO-8601 string.
        cutoff_iso = (datetime.now(timezone.utc) - timedelta(hours=ttl_hours)).isoformat()
        with closing(self._conn()) as conn, conn:
            found = conn.execute(
                "SELECT payload FROM clusters WHERE topic=? AND updated_at >= ? "
                "ORDER BY updated_at DESC LIMIT ?",
                (topic, cutoff_iso, int(limit)),
            ).fetchall()
        return [json.loads(payload) for (payload,) in found]
|