dashboard_store.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
from __future__ import annotations

import json
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Any

from news_mcp.config import (
    NEWS_PRUNE_INTERVAL_HOURS,
    NEWS_PRUNING_ENABLED,
    NEWS_REFRESH_INTERVAL_SECONDS,
    NEWS_RETENTION_DAYS,
)
from news_mcp.storage.sqlite_store import SQLiteClusterStore
  12. class DashboardStore:
  13. """Read-only query layer for the dashboard."""
  14. def __init__(self, store=None):
  15. if store is not None:
  16. self._store = store
  17. else:
  18. from news_mcp.config import DB_PATH
  19. self._store = SQLiteClusterStore(DB_PATH)
  20. # ── Health & Stats ──────────────────────────────────────────────
  21. def get_dashboard_stats(self) -> dict[str, Any]:
  22. with self._store._conn() as conn:
  23. total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
  24. total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
  25. topic_counts = dict(conn.execute(
  26. "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
  27. ).fetchall())
  28. last_refresh = self._store.get_meta("last_refresh_at")
  29. last_prune = self._store.get_meta("last_prune_at")
  30. # Freshness: did a refresh happen recently? (within 2x the configured interval)
  31. fresh = False
  32. if last_refresh:
  33. try:
  34. dt = datetime.fromisoformat(last_refresh.replace("Z", "+00:00"))
  35. if dt.tzinfo is None:
  36. dt = dt.replace(tzinfo=timezone.utc)
  37. age_hours = (datetime.now(timezone.utc) - dt).total_seconds() / 3600
  38. fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2
  39. except Exception:
  40. pass
  41. feeds = {}
  42. with self._store._conn() as conn:
  43. for row in conn.execute("SELECT feed_key, last_hash, updated_at FROM feed_state"):
  44. feeds[row[0]] = {"last_hash": row[1], "updated_at": row[2]}
  45. return {
  46. "total_clusters": total_clusters,
  47. "total_entities": total_entities,
  48. "clusters_by_topic": topic_counts,
  49. "last_refresh_at": last_refresh,
  50. "last_prune_at": last_prune,
  51. "data_fresh": fresh,
  52. "feeds": feeds,
  53. "pruning": {
  54. "enabled": NEWS_PRUNING_ENABLED,
  55. "retention_days": NEWS_RETENTION_DAYS,
  56. "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
  57. "last_prune_at": last_prune,
  58. },
  59. }
  60. # ── Clusters ────────────────────────────────────────────────────
  61. def get_clusters_page(
  62. self,
  63. topic: str | None = None,
  64. hours: float = 24,
  65. limit: int = 20,
  66. offset: int = 0,
  67. ) -> list[dict[str, Any]]:
  68. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  69. query = "SELECT payload FROM clusters WHERE updated_at >= ?"
  70. params: list = [cutoff]
  71. if topic and topic != "all":
  72. query += " AND topic = ?"
  73. params.append(topic)
  74. query += " ORDER BY updated_at DESC LIMIT ? OFFSET ?"
  75. params.extend([limit, offset])
  76. with self._store._conn() as conn:
  77. cur = conn.execute(query, params)
  78. rows = cur.fetchall()
  79. clusters: list[dict[str, Any]] = []
  80. for (payload_text,) in rows:
  81. c = json.loads(payload_text)
  82. clusters.append({
  83. "cluster_id": c.get("cluster_id", ""),
  84. "headline": c.get("headline", ""),
  85. "topic": c.get("topic", ""),
  86. "sentiment": c.get("sentiment", "neutral"),
  87. "sentimentScore": c.get("sentimentScore"),
  88. "importance": c.get("importance", 0),
  89. "entities": c.get("entities", []),
  90. "sources": c.get("sources", []),
  91. "timestamp": c.get("timestamp", ""),
  92. "keywords": c.get("keywords", []),
  93. "article_count": len(c.get("articles", [])),
  94. })
  95. return clusters
  96. def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
  97. with self._store._conn() as conn:
  98. cur = conn.execute(
  99. "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
  100. )
  101. row = cur.fetchone()
  102. if not row:
  103. return None
  104. c = json.loads(row[0])
  105. summary = None
  106. if c.get("summary_payload"):
  107. try:
  108. summary = json.loads(c["summary_payload"])
  109. except Exception:
  110. pass
  111. return {
  112. "cluster_id": c.get("cluster_id"),
  113. "headline": c.get("headline", ""),
  114. "summary": c.get("summary", ""),
  115. "topic": c.get("topic", ""),
  116. "sentiment": c.get("sentiment", "neutral"),
  117. "sentimentScore": c.get("sentimentScore"),
  118. "importance": c.get("importance", 0),
  119. "entities": c.get("entities", []),
  120. "entityResolutions": c.get("entityResolutions", []),
  121. "keywords": c.get("keywords", []),
  122. "sources": c.get("sources", []),
  123. "timestamp": c.get("timestamp", ""),
  124. "first_seen": c.get("first_seen", ""),
  125. "last_updated": c.get("last_updated", ""),
  126. "article_count": len(c.get("articles", [])),
  127. "articles": c.get("articles", []),
  128. "summary_text": summary.get("mergedSummary", "") if summary else "",
  129. "key_facts": summary.get("keyFacts", []) if summary else [],
  130. }
  131. # ── Sentiment Series ────────────────────────────────────────────
  132. def get_sentiment_series(
  133. self,
  134. topic: str | None = None,
  135. hours: float = 24,
  136. bucket_hours: float = 1,
  137. ) -> list[dict[str, Any]]:
  138. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  139. query = "SELECT payload FROM clusters WHERE updated_at >= ?"
  140. params: list = [cutoff]
  141. if topic and topic != "all":
  142. query += " AND topic = ?"
  143. params.append(topic)
  144. query += " ORDER BY updated_at ASC"
  145. with self._store._conn() as conn:
  146. cur = conn.execute(query, params)
  147. rows = cur.fetchall()
  148. def _parse_ts(ts: Any) -> datetime | None:
  149. if not ts:
  150. return None
  151. try:
  152. dt = datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
  153. if dt.tzinfo is None:
  154. dt = dt.replace(tzinfo=timezone.utc)
  155. return dt.astimezone(timezone.utc)
  156. except Exception:
  157. return None
  158. step_hours = max(1, int(bucket_hours))
  159. buckets: dict[datetime, list[float]] = {}
  160. for (payload_text,) in rows:
  161. c = json.loads(payload_text)
  162. dt = _parse_ts(c.get("timestamp"))
  163. score = c.get("sentimentScore")
  164. if dt is None or score is None:
  165. continue
  166. bucket_key = dt.replace(minute=0, second=0, microsecond=0)
  167. if step_hours > 1:
  168. bucket_key = bucket_key.replace(
  169. hour=(bucket_key.hour // step_hours) * step_hours
  170. )
  171. buckets.setdefault(bucket_key, []).append(float(score))
  172. series: list[dict[str, Any]] = []
  173. for bucket_key in sorted(buckets):
  174. scores = buckets[bucket_key]
  175. series.append({
  176. "time": bucket_key.isoformat(),
  177. "avg_sentiment": round(sum(scores) / len(scores), 3),
  178. "count": len(scores),
  179. "min": round(min(scores), 3),
  180. "max": round(max(scores), 3),
  181. })
  182. return series
  183. # ── Entity Frequencies ──────────────────────────────────────────
  184. def get_entity_frequencies(
  185. self,
  186. hours: float = 24,
  187. limit: int = 30,
  188. ) -> list[dict[str, Any]]:
  189. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  190. with self._store._conn() as conn:
  191. cur = conn.execute(
  192. "SELECT payload FROM clusters WHERE updated_at >= ? "
  193. "ORDER BY updated_at DESC LIMIT 500",
  194. (cutoff,),
  195. )
  196. rows = cur.fetchall()
  197. counter: dict[str, int] = {}
  198. for (payload_text,) in rows:
  199. c = json.loads(payload_text)
  200. for ent in c.get("entities", []):
  201. counter[ent] = counter.get(ent, 0) + 1
  202. sorted_entities = sorted(counter.items(), key=lambda x: -x[1])[:limit]
  203. result: list[dict[str, Any]] = []
  204. for label, count in sorted_entities:
  205. meta = self._store.get_entity_metadata(label)
  206. result.append({
  207. "label": label,
  208. "count": count,
  209. "canonical_label": meta["canonical_label"] if meta else label,
  210. "mid": meta["mid"] if meta else None,
  211. })
  212. return result