| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
from __future__ import annotations

import json
import sqlite3
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
from threading import Lock
from typing import Any
# Location of the SQLite history database: <package root>/data/trends_history.db,
# resolved relative to this file so it works regardless of the CWD.
DB_PATH = Path(__file__).resolve().parent.parent / "data" / "trends_history.db"
# Snapshots older than this many days are deleted by prune_snapshots().
DEFAULT_RETENTION_DAYS = 30
# auto_prune_if_due() triggers a prune at most once per this many hours.
AUTO_PRUNE_INTERVAL_HOURS = 24
# Module-wide mutex serializing DB initialization and writes.
# NOTE: threading.Lock is non-reentrant — functions must not call each other
# while holding it (callers below acquire/release it around each connection).
_LOCK = Lock()
def _connect() -> sqlite3.Connection:
    """Open a connection to the history database, creating its directory first.

    Rows are returned as :class:`sqlite3.Row` so columns are addressable
    by name (see ``_row_to_dict``).
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db() -> None:
    """Create the ``snapshots`` and ``meta`` tables plus lookup indexes.

    Idempotent (everything is ``IF NOT EXISTS``); safe to call before any
    read or write. Serialized via the module lock.
    """
    with _LOCK:
        conn = _connect()
        try:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS snapshots (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ts TEXT NOT NULL,
                    tool TEXT NOT NULL,
                    keyword TEXT,
                    normalized_keyword TEXT,
                    mid TEXT,
                    canonical_label TEXT,
                    payload_json TEXT NOT NULL
                )
                """
            )
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS meta (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL
                )
                """
            )
            # Indexes whose name suffix matches the indexed column.
            for column in ("ts", "tool", "keyword", "mid"):
                conn.execute(
                    f"CREATE INDEX IF NOT EXISTS idx_snapshots_{column} ON snapshots({column})"
                )
            # This one abbreviates the column name, so it is created explicitly.
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_snapshots_norm_keyword ON snapshots(normalized_keyword)"
            )
            conn.commit()
        finally:
            conn.close()
- def _get_meta(conn: sqlite3.Connection, key: str) -> str | None:
- row = conn.execute("SELECT value FROM meta WHERE key = ?", (key,)).fetchone()
- return row[0] if row else None
- def _set_meta(conn: sqlite3.Connection, key: str, value: str) -> None:
- conn.execute(
- "INSERT INTO meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value",
- (key, value),
- )
def prune_snapshots(retention_days: int = DEFAULT_RETENTION_DAYS) -> int:
    """Delete snapshots older than *retention_days* days.

    Records the prune time under ``last_prune_ts`` in the meta table and
    returns the number of rows deleted. ``retention_days`` is floored at 1.

    Fix: ensure the schema exists before deleting — every other public
    entry point calls ``init_db()`` first, but this one did not, so a
    standalone call on a fresh database raised "no such table: snapshots".
    """
    # init_db() takes and releases the non-reentrant _LOCK, so it must be
    # called before we acquire the lock ourselves.
    init_db()
    cutoff = datetime.now(timezone.utc) - timedelta(days=max(1, int(retention_days)))
    with _LOCK:
        conn = _connect()
        try:
            # ts values are UTC isoformat strings written by store_snapshot,
            # so lexicographic comparison matches chronological order.
            cur = conn.execute("DELETE FROM snapshots WHERE ts < ?", (cutoff.isoformat(),))
            _set_meta(conn, "last_prune_ts", datetime.now(timezone.utc).isoformat())
            conn.commit()
            return int(cur.rowcount or 0)
        finally:
            conn.close()
def auto_prune_if_due(retention_days: int = DEFAULT_RETENTION_DAYS) -> int:
    """Run a prune at most once per ``AUTO_PRUNE_INTERVAL_HOURS``.

    Returns the number of rows pruned, or 0 when the interval since the
    last recorded prune has not yet elapsed.
    """
    init_db()
    due = True
    with _LOCK:
        conn = _connect()
        try:
            stamp = _get_meta(conn, "last_prune_ts")
            if stamp:
                try:
                    elapsed = datetime.now(timezone.utc) - datetime.fromisoformat(stamp)
                    due = elapsed >= timedelta(hours=AUTO_PRUNE_INTERVAL_HOURS)
                except ValueError:
                    # Unparseable timestamp: treat the prune as due.
                    pass
        finally:
            conn.close()
    if not due:
        return 0
    # prune_snapshots acquires _LOCK itself, so it must run after we release it.
    return prune_snapshots(retention_days)
def store_snapshot(
    *,
    tool: str,
    keyword: str | None,
    normalized_keyword: str | None,
    mid: str | None,
    canonical_label: str | None,
    payload: dict[str, Any],
) -> None:
    """Persist one snapshot row, opportunistically pruning old entries first.

    The timestamp is recorded as a UTC isoformat string and the payload is
    stored as JSON (non-ASCII preserved).
    """
    init_db()
    auto_prune_if_due()
    row_values = (
        datetime.now(timezone.utc).isoformat(),
        tool,
        keyword,
        normalized_keyword,
        mid,
        canonical_label,
        json.dumps(payload, ensure_ascii=False),
    )
    with _LOCK:
        conn = _connect()
        try:
            conn.execute(
                """
                INSERT INTO snapshots (ts, tool, keyword, normalized_keyword, mid, canonical_label, payload_json)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                row_values,
            )
            conn.commit()
        finally:
            conn.close()
