# dashboard_store.py (14 KB)
  1. from __future__ import annotations
  2. import json
  3. from datetime import datetime, timedelta, timezone
  4. from typing import Any
  5. from news_mcp.config import (
  6. NEWS_PRUNE_INTERVAL_HOURS,
  7. NEWS_PRUNING_ENABLED,
  8. NEWS_REFRESH_INTERVAL_SECONDS,
  9. NEWS_RETENTION_DAYS,
  10. DEFAULT_TOPICS,
  11. )
  12. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  13. class DashboardStore:
  14. """Read-only query layer for the dashboard."""
  15. def __init__(self, store=None):
  16. if store is not None:
  17. self._store = store
  18. else:
  19. from news_mcp.config import DB_PATH
  20. self._store = SQLiteClusterStore(DB_PATH)
  21. # ── Health & Stats ──────────────────────────────────────────────
  22. def get_dashboard_stats(self) -> dict[str, Any]:
  23. with self._store._conn() as conn:
  24. total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
  25. total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
  26. cluster_entities = conn.execute(
  27. "SELECT COUNT(DISTINCT e.value) "
  28. "FROM clusters, json_each(clusters.payload, '$.entities') AS e"
  29. ).fetchone()[0]
  30. topic_counts = dict(conn.execute(
  31. "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
  32. ).fetchall())
  33. last_refresh = self._store.get_meta("last_refresh_at")
  34. last_prune = self._store.get_meta("last_prune_at")
  35. # Freshness: did a refresh happen recently? (within 2x the configured interval)
  36. fresh = False
  37. if last_refresh:
  38. try:
  39. dt = datetime.fromisoformat(last_refresh.replace("Z", "+00:00"))
  40. if dt.tzinfo is None:
  41. dt = dt.replace(tzinfo=timezone.utc)
  42. age_hours = (datetime.now(timezone.utc) - dt).total_seconds() / 3600
  43. fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2
  44. except Exception:
  45. pass
  46. feeds = {}
  47. with self._store._conn() as conn:
  48. for row in conn.execute("SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC"):
  49. feeds[row[0]] = {"last_hash": row[1], "last_item_count": row[2], "enabled": bool(row[3]), "updated_at": row[4]}
  50. return {
  51. "total_clusters": total_clusters,
  52. "total_entities": total_entities,
  53. "cluster_entities": cluster_entities,
  54. "clusters_by_topic": topic_counts,
  55. "last_refresh_at": last_refresh,
  56. "last_prune_at": last_prune,
  57. "data_fresh": fresh,
  58. "feeds": feeds,
  59. "feed_count": len(feeds),
  60. "pruning": {
  61. "enabled": NEWS_PRUNING_ENABLED,
  62. "retention_days": NEWS_RETENTION_DAYS,
  63. "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
  64. "last_prune_at": last_prune,
  65. },
  66. }
  67. # ── Clusters ────────────────────────────────────────────────────
  68. def get_clusters_page(
  69. self,
  70. topic: str | None = None,
  71. hours: float = 24,
  72. limit: int = 20,
  73. offset: int = 0,
  74. ) -> dict[str, Any]:
  75. """Paginated cluster listing filtered by SQL payload_ts index.
  76. Returns {"clusters": [...], "total": int}.
  77. """
  78. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  79. query = "SELECT payload FROM clusters WHERE payload_ts >= ?"
  80. params: list = [cutoff]
  81. if topic and topic != "all":
  82. query += " AND topic = ?"
  83. params.append(topic)
  84. # Get total count before pagination
  85. total = self._store._conn().execute(
  86. f"SELECT COUNT(*) FROM ({query})", params
  87. ).fetchone()[0]
  88. query += " ORDER BY payload_ts DESC LIMIT ? OFFSET ?"
  89. params.extend([limit, offset])
  90. with self._store._conn() as conn:
  91. rows = conn.execute(query, params).fetchall()
  92. return {
  93. "clusters": [
  94. {
  95. "cluster_id": c.get("cluster_id", ""),
  96. "headline": c.get("headline", ""),
  97. "topic": c.get("topic", ""),
  98. "sentiment": c.get("sentiment", "neutral"),
  99. "sentimentScore": c.get("sentimentScore"),
  100. "importance": c.get("importance", 0),
  101. "entities": c.get("entities", []),
  102. "sources": c.get("sources", []),
  103. "timestamp": c.get("timestamp", ""),
  104. "keywords": c.get("keywords", []),
  105. "article_count": len(c.get("articles", [])),
  106. }
  107. for c in [json.loads(r[0]) for r in rows]
  108. ],
  109. "total": total,
  110. }
  111. def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
  112. with self._store._conn() as conn:
  113. cur = conn.execute(
  114. "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
  115. )
  116. row = cur.fetchone()
  117. if not row:
  118. return None
  119. c = json.loads(row[0])
  120. summary = None
  121. if c.get("summary_payload"):
  122. try:
  123. summary = json.loads(c["summary_payload"])
  124. except Exception:
  125. pass
  126. return {
  127. "cluster_id": c.get("cluster_id"),
  128. "headline": c.get("headline", ""),
  129. "summary": c.get("summary", ""),
  130. "topic": c.get("topic", ""),
  131. "sentiment": c.get("sentiment", "neutral"),
  132. "sentimentScore": c.get("sentimentScore"),
  133. "importance": c.get("importance", 0),
  134. "entities": c.get("entities", []),
  135. "entityResolutions": c.get("entityResolutions", []),
  136. "keywords": c.get("keywords", []),
  137. "sources": c.get("sources", []),
  138. "timestamp": c.get("timestamp", ""),
  139. "first_seen": c.get("first_seen", ""),
  140. "last_updated": c.get("last_updated", ""),
  141. "article_count": len(c.get("articles", [])),
  142. "articles": c.get("articles", []),
  143. "summary_text": summary.get("mergedSummary", "") if summary else "",
  144. "key_facts": summary.get("keyFacts", []) if summary else [],
  145. }
  146. # ── Sentiment Series ────────────────────────────────────────────
  147. def get_sentiment_series(
  148. self,
  149. topic: str | None = None,
  150. hours: float = 24,
  151. bucket_hours: float = 1,
  152. ) -> list[dict[str, Any]]:
  153. """Sentiment score averaged per time bucket.
  154. Filters by payload_ts SQL index.
  155. """
  156. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  157. query = "SELECT payload FROM clusters WHERE payload_ts >= ?"
  158. params: list = [cutoff]
  159. if topic and topic != "all":
  160. query += " AND topic = ?"
  161. params.append(topic)
  162. query += " ORDER BY payload_ts ASC"
  163. with self._store._conn() as conn:
  164. rows = conn.execute(query, params).fetchall()
  165. buckets: dict[datetime, list[float]] = {}
  166. for (payload_text,) in rows:
  167. c = json.loads(payload_text)
  168. ts_str = c.get("timestamp")
  169. score = c.get("sentimentScore")
  170. if not ts_str or score is None:
  171. continue
  172. dt = datetime.fromisoformat(str(ts_str).strip())
  173. if dt.tzinfo is None:
  174. dt = dt.replace(tzinfo=timezone.utc)
  175. dt = dt.astimezone(timezone.utc)
  176. bucket_key = dt.replace(minute=0, second=0, microsecond=0)
  177. if bucket_hours > 1:
  178. bucket_key = bucket_key.replace(
  179. hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
  180. )
  181. buckets.setdefault(bucket_key, []).append(float(score))
  182. return [
  183. {
  184. "time": bucket_key.isoformat(),
  185. "avg_sentiment": round(sum(scores) / len(scores), 3),
  186. "count": len(scores),
  187. "min": round(min(scores), 3),
  188. "max": round(max(scores), 3),
  189. }
  190. for bucket_key, scores in sorted(buckets.items())
  191. ]
  192. # ── Entity Frequencies ──────────────────────────────────────────
  193. def get_entity_frequencies(
  194. self,
  195. hours: float = 24,
  196. limit: int = 30,
  197. ) -> list[dict[str, Any]]:
  198. """Top entities by mention count, using SQL junction table + payload_ts index."""
  199. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  200. with self._store._conn() as conn:
  201. rows = conn.execute(
  202. """
  203. SELECT ce.entity, COUNT(*) as cnt
  204. FROM cluster_entities ce
  205. JOIN clusters c ON c.cluster_id = ce.cluster_id
  206. WHERE c.payload_ts >= ?
  207. GROUP BY ce.entity
  208. ORDER BY cnt DESC
  209. LIMIT ?
  210. """,
  211. (cutoff, limit),
  212. ).fetchall()
  213. result: list[dict[str, Any]] = []
  214. for label, count in rows:
  215. meta = self._store.get_entity_metadata(label)
  216. result.append({
  217. "label": label,
  218. "count": count,
  219. "canonical_label": meta["canonical_label"] if meta else label,
  220. "mid": meta["mid"] if meta else None,
  221. })
  222. return result
  223. # ── Keyword Frequencies ─────────────────────────────────────────
  224. def get_keyword_frequencies(
  225. self,
  226. hours: float = 24,
  227. limit: int = 30,
  228. ) -> list[dict[str, Any]]:
  229. """Top keywords by mention count, using SQL junction table + payload_ts index.
  230. Excludes DEFAULT_TOPICS labels (crypto, macro, regulation, ai, other).
  231. """
  232. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  233. _topic_labels = {t.lower() for t in DEFAULT_TOPICS}
  234. with self._store._conn() as conn:
  235. rows = conn.execute(
  236. """
  237. SELECT ck.keyword, COUNT(*) as cnt
  238. FROM cluster_keywords ck
  239. JOIN clusters c ON c.cluster_id = ck.cluster_id
  240. WHERE c.payload_ts >= ?
  241. GROUP BY ck.keyword
  242. ORDER BY cnt DESC
  243. LIMIT ?
  244. """,
  245. (cutoff, limit),
  246. ).fetchall()
  247. return [
  248. {"label": label, "count": count}
  249. for label, count in rows
  250. if label.lower() not in _topic_labels
  251. ]
  252. # ── Entity/Keyword Cluster Search ────────────────────────────────
  253. def get_clusters_by_entity(
  254. self,
  255. entity: str,
  256. hours: float = 168,
  257. limit: int = 50,
  258. offset: int = 0,
  259. ) -> dict[str, Any]:
  260. """Return clusters matching an entity, SQL-level filter via junction table."""
  261. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  262. entity_norm = entity.strip().lower()
  263. with self._store._conn() as conn:
  264. # Total count
  265. total = conn.execute(
  266. "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c "
  267. "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
  268. "WHERE c.payload_ts >= ? AND ce.entity = ?",
  269. (cutoff, entity_norm),
  270. ).fetchone()[0]
  271. # Paginated results
  272. rows = conn.execute(
  273. "SELECT DISTINCT c.payload FROM clusters c "
  274. "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
  275. "WHERE c.payload_ts >= ? AND ce.entity = ? "
  276. "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?",
  277. (cutoff, entity_norm, limit, offset),
  278. ).fetchall()
  279. return {
  280. "entity": entity_norm,
  281. "clusters": [json.loads(r[0]) for r in rows],
  282. "total": total,
  283. "hours": hours,
  284. }
  285. def get_clusters_by_keyword(
  286. self,
  287. keyword: str,
  288. hours: float = 168,
  289. limit: int = 50,
  290. offset: int = 0,
  291. ) -> dict[str, Any]:
  292. """Return clusters matching a keyword, SQL-level filter via junction table."""
  293. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  294. kw_norm = keyword.strip().lower()
  295. with self._store._conn() as conn:
  296. total = conn.execute(
  297. "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c "
  298. "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
  299. "WHERE c.payload_ts >= ? AND ck.keyword = ?",
  300. (cutoff, kw_norm),
  301. ).fetchone()[0]
  302. rows = conn.execute(
  303. "SELECT DISTINCT c.payload FROM clusters c "
  304. "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
  305. "WHERE c.payload_ts >= ? AND ck.keyword = ? "
  306. "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?",
  307. (cutoff, kw_norm, limit, offset),
  308. ).fetchall()
  309. return {
  310. "keyword": kw_norm,
  311. "clusters": [json.loads(r[0]) for r in rows],
  312. "total": total,
  313. "hours": hours,
  314. }