sqlite_store.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806
  1. from __future__ import annotations
  2. import json
  3. import sqlite3
  4. from dataclasses import dataclass
  5. from datetime import datetime, timezone, timedelta
  6. from pathlib import Path
  7. from typing import Any
  8. from urllib.parse import urlparse
  9. from email.utils import parsedate_to_datetime
  10. from news_mcp.config import (
  11. NEWS_PRUNE_INTERVAL_HOURS,
  12. NEWS_PRUNING_ENABLED,
  13. NEWS_RETENTION_DAYS,
  14. )
  15. from news_mcp.entity_normalize import normalize_entities
  16. from news_mcp.trends_resolution import resolve_entity_via_trends
  17. def _normalize_ts(ts: Any) -> str:
  18. """Parse any timestamp string and return ISO 8601 UTC.
  19. Handles ISO 8601, RFC 2822/HTTP-date, and unix epoch seconds.
  20. Returns empty string if unparseable.
  21. """
  22. if ts is None:
  23. return ""
  24. if isinstance(ts, (int, float)):
  25. try:
  26. dt = datetime.fromtimestamp(float(ts), tz=timezone.utc)
  27. return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00")
  28. except Exception:
  29. return ""
  30. text = str(ts).strip()
  31. if not text:
  32. return ""
  33. # Try ISO 8601
  34. try:
  35. dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
  36. if dt.tzinfo is None:
  37. dt = dt.replace(tzinfo=timezone.utc)
  38. return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  39. except Exception:
  40. pass
  41. # Try RFC 2822 / HTTP-date
  42. try:
  43. dt = parsedate_to_datetime(text)
  44. if dt.tzinfo is None:
  45. dt = dt.replace(tzinfo=timezone.utc)
  46. return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  47. except Exception:
  48. pass
  49. # Return original if we can't parse — better than losing data
  50. return text
  51. def _read_ts(ts: Any) -> float | None:
  52. """Parse a stored, already-normalized ISO 8601 UTC timestamp to a unix float.
  53. All payload.timestamp / payload.first_seen / payload.last_updated values
  54. are guaranteed YYYY-MM-DDTHH:MM:SS+00:00 at write time (enforced by
  55. sanitize_cluster_payload → _normalize_ts). Only datetime.fromisoformat is
  56. needed here. Do NOT add RFC 2822 / parsedate_to_datetime fallbacks — if
  57. this function can't parse a stored timestamp it means the normalization
  58. pipeline has a bug that should be fixed there, not papered over here.
  59. """
  60. if not ts:
  61. return None
  62. try:
  63. dt = datetime.fromisoformat(str(ts).strip())
  64. if dt.tzinfo is None:
  65. dt = dt.replace(tzinfo=timezone.utc)
  66. return dt.astimezone(timezone.utc).timestamp()
  67. except Exception:
  68. return None
  69. @dataclass
  70. class ClusterRow:
  71. cluster_id: str
  72. topic: str
  73. payload: dict
  74. updated_at: datetime
  75. META_LAST_PRUNE_AT = "last_prune_at"
  76. def _article_key(article: dict[str, Any]) -> str:
  77. url = str(article.get("url") or "").strip()
  78. if not url:
  79. return str(article.get("title") or "")
  80. try:
  81. parsed = urlparse(url)
  82. parts = [p for p in parsed.path.split("/") if p]
  83. if parts:
  84. return parts[-1]
  85. except Exception:
  86. pass
  87. return url
  88. def _dedup_articles(articles: list[dict[str, Any]]) -> list[dict[str, Any]]:
  89. seen: set[str] = set()
  90. out: list[dict[str, Any]] = []
  91. for article in articles:
  92. key = _article_key(article)
  93. if key in seen:
  94. continue
  95. seen.add(key)
  96. out.append(article)
  97. return out
  98. def _has_valid_entity_resolutions(resolutions: Any, entities: list[str]) -> bool:
  99. if not isinstance(resolutions, list):
  100. return False
  101. if len(resolutions) != len(entities):
  102. return False
  103. for res in resolutions:
  104. if not isinstance(res, dict):
  105. return False
  106. if not res.get("normalized") or not res.get("canonical_label"):
  107. return False
  108. return True
  109. def sanitize_cluster_payload(cluster: dict[str, Any], *, include_resolutions: bool = True) -> dict[str, Any]:
  110. """Normalize cluster payload so every stored payload is internally consistent."""
  111. out = dict(cluster)
  112. raw_articles = out.get("articles", []) or []
  113. articles = [a for a in raw_articles if isinstance(a, dict)]
  114. # Normalize article timestamps
  115. for a in articles:
  116. if "timestamp" in a:
  117. a["timestamp"] = _normalize_ts(a["timestamp"])
  118. out["articles"] = _dedup_articles(articles)
  119. raw_entities = out.get("entities", []) or []
  120. entities = normalize_entities(raw_entities)
  121. out["entities"] = entities
  122. # Normalize cluster-level timestamps
  123. for field in ("timestamp", "last_updated", "first_seen"):
  124. if field in out and out[field]:
  125. out[field] = _normalize_ts(out[field])
  126. if not include_resolutions:
  127. return out
  128. resolutions = out.get("entityResolutions", None)
  129. if entities:
  130. if not _has_valid_entity_resolutions(resolutions, entities):
  131. out["entityResolutions"] = [resolve_entity_via_trends(e) for e in entities]
  132. else:
  133. # Keep the empty case explicit and stable.
  134. out["entityResolutions"] = []
  135. return out
  136. class SQLiteClusterStore:
  137. def __init__(self, db_path: str | Path):
  138. self.db_path = str(db_path)
  139. self._init_db()
  140. def _conn(self) -> sqlite3.Connection:
  141. return sqlite3.connect(self.db_path)
  142. def _init_db(self) -> None:
  143. Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
  144. with self._conn() as conn:
  145. conn.execute("PRAGMA journal_mode=WAL")
  146. conn.execute("PRAGMA synchronous=NORMAL")
  147. conn.execute("PRAGMA busy_timeout=5000")
  148. conn.execute(
  149. """
  150. CREATE TABLE IF NOT EXISTS clusters (
  151. cluster_id TEXT PRIMARY KEY,
  152. topic TEXT NOT NULL,
  153. payload TEXT NOT NULL,
  154. updated_at TEXT NOT NULL,
  155. summary_payload TEXT,
  156. summary_updated_at TEXT
  157. )
  158. """
  159. )
  160. # If the table already exists without the summary columns,
  161. # add them (SQLite-friendly incremental migrations).
  162. for col_def in [
  163. "summary_payload TEXT",
  164. "summary_updated_at TEXT",
  165. ]:
  166. col = col_def.split()[0]
  167. try:
  168. conn.execute(f"ALTER TABLE clusters ADD COLUMN {col_def}")
  169. except sqlite3.OperationalError:
  170. pass
  171. conn.execute(
  172. "CREATE INDEX IF NOT EXISTS idx_clusters_topic ON clusters(topic)"
  173. )
  174. conn.execute(
  175. "CREATE INDEX IF NOT EXISTS idx_clusters_updated_at ON clusters(updated_at)"
  176. )
  177. try:
  178. cur = conn.execute("PRAGMA table_info(entity_metadata)")
  179. cols = [row[1] for row in cur.fetchall()]
  180. if cols and "entity_id" not in cols:
  181. conn.execute("DROP TABLE entity_metadata")
  182. except sqlite3.OperationalError:
  183. pass
  184. conn.execute(
  185. """
  186. CREATE TABLE IF NOT EXISTS entity_metadata (
  187. entity_id TEXT PRIMARY KEY,
  188. normalized_label TEXT NOT NULL,
  189. canonical_label TEXT,
  190. mid TEXT,
  191. sources_json TEXT,
  192. updated_at TEXT,
  193. last_requested_at TEXT
  194. )
  195. """
  196. )
  197. conn.execute(
  198. "CREATE UNIQUE INDEX IF NOT EXISTS idx_entity_metadata_mid ON entity_metadata(mid) WHERE mid IS NOT NULL"
  199. )
  200. conn.execute(
  201. """
  202. CREATE TABLE IF NOT EXISTS feed_state (
  203. feed_key TEXT PRIMARY KEY,
  204. last_hash TEXT NOT NULL,
  205. last_item_count INTEGER,
  206. enabled INTEGER DEFAULT 1,
  207. updated_at TEXT NOT NULL
  208. )
  209. """
  210. )
  211. try:
  212. conn.execute("ALTER TABLE feed_state ADD COLUMN enabled INTEGER DEFAULT 1")
  213. except sqlite3.OperationalError:
  214. pass
  215. conn.execute(
  216. """
  217. CREATE TABLE IF NOT EXISTS meta (
  218. key TEXT PRIMARY KEY,
  219. value TEXT NOT NULL
  220. )
  221. """
  222. )
  223. def upsert_clusters(self, clusters: list[dict], topic: str) -> None:
  224. now = datetime.now(timezone.utc)
  225. with self._conn() as conn:
  226. for c in clusters:
  227. c = sanitize_cluster_payload(c)
  228. cluster_id = c["cluster_id"]
  229. payload = json.dumps(c, ensure_ascii=False)
  230. conn.execute(
  231. "INSERT INTO clusters(cluster_id, topic, payload, updated_at) VALUES(?,?,?,?) "
  232. "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, payload=excluded.payload, updated_at=excluded.updated_at",
  233. (cluster_id, topic, payload, now.isoformat()),
  234. )
  235. def upsert_cluster_summary(
  236. self,
  237. cluster_id: str,
  238. summary_payload: dict,
  239. ) -> None:
  240. now = datetime.now(timezone.utc).isoformat()
  241. with self._conn() as conn:
  242. conn.execute(
  243. "UPDATE clusters SET summary_payload=?, summary_updated_at=? WHERE cluster_id=?",
  244. (
  245. json.dumps(summary_payload, ensure_ascii=False),
  246. now,
  247. cluster_id,
  248. ),
  249. )
  250. def get_cluster_summary(self, cluster_id: str, ttl_hours: float) -> dict | None:
  251. cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
  252. cutoff_iso = cutoff.isoformat()
  253. with self._conn() as conn:
  254. cur = conn.execute(
  255. "SELECT summary_payload, summary_updated_at FROM clusters "
  256. "WHERE cluster_id=? AND summary_updated_at >= ?",
  257. (cluster_id, cutoff_iso),
  258. )
  259. row = cur.fetchone()
  260. if not row or not row[0]:
  261. return None
  262. return json.loads(row[0])
  263. def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
  264. """Return newest clusters by their own event timestamp (payload.timestamp).
  265. payload.timestamp is guaranteed ISO 8601 UTC — use _read_ts, not raw
  266. JSON parsing with RFC 2822 fallbacks.
  267. """
  268. cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).timestamp()
  269. with self._conn() as conn:
  270. cur = conn.execute(
  271. "SELECT payload FROM clusters WHERE topic=? ORDER BY updated_at DESC",
  272. (topic,),
  273. )
  274. candidates = [json.loads(r[0]) for r in cur.fetchall()]
  275. filtered = [c for c in candidates if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
  276. filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
  277. return filtered[: int(limit)]
  278. def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]:
  279. """Return newest clusters across all topics by event timestamp.
  280. payload.timestamp is guaranteed ISO 8601 UTC — use _read_ts, not raw
  281. JSON parsing with RFC 2822 fallbacks.
  282. """
  283. cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).timestamp()
  284. with self._conn() as conn:
  285. cur = conn.execute("SELECT payload FROM clusters ORDER BY updated_at DESC")
  286. candidates = [json.loads(r[0]) for r in cur.fetchall()]
  287. filtered = [c for c in candidates if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
  288. filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
  289. return filtered[: int(limit)]
  290. def get_cluster_by_id(self, cluster_id: str) -> dict | None:
  291. with self._conn() as conn:
  292. cur = conn.execute(
  293. "SELECT payload FROM clusters WHERE cluster_id=?",
  294. (cluster_id,),
  295. )
  296. row = cur.fetchone()
  297. return json.loads(row[0]) if row else None
  298. def get_feed_hash(self, feed_key: str) -> str | None:
  299. with self._conn() as conn:
  300. cur = conn.execute(
  301. "SELECT last_hash FROM feed_state WHERE feed_key=?",
  302. (feed_key,),
  303. )
  304. row = cur.fetchone()
  305. return row[0] if row else None
  306. def set_feed_hash(self, feed_key: str, last_hash: str) -> None:
  307. self.set_feed_state(feed_key, last_hash, None)
  308. def set_feed_state(self, feed_key: str, last_hash: str, last_item_count: int | None = None) -> None:
  309. now = datetime.now(timezone.utc).isoformat()
  310. with self._conn() as conn:
  311. conn.execute(
  312. "INSERT INTO feed_state(feed_key, last_hash, last_item_count, updated_at) VALUES(?,?,?,?) "
  313. "ON CONFLICT(feed_key) DO UPDATE SET last_hash=excluded.last_hash, last_item_count=excluded.last_item_count, updated_at=excluded.updated_at",
  314. (feed_key, last_hash, last_item_count, now),
  315. )
  316. def get_feed_state(self, feed_key: str) -> dict | None:
  317. with self._conn() as conn:
  318. cur = conn.execute(
  319. "SELECT last_hash, updated_at FROM feed_state WHERE feed_key=?",
  320. (feed_key,),
  321. )
  322. row = cur.fetchone()
  323. if not row:
  324. return None
  325. return {"last_hash": row[0], "updated_at": row[1]}
  326. def get_all_feed_states(self) -> list[dict[str, Any]]:
  327. """All feed_state rows."""
  328. with self._conn() as conn:
  329. cur = conn.execute(
  330. "SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC"
  331. )
  332. return [
  333. {
  334. "feed_key": row[0],
  335. "last_hash": row[1],
  336. "last_item_count": row[2],
  337. "enabled": bool(row[3]),
  338. "updated_at": row[4],
  339. }
  340. for row in cur.fetchall()
  341. ]
  342. def feed_ensure_seeded(self, feed_urls: list[str]) -> None:
  343. """Insert any feed URLs not yet present in feed_state (enabled by default)."""
  344. if not feed_urls:
  345. return
  346. with self._conn() as conn:
  347. for url in feed_urls:
  348. conn.execute(
  349. "INSERT OR IGNORE INTO feed_state(feed_key, last_hash, last_item_count, enabled, updated_at) VALUES(?, '', 0, 1, '')",
  350. (url,),
  351. )
  352. def get_feed_state_list(self) -> list[dict[str, Any]]:
  353. """Return all feeds with enabled/disabled status for the dashboard."""
  354. return self.get_all_feed_states()
  355. def set_feed_enabled(self, feed_url: str, enabled: bool) -> bool:
  356. """Toggle a feed's enabled state. Returns True if the feed existed and was updated."""
  357. with self._conn() as conn:
  358. cur = conn.execute(
  359. "UPDATE feed_state SET enabled = ? WHERE feed_key = ?",
  360. (1 if enabled else 0, feed_url),
  361. )
  362. return cur.rowcount > 0
  363. def get_enabled_feed_urls(self, feed_urls: list[str]) -> list[str]:
  364. """From a list of configured feed URLs, return only those that are enabled in feed_state.
  365. URLs not yet present in feed_state are seeded as enabled.
  366. """
  367. self.feed_ensure_seeded(feed_urls)
  368. with self._conn() as conn:
  369. placeholders = ",".join("?" for _ in feed_urls) if feed_urls else ""
  370. if placeholders:
  371. cur = conn.execute(
  372. f"SELECT feed_key FROM feed_state WHERE feed_key IN ({placeholders}) AND enabled = 1",
  373. feed_urls,
  374. )
  375. else:
  376. cur = conn.execute("SELECT feed_key FROM feed_state WHERE enabled = 1")
  377. return [row[0] for row in cur.fetchall()]
  378. def get_meta(self, key: str) -> str | None:
  379. with self._conn() as conn:
  380. cur = conn.execute("SELECT value FROM meta WHERE key=?", (key,))
  381. row = cur.fetchone()
  382. return row[0] if row else None
  383. def set_meta(self, key: str, value: str) -> None:
  384. with self._conn() as conn:
  385. conn.execute(
  386. "INSERT INTO meta(key, value) VALUES(?, ?) "
  387. "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
  388. (key, value),
  389. )
  390. def upsert_entity_metadata(
  391. self,
  392. normalized_label: str,
  393. canonical_label: str | None = None,
  394. mid: str | None = None,
  395. sources: list[str] | None = None,
  396. ) -> None:
  397. normalized_label = str(normalized_label or "").strip()
  398. if not normalized_label:
  399. return
  400. canonical_label = str(canonical_label).strip() if canonical_label else None
  401. mid = str(mid).strip() if mid else None
  402. entity_id = mid if mid else f"local:{normalized_label}"
  403. sources = sorted({s for s in (sources or []) if s})
  404. sources_json = json.dumps(sources, ensure_ascii=False)
  405. now = datetime.now(timezone.utc).isoformat()
  406. with self._conn() as conn:
  407. conn.execute(
  408. """
  409. INSERT INTO entity_metadata(entity_id, normalized_label, canonical_label, mid, sources_json, updated_at)
  410. VALUES(?,?,?,?,?,?)
  411. ON CONFLICT(entity_id) DO UPDATE SET
  412. canonical_label=excluded.canonical_label,
  413. mid=excluded.mid,
  414. sources_json=excluded.sources_json,
  415. updated_at=excluded.updated_at
  416. """,
  417. (entity_id, normalized_label, canonical_label, mid, sources_json, now),
  418. )
  419. def get_entity_metadata(self, normalized_label: str) -> dict[str, Any] | None:
  420. normalized_label = str(normalized_label or "").strip()
  421. if not normalized_label:
  422. return None
  423. with self._conn() as conn:
  424. cur = conn.execute(
  425. "SELECT entity_id, canonical_label, mid, sources_json, updated_at, last_requested_at "
  426. "FROM entity_metadata "
  427. "WHERE normalized_label=? "
  428. "ORDER BY CASE WHEN mid IS NOT NULL THEN 0 ELSE 1 END, "
  429. "COALESCE(last_requested_at, updated_at) DESC, updated_at DESC "
  430. "LIMIT 1",
  431. (normalized_label,),
  432. )
  433. row = cur.fetchone()
  434. if not row:
  435. return None
  436. sources = []
  437. if row[2]:
  438. try:
  439. sources = json.loads(row[2])
  440. except Exception:
  441. sources = []
  442. return {
  443. "entity_id": row[0],
  444. "normalized_label": normalized_label,
  445. "canonical_label": row[1],
  446. "mid": row[2],
  447. "sources": sources,
  448. "updated_at": row[4],
  449. "last_requested_at": row[5],
  450. }
  451. def record_entity_request(self, normalized_label: str, mid: str | None = None) -> None:
  452. normalized_label = str(normalized_label or "").strip()
  453. if not normalized_label:
  454. return
  455. mid = str(mid).strip() if mid else None
  456. entity_id = mid if mid else f"local:{normalized_label}"
  457. now = datetime.now(timezone.utc).isoformat()
  458. with self._conn() as conn:
  459. conn.execute(
  460. """
  461. INSERT INTO entity_metadata(entity_id, normalized_label, canonical_label, mid, sources_json, updated_at, last_requested_at)
  462. VALUES(?,?,?,?,?,?,?)
  463. ON CONFLICT(entity_id) DO UPDATE SET
  464. last_requested_at=excluded.last_requested_at
  465. """,
  466. (entity_id, normalized_label, None, mid, json.dumps([], ensure_ascii=False), now, now),
  467. )
  468. def get_failed_enrichment_clusters(self, max_retries: int = 3) -> list[dict]:
  469. """Return clusters whose last enrichment failed and haven't exceeded max_retries.
  470. These are candidates for re-enrichment on the next polling cycle.
  471. """
  472. with self._conn() as conn:
  473. cur = conn.execute(
  474. "SELECT payload FROM clusters "
  475. "WHERE json_extract(payload, '$.enrichment_failed_at') IS NOT NULL "
  476. "AND (json_extract(payload, '$.enrichment_retry_count') IS NULL "
  477. " OR json_extract(payload, '$.enrichment_retry_count') < ?) "
  478. "ORDER BY updated_at DESC LIMIT 500",
  479. (max_retries,),
  480. )
  481. rows = cur.fetchall()
  482. return [json.loads(r[0]) for r in rows]
  483. def prune_clusters(self, retention_days: float) -> int:
  484. retention_days = float(retention_days)
  485. if retention_days <= 0:
  486. return 0
  487. cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
  488. cutoff_iso = cutoff.isoformat()
  489. pruned_at = datetime.now(timezone.utc).isoformat()
  490. with self._conn() as conn:
  491. cur = conn.execute("DELETE FROM clusters WHERE updated_at < ?", (cutoff_iso,))
  492. deleted = int(cur.rowcount or 0)
  493. conn.execute(
  494. "INSERT INTO meta(key, value) VALUES(?, ?) "
  495. "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
  496. (META_LAST_PRUNE_AT, pruned_at),
  497. )
  498. return deleted
  499. def prune_if_due(self, pruning_enabled: bool, retention_days: float, interval_hours: float = 24.0) -> dict[str, Any]:
  500. retention_days = float(retention_days)
  501. interval_hours = float(interval_hours)
  502. if (not pruning_enabled) or retention_days <= 0:
  503. return {
  504. "enabled": bool(pruning_enabled),
  505. "deleted": 0,
  506. "due": False,
  507. "retention_days": retention_days,
  508. "interval_hours": interval_hours,
  509. "last_prune_at": self.get_meta(META_LAST_PRUNE_AT),
  510. }
  511. last_prune_at = self.get_meta(META_LAST_PRUNE_AT)
  512. now = datetime.now(timezone.utc)
  513. due = True
  514. if last_prune_at:
  515. try:
  516. last_dt = datetime.fromisoformat(last_prune_at)
  517. due = now - last_dt >= timedelta(hours=max(1.0, interval_hours))
  518. except Exception:
  519. due = True
  520. if not due:
  521. return {
  522. "enabled": True,
  523. "deleted": 0,
  524. "due": False,
  525. "retention_days": retention_days,
  526. "interval_hours": interval_hours,
  527. "last_prune_at": last_prune_at,
  528. }
  529. deleted = self.prune_clusters(retention_days)
  530. last_prune_at = self.get_meta(META_LAST_PRUNE_AT)
  531. return {
  532. "enabled": True,
  533. "deleted": deleted,
  534. "due": True,
  535. "retention_days": retention_days,
  536. "interval_hours": interval_hours,
  537. "last_prune_at": last_prune_at,
  538. }
  539. def get_prune_state(self, pruning_enabled: bool, retention_days: float, interval_hours: float = 24.0) -> dict[str, Any]:
  540. return {
  541. "enabled": bool(pruning_enabled),
  542. "retention_days": float(retention_days),
  543. "interval_hours": float(interval_hours),
  544. "last_prune_at": self.get_meta(META_LAST_PRUNE_AT),
  545. }
  546. # ------------------------------------------------------------------
  547. # Dashboard query helpers
  548. # ------------------------------------------------------------------
  549. def get_dashboard_stats(self) -> dict[str, Any]:
  550. """Aggregate status numbers for the health panel."""
  551. with self._conn() as conn:
  552. total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
  553. total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
  554. topic_counts = dict(conn.execute(
  555. "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
  556. ).fetchall())
  557. last_refresh = self.get_meta("last_refresh_at")
  558. feeds = {}
  559. for row in conn.execute("SELECT feed_key, last_hash, last_item_count, updated_at FROM feed_state"):
  560. feeds[row[0]] = {"last_hash": row[1], "last_item_count": row[2], "updated_at": row[3]}
  561. last_prune = self.get_meta(META_LAST_PRUNE_AT)
  562. prune_state = self.get_prune_state(
  563. pruning_enabled=NEWS_PRUNING_ENABLED,
  564. retention_days=NEWS_RETENTION_DAYS,
  565. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  566. )
  567. return {
  568. "total_clusters": total_clusters,
  569. "total_entities": total_entities,
  570. "clusters_by_topic": topic_counts,
  571. "last_refresh_at": last_refresh,
  572. "last_prune_at": last_prune,
  573. "prune_state": prune_state,
  574. "feeds": feeds,
  575. }
  576. def get_clusters_page(
  577. self,
  578. topic: str | None = None,
  579. hours: float = 24,
  580. limit: int = 20,
  581. offset: int = 0,
  582. ) -> list[dict[str, Any]]:
  583. """Paginated cluster listing filtered by payload.timestamp (event time).
  584. payload.timestamp is guaranteed ISO 8601 UTC — filtered and sorted
  585. using _read_ts, not updated_at (row modification time).
  586. """
  587. cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
  588. query = "SELECT payload FROM clusters"
  589. params: list = []
  590. if topic and topic != "all":
  591. query += " WHERE topic = ?"
  592. params.append(topic)
  593. with self._conn() as conn:
  594. rows = conn.execute(query, params).fetchall()
  595. filtered = [json.loads(r[0]) for r in rows]
  596. filtered = [c for c in filtered if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
  597. filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
  598. page = filtered[offset:offset + limit]
  599. return [
  600. {
  601. "cluster_id": c.get("cluster_id", ""),
  602. "headline": c.get("headline", ""),
  603. "topic": c.get("topic", ""),
  604. "sentiment": c.get("sentiment", "neutral"),
  605. "sentimentScore": c.get("sentimentScore"),
  606. "importance": c.get("importance", 0),
  607. "entities": c.get("entities", []),
  608. "sources": c.get("sources", []),
  609. "timestamp": c.get("timestamp", ""),
  610. "keywords": c.get("keywords", []),
  611. "article_count": len(c.get("articles", [])),
  612. }
  613. for c in page
  614. ]
  615. def get_sentiment_series(
  616. self,
  617. topic: str | None = None,
  618. hours: float = 24,
  619. bucket_hours: float = 1,
  620. ) -> list[dict[str, Any]]:
  621. """Sentiment score averaged per time bucket.
  622. Filters by payload.timestamp (event time, ISO 8601 UTC guaranteed).
  623. """
  624. cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
  625. query = "SELECT payload FROM clusters"
  626. params: list = []
  627. if topic and topic != "all":
  628. query += " WHERE topic = ?"
  629. params.append(topic)
  630. with self._conn() as conn:
  631. rows = conn.execute(query, params).fetchall()
  632. buckets: dict[datetime, list[float]] = {}
  633. for (payload_text,) in rows:
  634. c = json.loads(payload_text)
  635. ts = _read_ts(c.get("timestamp"))
  636. score = c.get("sentimentScore")
  637. if ts is None or score is None:
  638. continue
  639. if ts < cutoff_ts:
  640. continue
  641. dt = datetime.fromtimestamp(ts, tz=timezone.utc)
  642. bucket_key = dt.replace(minute=0, second=0, microsecond=0)
  643. if bucket_hours > 1:
  644. bucket_key = bucket_key.replace(
  645. hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
  646. )
  647. buckets.setdefault(bucket_key, []).append(float(score))
  648. return [
  649. {
  650. "time": bucket_key.isoformat(),
  651. "avg_sentiment": round(sum(scores) / len(scores), 3),
  652. "count": len(scores),
  653. "min": round(min(scores), 3),
  654. "max": round(max(scores), 3),
  655. }
  656. for bucket_key, scores in sorted(buckets.items())
  657. ]
  658. def get_entity_frequencies(
  659. self,
  660. hours: float = 24,
  661. limit: int = 30,
  662. ) -> list[dict[str, Any]]:
  663. """Top entities by mention count filtered by payload.timestamp (ISO 8601 UTC guaranteed)."""
  664. cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
  665. with self._conn() as conn:
  666. rows = conn.execute("SELECT payload FROM clusters").fetchall()
  667. counter: dict[str, int] = {}
  668. for (payload_text,) in rows:
  669. c = json.loads(payload_text)
  670. if (_read_ts(c.get("timestamp")) or 0.0) < cutoff_ts:
  671. continue
  672. for ent in c.get("entities", []):
  673. counter[ent] = counter.get(ent, 0) + 1
  674. result: list[dict[str, Any]] = []
  675. for label, count in sorted(counter.items(), key=lambda x: -x[1])[:limit]:
  676. meta = self.get_entity_metadata(label)
  677. result.append({
  678. "label": label,
  679. "count": count,
  680. "canonical_label": meta["canonical_label"] if meta else label,
  681. "mid": meta["mid"] if meta else None,
  682. })
  683. return result
  684. def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
  685. """Dashboard-optimized cluster detail fetch."""
  686. with self._conn() as conn:
  687. cur = conn.execute(
  688. "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
  689. )
  690. row = cur.fetchone()
  691. if not row:
  692. return None
  693. c = json.loads(row[0])
  694. summary = None
  695. if c.get("summary_payload"):
  696. try:
  697. summary = json.loads(c["summary_payload"])
  698. except Exception:
  699. pass
  700. return {
  701. "cluster_id": c.get("cluster_id"),
  702. "headline": c.get("headline", ""),
  703. "summary": c.get("summary", ""),
  704. "topic": c.get("topic", ""),
  705. "sentiment": c.get("sentiment", "neutral"),
  706. "sentimentScore": c.get("sentimentScore"),
  707. "importance": c.get("importance", 0),
  708. "entities": c.get("entities", []),
  709. "entityResolutions": c.get("entityResolutions", []),
  710. "keywords": c.get("keywords", []),
  711. "sources": c.get("sources", []),
  712. "timestamp": c.get("timestamp", ""),
  713. "first_seen": c.get("first_seen", ""),
  714. "last_updated": c.get("last_updated", ""),
  715. "article_count": len(c.get("articles", [])),
  716. "articles": c.get("articles", []),
  717. "summary_text": summary.get("mergedSummary", "") if summary else "",
  718. "key_facts": summary.get("keyFacts", []) if summary else [],
  719. }