from __future__ import annotations

import json
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any

# Payload written for rows that upsert_cluster_summary creates before the
# cluster itself has been upserted. Payloads written by upsert_clusters always
# contain at least "cluster_id", so this value never matches a real cluster.
_PLACEHOLDER_PAYLOAD = json.dumps({}, ensure_ascii=False)

# Columns added to `clusters` after its original schema; _init_db adds any
# that are missing from a database created by an older version.
_SUMMARY_COLUMNS = (
    ("summary_payload", "TEXT"),
    ("summary_updated_at", "TEXT"),
)


def _utc_now_iso() -> str:
    """Return the current UTC time as a fixed-width ISO-8601 string.

    Always including microseconds keeps every stored timestamp the same
    width, so SQLite's lexicographic TEXT comparison/ordering matches
    chronological order.
    """
    return datetime.now(timezone.utc).isoformat(timespec="microseconds")


def _cutoff_iso(ttl_hours: float) -> str:
    """Return the timestamp ``ttl_hours`` before now, formatted like ``_utc_now_iso``."""
    cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
    return cutoff.isoformat(timespec="microseconds")


@dataclass
class ClusterRow:
    """Record mirroring the core columns of the ``clusters`` table.

    Not constructed anywhere in this module; kept for external callers.
    """

    cluster_id: str
    topic: str
    payload: dict
    updated_at: datetime


