from __future__ import annotations import json import sqlite3 from datetime import datetime, timedelta, timezone from pathlib import Path from threading import Lock from typing import Any DB_PATH = Path(__file__).resolve().parent.parent / "data" / "trends_history.db" DEFAULT_RETENTION_DAYS = 30 AUTO_PRUNE_INTERVAL_HOURS = 24 _LOCK = Lock() def _connect() -> sqlite3.Connection: DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db() -> None: with _LOCK: conn = _connect() try: conn.execute( """ CREATE TABLE IF NOT EXISTS snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts TEXT NOT NULL, tool TEXT NOT NULL, keyword TEXT, normalized_keyword TEXT, mid TEXT, canonical_label TEXT, payload_json TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON snapshots(ts)") conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_tool ON snapshots(tool)") conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_keyword ON snapshots(keyword)") conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_norm_keyword ON snapshots(normalized_keyword)") conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_mid ON snapshots(mid)") conn.commit() finally: conn.close() def _get_meta(conn: sqlite3.Connection, key: str) -> str | None: row = conn.execute("SELECT value FROM meta WHERE key = ?", (key,)).fetchone() return row[0] if row else None def _set_meta(conn: sqlite3.Connection, key: str, value: str) -> None: conn.execute( "INSERT INTO meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value", (key, value), ) def prune_snapshots(retention_days: int = DEFAULT_RETENTION_DAYS) -> int: cutoff = datetime.now(timezone.utc) - timedelta(days=max(1, int(retention_days))) with _LOCK: conn = _connect() try: cur = conn.execute("DELETE FROM snapshots WHERE ts < ?", (cutoff.isoformat(),)) _set_meta(conn, "last_prune_ts", datetime.now(timezone.utc).isoformat()) conn.commit() return int(cur.rowcount or 0) finally: conn.close() def auto_prune_if_due(retention_days: int = DEFAULT_RETENTION_DAYS) -> int: init_db() with _LOCK: conn = _connect() try: last_prune_ts = _get_meta(conn, "last_prune_ts") if last_prune_ts: try: last_prune = datetime.fromisoformat(last_prune_ts) if datetime.now(timezone.utc) - last_prune < timedelta(hours=AUTO_PRUNE_INTERVAL_HOURS): return 0 except ValueError: pass finally: conn.close() return prune_snapshots(retention_days) def store_snapshot( *, tool: str, keyword: str | None, normalized_keyword: str | None, mid: str | None, canonical_label: str | None, payload: dict[str, Any], ) -> None: init_db() auto_prune_if_due() ts = datetime.now(timezone.utc).isoformat() with _LOCK: conn = _connect() try: conn.execute( """ INSERT INTO snapshots (ts, tool, keyword, normalized_keyword, mid, canonical_label, payload_json) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( ts, tool, keyword, normalized_keyword, mid, canonical_label, json.dumps(payload, ensure_ascii=False), ), ) conn.commit() finally: conn.close() def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]: payload = json.loads(row["payload_json"]) return { "id": row["id"], "ts": row["ts"], "tool": row["tool"], "keyword": row["keyword"], "normalized_keyword": row["normalized_keyword"], "mid": row["mid"], "canonical_label": row["canonical_label"], "payload": payload, } def read_recent(limit: int = 50) -> list[dict[str, Any]]: init_db() conn = _connect() try: rows = conn.execute( "SELECT * FROM snapshots ORDER BY ts DESC LIMIT ?", (max(1, min(int(limit), 200)),), ).fetchall() return [_row_to_dict(row) for row in rows] finally: conn.close() def summarize(limit: int = 500) -> dict[str, Any]: rows = read_recent(limit) tools: dict[str, int] = {} keywords: dict[str, int] = {} mids: dict[str, int] = {} for row in rows: if row["tool"]: tools[row["tool"]] = tools.get(row["tool"], 0) + 1 keyword = row["normalized_keyword"] or row["keyword"] if keyword: keywords[keyword] = keywords.get(keyword, 0) + 1 if row["mid"]: mids[row["mid"]] = mids.get(row["mid"], 0) + 1 return { "entries": len(rows), "top_tools": sorted(tools.items(), key=lambda x: x[1], reverse=True)[:10], "top_keywords": sorted(keywords.items(), key=lambda x: x[1], reverse=True)[:10], "top_mids": sorted(mids.items(), key=lambda x: x[1], reverse=True)[:10], } def entity_history(entity: str, limit: int = 500) -> dict[str, Any]: init_db() entity_key = entity.strip().lower() conn = _connect() try: rows = conn.execute( """ SELECT * FROM snapshots WHERE lower(coalesce(normalized_keyword, '')) = ? OR lower(coalesce(keyword, '')) = ? OR lower(coalesce(mid, '')) = ? OR lower(coalesce(canonical_label, '')) = ? ORDER BY ts DESC LIMIT ? """, (entity_key, entity_key, entity_key, entity_key, max(1, min(int(limit), 2000))), ).fetchall() return { "entity": entity, "count": len(rows), "entries": [_row_to_dict(row) for row in rows], } finally: conn.close()