- def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
- payload = json.loads(row["payload_json"])
- return {
- "id": row["id"],
- "ts": row["ts"],
- "tool": row["tool"],
- "keyword": row["keyword"],
- "normalized_keyword": row["normalized_keyword"],
- "mid": row["mid"],
- "canonical_label": row["canonical_label"],
- "payload": payload,
- }
def read_recent(limit: int = 50) -> list[dict[str, Any]]:
    """Return the most recent snapshots, newest first.

    *limit* is clamped to the range 1..200.
    """
    init_db()
    clamped = min(max(int(limit), 1), 200)
    conn = _connect()
    try:
        fetched = conn.execute(
            "SELECT * FROM snapshots ORDER BY ts DESC LIMIT ?",
            (clamped,),
        ).fetchall()
        return [_row_to_dict(r) for r in fetched]
    finally:
        conn.close()
def summarize(limit: int = 500) -> dict[str, Any]:
    """Aggregate recent snapshots into top-10 frequency lists.

    Counts non-empty tools, keywords (preferring the normalized form over
    the raw keyword), and mids across the rows returned by ``read_recent``.
    NOTE: ``read_recent`` clamps its limit to 200, so values above that
    (including the default 500) see at most 200 rows.

    Replaces three hand-rolled ``d.get(k, 0) + 1`` / ``sorted(...)[:10]``
    tallies with ``collections.Counter``; ``Counter.most_common(10)`` is
    documented as equivalent to ``sorted(items, key=count, reverse=True)[:10]``
    including stable ordering for ties.
    """
    rows = read_recent(limit)
    tools: Counter[str] = Counter()
    keywords: Counter[str] = Counter()
    mids: Counter[str] = Counter()
    for row in rows:
        if row["tool"]:
            tools[row["tool"]] += 1
        keyword = row["normalized_keyword"] or row["keyword"]
        if keyword:
            keywords[keyword] += 1
        if row["mid"]:
            mids[row["mid"]] += 1
    return {
        "entries": len(rows),
        "top_tools": tools.most_common(10),
        "top_keywords": keywords.most_common(10),
        "top_mids": mids.most_common(10),
    }
def entity_history(entity: str, limit: int = 500) -> dict[str, Any]:
    """Return stored snapshots matching *entity*, newest first.

    The match is exact after trimming and lowercasing, against any of
    normalized_keyword, keyword, mid, or canonical_label. *limit* is
    clamped to 1..2000.
    """
    init_db()
    needle = entity.strip().lower()
    capped = min(max(int(limit), 1), 2000)
    conn = _connect()
    try:
        matches = conn.execute(
            """
            SELECT * FROM snapshots
            WHERE lower(coalesce(normalized_keyword, '')) = ?
               OR lower(coalesce(keyword, '')) = ?
               OR lower(coalesce(mid, '')) = ?
               OR lower(coalesce(canonical_label, '')) = ?
            ORDER BY ts DESC
            LIMIT ?
            """,
            (needle, needle, needle, needle, capped),
        ).fetchall()
        return {
            "entity": entity,
            "count": len(matches),
            "entries": [_row_to_dict(m) for m in matches],
        }
    finally:
        conn.close()
|