class SQLiteClusterStore:
    """SQLite-backed store for cluster payloads, cached summaries and feed state.

    Every public method opens its own connection, runs its statements in a
    single transaction (committed on success, rolled back on error) and
    closes the connection before returning.

    Timestamps are stored as UTC ISO-8601 TEXT; TTL filters compare them
    lexicographically against a cutoff string of the same format.
    """

    def __init__(self, db_path: str | Path):
        """Remember ``db_path`` and create/migrate the schema.

        Args:
            db_path: Path of the SQLite database file. Its parent directory
                is created if it does not exist.
        """
        self.db_path = str(db_path)
        self._init_db()

    def _conn(self) -> sqlite3.Connection:
        """Open a new connection to the database file.

        The caller owns the returned connection and must close it; the
        methods of this class use ``_transaction()``, which does so.
        """
        return sqlite3.connect(self.db_path)

    @contextmanager
    def _transaction(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection inside a transaction and always close it afterwards.

        ``sqlite3.Connection``'s own context manager only commits or rolls
        back -- it does not close the connection -- so using it directly
        leaks one connection (and file handle) per call.
        """
        conn = self._conn()
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    def _init_db(self) -> None:
        """Create the tables/index if missing and add columns absent in older DBs."""
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        with self._transaction() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS clusters (
                    cluster_id TEXT PRIMARY KEY,
                    topic TEXT NOT NULL,
                    payload TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    summary_payload TEXT,
                    summary_updated_at TEXT
                )
                """
            )
            # Incremental migration for tables created without the summary
            # columns. Inspect the schema instead of attempting ALTER TABLE and
            # swallowing every OperationalError, which would also hide genuine
            # failures such as a locked database.
            existing = {row[1] for row in conn.execute("PRAGMA table_info(clusters)")}
            for name, col_type in _SUMMARY_COLUMNS:
                if name not in existing:
                    conn.execute(f"ALTER TABLE clusters ADD COLUMN {name} {col_type}")
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_clusters_topic ON clusters(topic)"
            )
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS feed_state (
                    feed_key TEXT PRIMARY KEY,
                    last_hash TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
                """
            )

    def upsert_clusters(self, clusters: list[dict], topic: str) -> None:
        """Insert or update cluster payloads under ``topic``.

        Each dict is stored as JSON keyed by its ``"cluster_id"``; all rows
        share one ``updated_at`` timestamp. For existing rows, topic, payload
        and updated_at are overwritten while summary columns are left intact.

        Raises:
            KeyError: if a cluster dict lacks ``"cluster_id"``; nothing is
                written in that case.
        """
        now = _utc_now_iso()
        # Serialize everything up front so a bad item fails before any write.
        rows = [
            (c["cluster_id"], topic, json.dumps(c, ensure_ascii=False), now)
            for c in clusters
        ]
        with self._transaction() as conn:
            conn.executemany(
                "INSERT INTO clusters(cluster_id, topic, payload, updated_at) VALUES(?,?,?,?) "
                "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, "
                "payload=excluded.payload, updated_at=excluded.updated_at",
                rows,
            )

    def upsert_cluster_summary(
        self,
        cluster_id: str,
        summary_payload: dict,
    ) -> None:
        """Cache ``summary_payload`` (as JSON) for ``cluster_id``.

        If the cluster row exists, only the summary columns are updated. If
        not, a placeholder row (empty topic, ``{}`` payload) is inserted so
        the summary can be stored ahead of the cluster; a later
        ``upsert_clusters`` fills in the cluster columns and keeps the
        summary. Placeholder rows are ignored by the cluster read methods.
        """
        now = _utc_now_iso()
        with self._transaction() as conn:
            conn.execute(
                "INSERT INTO clusters(cluster_id, topic, payload, updated_at, summary_payload, summary_updated_at) "
                "VALUES(?,?,?,?,?,?) "
                "ON CONFLICT(cluster_id) DO UPDATE SET "
                "summary_payload=excluded.summary_payload, summary_updated_at=excluded.summary_updated_at",
                (
                    cluster_id,
                    "",  # placeholder topic; not touched on conflict
                    _PLACEHOLDER_PAYLOAD,
                    now,
                    json.dumps(summary_payload, ensure_ascii=False),
                    now,
                ),
            )

    def get_cluster_summary(self, cluster_id: str, ttl_hours: float) -> dict | None:
        """Return the cached summary if it was written within ``ttl_hours``, else None."""
        with self._transaction() as conn:
            row = conn.execute(
                "SELECT summary_payload, summary_updated_at FROM clusters "
                "WHERE cluster_id=? AND summary_updated_at >= ?",
                (cluster_id, _cutoff_iso(ttl_hours)),
            ).fetchone()
        if not row or not row[0]:
            return None
        return json.loads(row[0])

    def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
        """Return up to ``limit`` cluster payloads for ``topic``, newest first.

        Only rows updated within ``ttl_hours`` are included; summary
        placeholder rows are skipped.
        """
        with self._transaction() as conn:
            cur = conn.execute(
                "SELECT payload FROM clusters "
                "WHERE topic=? AND updated_at >= ? AND payload != ? "
                "ORDER BY updated_at DESC LIMIT ?",
                (topic, _cutoff_iso(ttl_hours), _PLACEHOLDER_PAYLOAD, int(limit)),
            )
            return [json.loads(r[0]) for r in cur.fetchall()]

    def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]:
        """Return up to ``limit`` cluster payloads across all topics, newest first.

        Only rows updated within ``ttl_hours`` are included; summary
        placeholder rows are skipped.
        """
        with self._transaction() as conn:
            cur = conn.execute(
                "SELECT payload FROM clusters "
                "WHERE updated_at >= ? AND payload != ? "
                "ORDER BY updated_at DESC LIMIT ?",
                (_cutoff_iso(ttl_hours), _PLACEHOLDER_PAYLOAD, int(limit)),
            )
            return [json.loads(r[0]) for r in cur.fetchall()]

    def get_cluster_by_id(self, cluster_id: str) -> dict | None:
        """Return the stored payload for ``cluster_id``, or None if no cluster was stored.

        A row that only holds a cached summary (placeholder) counts as absent.
        """
        with self._transaction() as conn:
            row = conn.execute(
                "SELECT payload FROM clusters WHERE cluster_id=?",
                (cluster_id,),
            ).fetchone()
        if row is None or row[0] == _PLACEHOLDER_PAYLOAD:
            return None
        return json.loads(row[0])

    def get_feed_hash(self, feed_key: str) -> str | None:
        """Return the last recorded hash for ``feed_key``, or None if unknown."""
        with self._transaction() as conn:
            row = conn.execute(
                "SELECT last_hash FROM feed_state WHERE feed_key=?",
                (feed_key,),
            ).fetchone()
        return row[0] if row else None

    def set_feed_hash(self, feed_key: str, last_hash: str) -> None:
        """Record ``last_hash`` for ``feed_key`` with the current UTC timestamp."""
        now = _utc_now_iso()
        with self._transaction() as conn:
            conn.execute(
                "INSERT INTO feed_state(feed_key, last_hash, updated_at) VALUES(?,?,?) "
                "ON CONFLICT(feed_key) DO UPDATE SET last_hash=excluded.last_hash, updated_at=excluded.updated_at",
                (feed_key, last_hash, now),
            )

    def get_feed_state(self, feed_key: str) -> dict | None:
        """Return ``{"last_hash", "updated_at"}`` for ``feed_key``, or None if unknown."""
        with self._transaction() as conn:
            row = conn.execute(
                "SELECT last_hash, updated_at FROM feed_state WHERE feed_key=?",
                (feed_key,),
            ).fetchone()
        if not row:
            return None
        return {"last_hash": row[0], "updated_at": row[1]}