dashboard_store.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. from __future__ import annotations
  2. import json
  3. from datetime import datetime, timedelta, timezone
  4. from typing import Any
  5. from email.utils import parsedate_to_datetime
  6. from news_mcp.config import (
  7. NEWS_PRUNE_INTERVAL_HOURS,
  8. NEWS_PRUNING_ENABLED,
  9. NEWS_REFRESH_INTERVAL_SECONDS,
  10. NEWS_RETENTION_DAYS,
  11. DEFAULT_TOPICS,
  12. )
  13. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  14. class DashboardStore:
  15. """Read-only query layer for the dashboard."""
  16. def __init__(self, store=None):
  17. if store is not None:
  18. self._store = store
  19. else:
  20. from news_mcp.config import DB_PATH
  21. self._store = SQLiteClusterStore(DB_PATH)
  22. # ── Health & Stats ──────────────────────────────────────────────
  23. def get_dashboard_stats(self) -> dict[str, Any]:
  24. with self._store._conn() as conn:
  25. total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
  26. total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
  27. cluster_entities = conn.execute(
  28. "SELECT COUNT(DISTINCT e.value) "
  29. "FROM clusters, json_each(clusters.payload, '$.entities') AS e"
  30. ).fetchone()[0]
  31. topic_counts = dict(conn.execute(
  32. "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
  33. ).fetchall())
  34. last_refresh = self._store.get_meta("last_refresh_at")
  35. last_prune = self._store.get_meta("last_prune_at")
  36. # Freshness: did a refresh happen recently? (within 2x the configured interval)
  37. fresh = False
  38. if last_refresh:
  39. try:
  40. dt = datetime.fromisoformat(last_refresh.replace("Z", "+00:00"))
  41. if dt.tzinfo is None:
  42. dt = dt.replace(tzinfo=timezone.utc)
  43. age_hours = (datetime.now(timezone.utc) - dt).total_seconds() / 3600
  44. fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2
  45. except Exception:
  46. pass
  47. feeds = {}
  48. with self._store._conn() as conn:
  49. for row in conn.execute("SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC"):
  50. feeds[row[0]] = {"last_hash": row[1], "last_item_count": row[2], "enabled": bool(row[3]), "updated_at": row[4]}
  51. return {
  52. "total_clusters": total_clusters,
  53. "total_entities": total_entities,
  54. "cluster_entities": cluster_entities,
  55. "clusters_by_topic": topic_counts,
  56. "last_refresh_at": last_refresh,
  57. "last_prune_at": last_prune,
  58. "data_fresh": fresh,
  59. "feeds": feeds,
  60. "feed_count": len(feeds),
  61. "pruning": {
  62. "enabled": NEWS_PRUNING_ENABLED,
  63. "retention_days": NEWS_RETENTION_DAYS,
  64. "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
  65. "last_prune_at": last_prune,
  66. },
  67. }
  68. # ── Clusters ────────────────────────────────────────────────────
  69. def get_clusters_page(
  70. self,
  71. topic: str | None = None,
  72. hours: float = 24,
  73. limit: int = 20,
  74. offset: int = 0,
  75. ) -> list[dict[str, Any]]:
  76. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  77. now = datetime.now(timezone.utc).isoformat()
  78. query = "SELECT payload FROM clusters WHERE updated_at >= ? AND updated_at <= ?"
  79. params: list = [cutoff, now]
  80. if topic and topic != "all":
  81. query += " AND topic = ?"
  82. params.append(topic)
  83. query += " ORDER BY updated_at DESC LIMIT ? OFFSET ?"
  84. params.extend([limit, offset])
  85. with self._store._conn() as conn:
  86. cur = conn.execute(query, params)
  87. rows = cur.fetchall()
  88. clusters: list[dict[str, Any]] = []
  89. for (payload_text,) in rows:
  90. c = json.loads(payload_text)
  91. clusters.append({
  92. "cluster_id": c.get("cluster_id", ""),
  93. "headline": c.get("headline", ""),
  94. "topic": c.get("topic", ""),
  95. "sentiment": c.get("sentiment", "neutral"),
  96. "sentimentScore": c.get("sentimentScore"),
  97. "importance": c.get("importance", 0),
  98. "entities": c.get("entities", []),
  99. "sources": c.get("sources", []),
  100. "timestamp": c.get("timestamp", ""),
  101. "keywords": c.get("keywords", []),
  102. "article_count": len(c.get("articles", [])),
  103. })
  104. return clusters
  105. def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
  106. with self._store._conn() as conn:
  107. cur = conn.execute(
  108. "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
  109. )
  110. row = cur.fetchone()
  111. if not row:
  112. return None
  113. c = json.loads(row[0])
  114. summary = None
  115. if c.get("summary_payload"):
  116. try:
  117. summary = json.loads(c["summary_payload"])
  118. except Exception:
  119. pass
  120. return {
  121. "cluster_id": c.get("cluster_id"),
  122. "headline": c.get("headline", ""),
  123. "summary": c.get("summary", ""),
  124. "topic": c.get("topic", ""),
  125. "sentiment": c.get("sentiment", "neutral"),
  126. "sentimentScore": c.get("sentimentScore"),
  127. "importance": c.get("importance", 0),
  128. "entities": c.get("entities", []),
  129. "entityResolutions": c.get("entityResolutions", []),
  130. "keywords": c.get("keywords", []),
  131. "sources": c.get("sources", []),
  132. "timestamp": c.get("timestamp", ""),
  133. "first_seen": c.get("first_seen", ""),
  134. "last_updated": c.get("last_updated", ""),
  135. "article_count": len(c.get("articles", [])),
  136. "articles": c.get("articles", []),
  137. "summary_text": summary.get("mergedSummary", "") if summary else "",
  138. "key_facts": summary.get("keyFacts", []) if summary else [],
  139. }
  140. # ── Sentiment Series ────────────────────────────────────────────
  141. def get_sentiment_series(
  142. self,
  143. topic: str | None = None,
  144. hours: float = 24,
  145. bucket_hours: float = 1,
  146. ) -> list[dict[str, Any]]:
  147. """Sentiment score averaged per time bucket.
  148. Filters by the cluster's own event timestamp (payload.timestamp),
  149. not by updated_at which tracks row modification time.
  150. """
  151. cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
  152. query = "SELECT payload FROM clusters"
  153. params: list = []
  154. if topic and topic != "all":
  155. query += " WHERE topic = ?"
  156. params.append(topic)
  157. query += " ORDER BY updated_at ASC"
  158. with self._store._conn() as conn:
  159. cur = conn.execute(query, params)
  160. rows = cur.fetchall()
  161. def _parse_ts(ts: Any) -> datetime | None:
  162. if not ts:
  163. return None
  164. s = str(ts).strip()
  165. try:
  166. dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
  167. except Exception:
  168. try:
  169. dt = parsedate_to_datetime(s)
  170. except Exception:
  171. return None
  172. if dt.tzinfo is None:
  173. dt = dt.replace(tzinfo=timezone.utc)
  174. return dt.astimezone(timezone.utc)
  175. buckets: dict[datetime, list[float]] = {}
  176. for (payload_text,) in rows:
  177. c = json.loads(payload_text)
  178. dt = _parse_ts(c.get("timestamp"))
  179. score = c.get("sentimentScore")
  180. if dt is None or score is None:
  181. continue
  182. if dt < cutoff:
  183. continue
  184. bucket_key = dt.replace(minute=0, second=0, microsecond=0)
  185. if bucket_hours > 1:
  186. bucket_key = bucket_key.replace(
  187. hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
  188. )
  189. buckets.setdefault(bucket_key, []).append(float(score))
  190. series: list[dict[str, Any]] = []
  191. for bucket_key in sorted(buckets):
  192. scores = buckets[bucket_key]
  193. series.append({
  194. "time": bucket_key.isoformat(),
  195. "avg_sentiment": round(sum(scores) / len(scores), 3),
  196. "count": len(scores),
  197. "min": round(min(scores), 3),
  198. "max": round(max(scores), 3),
  199. })
  200. return series
  201. # ── Entity Frequencies ──────────────────────────────────────────
  202. def get_entity_frequencies(
  203. self,
  204. hours: float = 24,
  205. limit: int = 30,
  206. ) -> list[dict[str, Any]]:
  207. """Top entities by mention count in recent clusters.
  208. Filters by the cluster's own event timestamp (payload.timestamp),
  209. not by updated_at which tracks row modification time.
  210. """
  211. cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
  212. query = "SELECT payload FROM clusters"
  213. params: list = []
  214. with self._store._conn() as conn:
  215. cur = conn.execute(query, params)
  216. rows = cur.fetchall()
  217. def _parse_ts(ts):
  218. if not ts:
  219. return None
  220. s = str(ts).strip()
  221. try:
  222. dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
  223. except Exception:
  224. try:
  225. from email.utils import parsedate_to_datetime
  226. dt = parsedate_to_datetime(s)
  227. except Exception:
  228. return None
  229. if dt.tzinfo is None:
  230. dt = dt.replace(tzinfo=timezone.utc)
  231. return dt.astimezone(timezone.utc)
  232. counter: dict[str, int] = {}
  233. for (payload_text,) in rows:
  234. c = json.loads(payload_text)
  235. dt = _parse_ts(c.get("timestamp"))
  236. if dt is None:
  237. continue
  238. if dt < cutoff:
  239. continue
  240. for ent in c.get("entities", []):
  241. counter[ent] = counter.get(ent, 0) + 1
  242. sorted_entities = sorted(counter.items(), key=lambda x: -x[1])[:limit]
  243. result: list[dict[str, Any]] = []
  244. for label, count in sorted_entities:
  245. meta = self._store.get_entity_metadata(label)
  246. result.append({
  247. "label": label,
  248. "count": count,
  249. "canonical_label": meta["canonical_label"] if meta else label,
  250. "mid": meta["mid"] if meta else None,
  251. })
  252. return result
  253. # ── Keyword Frequencies ─────────────────────────────────────────
  254. def get_keyword_frequencies(
  255. self,
  256. hours: float = 24,
  257. limit: int = 30,
  258. ) -> list[dict[str, Any]]:
  259. """Top keywords by occurrence count in recent clusters.
  260. Mirrors get_entity_frequencies but for LLM-curated thematic keywords.
  261. Filters by the cluster's own event timestamp (payload.timestamp).
  262. Only includes keywords that are NOT already extracted as entities
  263. in the same cluster — the entity signal is higher quality and is
  264. already shown in the entity frequencies view.
  265. """
  266. cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
  267. query = "SELECT payload FROM clusters"
  268. params: list = []
  269. with self._store._conn() as conn:
  270. cur = conn.execute(query, params)
  271. rows = cur.fetchall()
  272. def _parse_ts(ts):
  273. if not ts:
  274. return None
  275. s = str(ts).strip()
  276. try:
  277. dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
  278. except Exception:
  279. try:
  280. dt = parsedate_to_datetime(s)
  281. except Exception:
  282. return None
  283. if dt.tzinfo is None:
  284. dt = dt.replace(tzinfo=timezone.utc)
  285. return dt.astimezone(timezone.utc)
  286. counter: dict[str, int] = {}
  287. _topic_labels = {t.lower() for t in DEFAULT_TOPICS}
  288. for (payload_text,) in rows:
  289. c = json.loads(payload_text)
  290. dt = _parse_ts(c.get("timestamp"))
  291. if dt is None:
  292. continue
  293. if dt < cutoff:
  294. continue
  295. # Get entities in this cluster to dedup against keywords
  296. ents_in_cluster = {str(e).strip().lower() for e in (c.get("entities", []) or []) if str(e).strip()}
  297. for kw in c.get("keywords", []):
  298. kw_str = str(kw).strip()
  299. if not kw_str:
  300. continue
  301. # Skip topic labels (crypto, macro, regulation, ai, other)
  302. # that the LLM sometimes returns as keywords.
  303. if kw_str.lower() in _topic_labels:
  304. continue
  305. # Skip keywords that are already entities in this cluster
  306. if kw_str.lower() in ents_in_cluster:
  307. continue
  308. counter[kw_str] = counter.get(kw_str, 0) + 1
  309. sorted_kws = sorted(counter.items(), key=lambda x: -x[1])[:limit]
  310. return [
  311. {"label": label, "count": count}
  312. for label, count in sorted_kws
  313. ]