dashboard_store.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. from __future__ import annotations
  2. import json
  3. from datetime import datetime, timedelta, timezone
  4. from typing import Any
  5. from news_mcp.config import (
  6. NEWS_PRUNE_INTERVAL_HOURS,
  7. NEWS_PRUNING_ENABLED,
  8. NEWS_REFRESH_INTERVAL_SECONDS,
  9. NEWS_RETENTION_DAYS,
  10. DEFAULT_TOPICS,
  11. )
  12. from news_mcp.storage.sqlite_store import SQLiteClusterStore, _read_ts
  13. class DashboardStore:
  14. """Read-only query layer for the dashboard."""
  15. def __init__(self, store=None):
  16. if store is not None:
  17. self._store = store
  18. else:
  19. from news_mcp.config import DB_PATH
  20. self._store = SQLiteClusterStore(DB_PATH)
  21. # ── Health & Stats ──────────────────────────────────────────────
  22. def get_dashboard_stats(self) -> dict[str, Any]:
  23. with self._store._conn() as conn:
  24. total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
  25. total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
  26. cluster_entities = conn.execute(
  27. "SELECT COUNT(DISTINCT e.value) "
  28. "FROM clusters, json_each(clusters.payload, '$.entities') AS e"
  29. ).fetchone()[0]
  30. topic_counts = dict(conn.execute(
  31. "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
  32. ).fetchall())
  33. last_refresh = self._store.get_meta("last_refresh_at")
  34. last_prune = self._store.get_meta("last_prune_at")
  35. # Freshness: did a refresh happen recently? (within 2x the configured interval)
  36. fresh = False
  37. if last_refresh:
  38. try:
  39. dt = datetime.fromisoformat(last_refresh.replace("Z", "+00:00"))
  40. if dt.tzinfo is None:
  41. dt = dt.replace(tzinfo=timezone.utc)
  42. age_hours = (datetime.now(timezone.utc) - dt).total_seconds() / 3600
  43. fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2
  44. except Exception:
  45. pass
  46. feeds = {}
  47. with self._store._conn() as conn:
  48. for row in conn.execute("SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC"):
  49. feeds[row[0]] = {"last_hash": row[1], "last_item_count": row[2], "enabled": bool(row[3]), "updated_at": row[4]}
  50. return {
  51. "total_clusters": total_clusters,
  52. "total_entities": total_entities,
  53. "cluster_entities": cluster_entities,
  54. "clusters_by_topic": topic_counts,
  55. "last_refresh_at": last_refresh,
  56. "last_prune_at": last_prune,
  57. "data_fresh": fresh,
  58. "feeds": feeds,
  59. "feed_count": len(feeds),
  60. "pruning": {
  61. "enabled": NEWS_PRUNING_ENABLED,
  62. "retention_days": NEWS_RETENTION_DAYS,
  63. "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
  64. "last_prune_at": last_prune,
  65. },
  66. }
  67. # ── Clusters ────────────────────────────────────────────────────
  68. def get_clusters_page(
  69. self,
  70. topic: str | None = None,
  71. hours: float = 24,
  72. limit: int = 20,
  73. offset: int = 0,
  74. ) -> list[dict[str, Any]]:
  75. """Paginated cluster listing filtered by payload.timestamp (event time).
  76. payload.timestamp is guaranteed ISO 8601 UTC — uses _read_ts from
  77. sqlite_store. Do NOT filter by updated_at (row mod time).
  78. """
  79. cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
  80. query = "SELECT payload FROM clusters"
  81. params: list = []
  82. if topic and topic != "all":
  83. query += " WHERE topic = ?"
  84. params.append(topic)
  85. with self._store._conn() as conn:
  86. rows = conn.execute(query, params).fetchall()
  87. filtered = [json.loads(r[0]) for r in rows]
  88. filtered = [c for c in filtered if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
  89. filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
  90. page = filtered[offset:offset + limit]
  91. return [
  92. {
  93. "cluster_id": c.get("cluster_id", ""),
  94. "headline": c.get("headline", ""),
  95. "topic": c.get("topic", ""),
  96. "sentiment": c.get("sentiment", "neutral"),
  97. "sentimentScore": c.get("sentimentScore"),
  98. "importance": c.get("importance", 0),
  99. "entities": c.get("entities", []),
  100. "sources": c.get("sources", []),
  101. "timestamp": c.get("timestamp", ""),
  102. "keywords": c.get("keywords", []),
  103. "article_count": len(c.get("articles", [])),
  104. }
  105. for c in page
  106. ]
  107. def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
  108. with self._store._conn() as conn:
  109. cur = conn.execute(
  110. "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
  111. )
  112. row = cur.fetchone()
  113. if not row:
  114. return None
  115. c = json.loads(row[0])
  116. summary = None
  117. if c.get("summary_payload"):
  118. try:
  119. summary = json.loads(c["summary_payload"])
  120. except Exception:
  121. pass
  122. return {
  123. "cluster_id": c.get("cluster_id"),
  124. "headline": c.get("headline", ""),
  125. "summary": c.get("summary", ""),
  126. "topic": c.get("topic", ""),
  127. "sentiment": c.get("sentiment", "neutral"),
  128. "sentimentScore": c.get("sentimentScore"),
  129. "importance": c.get("importance", 0),
  130. "entities": c.get("entities", []),
  131. "entityResolutions": c.get("entityResolutions", []),
  132. "keywords": c.get("keywords", []),
  133. "sources": c.get("sources", []),
  134. "timestamp": c.get("timestamp", ""),
  135. "first_seen": c.get("first_seen", ""),
  136. "last_updated": c.get("last_updated", ""),
  137. "article_count": len(c.get("articles", [])),
  138. "articles": c.get("articles", []),
  139. "summary_text": summary.get("mergedSummary", "") if summary else "",
  140. "key_facts": summary.get("keyFacts", []) if summary else [],
  141. }
  142. # ── Sentiment Series ────────────────────────────────────────────
  143. def get_sentiment_series(
  144. self,
  145. topic: str | None = None,
  146. hours: float = 24,
  147. bucket_hours: float = 1,
  148. ) -> list[dict[str, Any]]:
  149. """Sentiment score averaged per time bucket.
  150. Filters by payload.timestamp (event time, ISO 8601 UTC guaranteed).
  151. """
  152. cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
  153. query = "SELECT payload FROM clusters"
  154. params: list = []
  155. if topic and topic != "all":
  156. query += " WHERE topic = ?"
  157. params.append(topic)
  158. with self._store._conn() as conn:
  159. rows = conn.execute(query, params).fetchall()
  160. buckets: dict[datetime, list[float]] = {}
  161. for (payload_text,) in rows:
  162. c = json.loads(payload_text)
  163. ts = _read_ts(c.get("timestamp"))
  164. score = c.get("sentimentScore")
  165. if ts is None or score is None or ts < cutoff_ts:
  166. continue
  167. dt = datetime.fromtimestamp(ts, tz=timezone.utc)
  168. bucket_key = dt.replace(minute=0, second=0, microsecond=0)
  169. if bucket_hours > 1:
  170. bucket_key = bucket_key.replace(
  171. hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
  172. )
  173. buckets.setdefault(bucket_key, []).append(float(score))
  174. return [
  175. {
  176. "time": bucket_key.isoformat(),
  177. "avg_sentiment": round(sum(scores) / len(scores), 3),
  178. "count": len(scores),
  179. "min": round(min(scores), 3),
  180. "max": round(max(scores), 3),
  181. }
  182. for bucket_key, scores in sorted(buckets.items())
  183. ]
  184. # ── Entity Frequencies ──────────────────────────────────────────
  185. def get_entity_frequencies(
  186. self,
  187. hours: float = 24,
  188. limit: int = 30,
  189. ) -> list[dict[str, Any]]:
  190. """Top entities by mention count filtered by payload.timestamp (ISO 8601 UTC guaranteed)."""
  191. cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
  192. with self._store._conn() as conn:
  193. rows = conn.execute("SELECT payload FROM clusters").fetchall()
  194. counter: dict[str, int] = {}
  195. for (payload_text,) in rows:
  196. c = json.loads(payload_text)
  197. if (_read_ts(c.get("timestamp")) or 0.0) < cutoff_ts:
  198. continue
  199. for ent in c.get("entities", []):
  200. counter[ent] = counter.get(ent, 0) + 1
  201. result: list[dict[str, Any]] = []
  202. for label, count in sorted(counter.items(), key=lambda x: -x[1])[:limit]:
  203. meta = self._store.get_entity_metadata(label)
  204. result.append({
  205. "label": label,
  206. "count": count,
  207. "canonical_label": meta["canonical_label"] if meta else label,
  208. "mid": meta["mid"] if meta else None,
  209. })
  210. return result
  211. # ── Keyword Frequencies ─────────────────────────────────────────
  212. def get_keyword_frequencies(
  213. self,
  214. hours: float = 24,
  215. limit: int = 30,
  216. ) -> list[dict[str, Any]]:
  217. """Top keywords by occurrence count filtered by payload.timestamp (ISO 8601 UTC guaranteed).
  218. Excludes keywords that are already entities in the same cluster,
  219. and excludes DEFAULT_TOPICS labels (crypto, macro, regulation, ai, other).
  220. """
  221. cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
  222. _topic_labels = {t.lower() for t in DEFAULT_TOPICS}
  223. with self._store._conn() as conn:
  224. rows = conn.execute("SELECT payload FROM clusters").fetchall()
  225. counter: dict[str, int] = {}
  226. for (payload_text,) in rows:
  227. c = json.loads(payload_text)
  228. if (_read_ts(c.get("timestamp")) or 0.0) < cutoff_ts:
  229. continue
  230. ents_in_cluster = {str(e).strip().lower() for e in (c.get("entities", []) or []) if str(e).strip()}
  231. for kw in c.get("keywords", []):
  232. kw_str = str(kw).strip()
  233. if not kw_str:
  234. continue
  235. if kw_str.lower() in _topic_labels:
  236. continue
  237. if kw_str.lower() in ents_in_cluster:
  238. continue
  239. counter[kw_str] = counter.get(kw_str, 0) + 1
  240. return [
  241. {"label": label, "count": count}
  242. for label, count in sorted(counter.items(), key=lambda x: -x[1])[:limit]
  243. ]