# ledger.py — SQLite-backed history ledger for trend snapshots.
from __future__ import annotations

import json
import sqlite3
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
from threading import Lock
from typing import Any
# Location of the SQLite history database: <package parent>/data/trends_history.db,
# resolved relative to this file.
DB_PATH = Path(__file__).resolve().parent.parent / "data" / "trends_history.db"
# Snapshots older than this many days are deleted by prune_snapshots().
DEFAULT_RETENTION_DAYS = 30
# auto_prune_if_due() runs a prune at most once per this many hours.
AUTO_PRUNE_INTERVAL_HOURS = 24
# Module-wide lock serializing database access across threads in this process.
# NOTE: it is a plain (non-reentrant) Lock — holders must release it before
# calling another function that acquires it.
_LOCK = Lock()
  12. def _connect() -> sqlite3.Connection:
  13. DB_PATH.parent.mkdir(parents=True, exist_ok=True)
  14. conn = sqlite3.connect(DB_PATH)
  15. conn.row_factory = sqlite3.Row
  16. return conn
  17. def init_db() -> None:
  18. with _LOCK:
  19. conn = _connect()
  20. try:
  21. conn.execute(
  22. """
  23. CREATE TABLE IF NOT EXISTS snapshots (
  24. id INTEGER PRIMARY KEY AUTOINCREMENT,
  25. ts TEXT NOT NULL,
  26. tool TEXT NOT NULL,
  27. keyword TEXT,
  28. normalized_keyword TEXT,
  29. mid TEXT,
  30. canonical_label TEXT,
  31. payload_json TEXT NOT NULL
  32. )
  33. """
  34. )
  35. conn.execute(
  36. """
  37. CREATE TABLE IF NOT EXISTS meta (
  38. key TEXT PRIMARY KEY,
  39. value TEXT NOT NULL
  40. )
  41. """
  42. )
  43. conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON snapshots(ts)")
  44. conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_tool ON snapshots(tool)")
  45. conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_keyword ON snapshots(keyword)")
  46. conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_norm_keyword ON snapshots(normalized_keyword)")
  47. conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_mid ON snapshots(mid)")
  48. conn.commit()
  49. finally:
  50. conn.close()
  51. def _get_meta(conn: sqlite3.Connection, key: str) -> str | None:
  52. row = conn.execute("SELECT value FROM meta WHERE key = ?", (key,)).fetchone()
  53. return row[0] if row else None
  54. def _set_meta(conn: sqlite3.Connection, key: str, value: str) -> None:
  55. conn.execute(
  56. "INSERT INTO meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value",
  57. (key, value),
  58. )
  59. def prune_snapshots(retention_days: int = DEFAULT_RETENTION_DAYS) -> int:
  60. cutoff = datetime.now(timezone.utc) - timedelta(days=max(1, int(retention_days)))
  61. with _LOCK:
  62. conn = _connect()
  63. try:
  64. cur = conn.execute("DELETE FROM snapshots WHERE ts < ?", (cutoff.isoformat(),))
  65. _set_meta(conn, "last_prune_ts", datetime.now(timezone.utc).isoformat())
  66. conn.commit()
  67. return int(cur.rowcount or 0)
  68. finally:
  69. conn.close()
  70. def auto_prune_if_due(retention_days: int = DEFAULT_RETENTION_DAYS) -> int:
  71. init_db()
  72. with _LOCK:
  73. conn = _connect()
  74. try:
  75. last_prune_ts = _get_meta(conn, "last_prune_ts")
  76. if last_prune_ts:
  77. try:
  78. last_prune = datetime.fromisoformat(last_prune_ts)
  79. if datetime.now(timezone.utc) - last_prune < timedelta(hours=AUTO_PRUNE_INTERVAL_HOURS):
  80. return 0
  81. except ValueError:
  82. pass
  83. finally:
  84. conn.close()
  85. return prune_snapshots(retention_days)
  86. def store_snapshot(
  87. *,
  88. tool: str,
  89. keyword: str | None,
  90. normalized_keyword: str | None,
  91. mid: str | None,
  92. canonical_label: str | None,
  93. payload: dict[str, Any],
  94. ) -> None:
  95. init_db()
  96. auto_prune_if_due()
  97. ts = datetime.now(timezone.utc).isoformat()
  98. with _LOCK:
  99. conn = _connect()
  100. try:
  101. conn.execute(
  102. """
  103. INSERT INTO snapshots (ts, tool, keyword, normalized_keyword, mid, canonical_label, payload_json)
  104. VALUES (?, ?, ?, ?, ?, ?, ?)
  105. """,
  106. (
  107. ts,
  108. tool,
  109. keyword,
  110. normalized_keyword,
  111. mid,
  112. canonical_label,
  113. json.dumps(payload, ensure_ascii=False),
  114. ),
  115. )
  116. conn.commit()
  117. finally:
  118. conn.close()
  119. def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
  120. payload = json.loads(row["payload_json"])
  121. return {
  122. "id": row["id"],
  123. "ts": row["ts"],
  124. "tool": row["tool"],
  125. "keyword": row["keyword"],
  126. "normalized_keyword": row["normalized_keyword"],
  127. "mid": row["mid"],
  128. "canonical_label": row["canonical_label"],
  129. "payload": payload,
  130. }
  131. def read_recent(limit: int = 50) -> list[dict[str, Any]]:
  132. init_db()
  133. conn = _connect()
  134. try:
  135. rows = conn.execute(
  136. "SELECT * FROM snapshots ORDER BY ts DESC LIMIT ?",
  137. (max(1, min(int(limit), 200)),),
  138. ).fetchall()
  139. return [_row_to_dict(row) for row in rows]
  140. finally:
  141. conn.close()
  142. def summarize(limit: int = 500) -> dict[str, Any]:
  143. rows = read_recent(limit)
  144. tools: dict[str, int] = {}
  145. keywords: dict[str, int] = {}
  146. mids: dict[str, int] = {}
  147. for row in rows:
  148. if row["tool"]:
  149. tools[row["tool"]] = tools.get(row["tool"], 0) + 1
  150. keyword = row["normalized_keyword"] or row["keyword"]
  151. if keyword:
  152. keywords[keyword] = keywords.get(keyword, 0) + 1
  153. if row["mid"]:
  154. mids[row["mid"]] = mids.get(row["mid"], 0) + 1
  155. return {
  156. "entries": len(rows),
  157. "top_tools": sorted(tools.items(), key=lambda x: x[1], reverse=True)[:10],
  158. "top_keywords": sorted(keywords.items(), key=lambda x: x[1], reverse=True)[:10],
  159. "top_mids": sorted(mids.items(), key=lambda x: x[1], reverse=True)[:10],
  160. }
  161. def entity_history(entity: str, limit: int = 500) -> dict[str, Any]:
  162. init_db()
  163. entity_key = entity.strip().lower()
  164. conn = _connect()
  165. try:
  166. rows = conn.execute(
  167. """
  168. SELECT * FROM snapshots
  169. WHERE lower(coalesce(normalized_keyword, '')) = ?
  170. OR lower(coalesce(keyword, '')) = ?
  171. OR lower(coalesce(mid, '')) = ?
  172. OR lower(coalesce(canonical_label, '')) = ?
  173. ORDER BY ts DESC
  174. LIMIT ?
  175. """,
  176. (entity_key, entity_key, entity_key, entity_key, max(1, min(int(limit), 2000))),
  177. ).fetchall()
  178. return {
  179. "entity": entity,
  180. "count": len(rows),
  181. "entries": [_row_to_dict(row) for row in rows],
  182. }
  183. finally:
  184. conn.close()