from __future__ import annotations

import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any

from news_mcp.config import (
    NEWS_PRUNE_INTERVAL_HOURS,
    NEWS_PRUNING_ENABLED,
    NEWS_REFRESH_INTERVAL_SECONDS,
    NEWS_RETENTION_DAYS,
)
from news_mcp.storage.sqlite_store import SQLiteClusterStore

# Fixed UTC reference instant used to align sentiment buckets. It sits on a
# midnight boundary, so bucket sizes that divide 24 still start on the
# familiar hour-of-day boundaries (00:00, 06:00, ...).
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
_ONE_HOUR = timedelta(hours=1)


def _parse_utc(ts: Any) -> datetime | None:
    """Parse an ISO-8601 timestamp into an aware UTC datetime.

    A trailing ``Z`` is accepted as ``+00:00`` and naive values are taken to
    be UTC. Returns ``None`` for empty or unparseable input instead of
    raising, so callers can treat bad timestamps as "unknown".
    """
    if not ts:
        return None
    try:
        parsed = datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)
    except (TypeError, ValueError, OverflowError):
        return None


def _cutoff_iso(hours: float) -> str:
    """Return the ISO-8601 UTC timestamp lying ``hours`` before now."""
    return (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()


def _recent_payload_query(topic: str | None, hours: float) -> tuple[str, list[Any]]:
    """Build the shared "payloads updated within the window" query.

    Returns the SQL text (without ORDER BY / LIMIT) and its bound parameters.
    A ``topic`` of ``None``, ``""`` or ``"all"`` adds no topic filter.
    """
    query = "SELECT payload FROM clusters WHERE updated_at >= ?"
    params: list[Any] = [_cutoff_iso(hours)]
    if topic and topic != "all":
        query += " AND topic = ?"
        params.append(topic)
    return query, params


def _load_summary(raw: Any) -> dict[str, Any]:
    """Decode a stored ``summary_payload`` JSON string.

    Returns an empty dict when the value is missing, is not valid JSON, or
    does not decode to a JSON object, so callers can always use ``.get``.
    """
    if not raw:
        return {}
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError):
        return {}
    return decoded if isinstance(decoded, dict) else {}


def _bucket_start(moment: datetime, step_hours: int) -> datetime:
    """Floor an aware UTC datetime to the start of its ``step_hours`` bucket.

    Buckets are counted from ``_EPOCH`` rather than from the hour of the day,
    so steps larger than 24 hours (or not dividing 24) produce evenly sized
    buckets.
    """
    elapsed_hours = (moment - _EPOCH) // _ONE_HOUR
    return _EPOCH + timedelta(hours=(elapsed_hours // step_hours) * step_hours)


class DashboardStore:
    """Read-only query layer for the dashboard."""

    def __init__(self, store: SQLiteClusterStore | None = None):
        """Wrap ``store``, or open the default store at ``DB_PATH`` if omitted."""
        if store is not None:
            self._store = store
        else:
            # Imported lazily so DB_PATH is only needed when no store is injected.
            from news_mcp.config import DB_PATH

            self._store = SQLiteClusterStore(DB_PATH)

    # ── Health & Stats ──────────────────────────────────────────────

    def get_dashboard_stats(self) -> dict[str, Any]:
        """Return aggregate counts, feed state, freshness and pruning config.

        ``data_fresh`` is True when the stored ``last_refresh_at`` timestamp
        is younger than twice the refresh interval, with the interval treated
        as at least one hour. A missing or unparseable timestamp counts as
        not fresh.
        """
        with self._store._conn() as conn:
            total_clusters = conn.execute(
                "SELECT COUNT(*) FROM clusters"
            ).fetchone()[0]
            total_entities = conn.execute(
                "SELECT COUNT(*) FROM entity_metadata"
            ).fetchone()[0]
            cluster_entities = conn.execute(
                "SELECT COUNT(DISTINCT e.value) "
                "FROM clusters, json_each(clusters.payload, '$.entities') AS e"
            ).fetchone()[0]
            topic_counts = dict(
                conn.execute(
                    "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
                ).fetchall()
            )
            feed_rows = conn.execute(
                "SELECT feed_key, last_hash, last_item_count, updated_at "
                "FROM feed_state ORDER BY updated_at DESC"
            ).fetchall()

        feeds = {
            row[0]: {
                "last_hash": row[1],
                "last_item_count": row[2],
                "updated_at": row[3],
            }
            for row in feed_rows
        }

        last_refresh = self._store.get_meta("last_refresh_at")
        last_prune = self._store.get_meta("last_prune_at")

        # Freshness: did a refresh happen recently? (within 2x the configured
        # interval)
        fresh = False
        refreshed_at = _parse_utc(last_refresh)
        if refreshed_at is not None:
            age_hours = (
                datetime.now(timezone.utc) - refreshed_at
            ).total_seconds() / 3600
            fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2

        return {
            "total_clusters": total_clusters,
            "total_entities": total_entities,
            "cluster_entities": cluster_entities,
            "clusters_by_topic": topic_counts,
            "last_refresh_at": last_refresh,
            "last_prune_at": last_prune,
            "data_fresh": fresh,
            "feeds": feeds,
            "feed_count": len(feeds),
            "pruning": {
                "enabled": NEWS_PRUNING_ENABLED,
                "retention_days": NEWS_RETENTION_DAYS,
                "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
                "last_prune_at": last_prune,
            },
        }

    # ── Clusters ────────────────────────────────────────────────────

    def get_clusters_page(
        self,
        topic: str | None = None,
        hours: float = 24,
        limit: int = 20,
        offset: int = 0,
    ) -> list[dict[str, Any]]:
        """Return one page of cluster summaries, most recently updated first.

        Args:
            topic: Restrict to this topic; ``None``, ``""`` or ``"all"``
                disables the filter.
            hours: Only clusters whose ``updated_at`` falls within this many
                hours before now are considered.
            limit: Maximum number of clusters to return.
            offset: Number of matching clusters to skip.

        Returns:
            A list of summary dicts built from each cluster's JSON payload;
            ``article_count`` is the length of the payload's article list.
        """
        query, params = _recent_payload_query(topic, hours)
        query += " ORDER BY updated_at DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        with self._store._conn() as conn:
            rows = conn.execute(query, params).fetchall()

        clusters: list[dict[str, Any]] = []
        for (payload_text,) in rows:
            c = json.loads(payload_text)
            clusters.append(
                {
                    "cluster_id": c.get("cluster_id", ""),
                    "headline": c.get("headline", ""),
                    "topic": c.get("topic", ""),
                    "sentiment": c.get("sentiment", "neutral"),
                    "sentimentScore": c.get("sentimentScore"),
                    "importance": c.get("importance", 0),
                    "entities": c.get("entities", []),
                    "sources": c.get("sources", []),
                    "timestamp": c.get("timestamp", ""),
                    "keywords": c.get("keywords", []),
                    # `or []` also covers an explicit JSON null.
                    "article_count": len(c.get("articles") or []),
                }
            )
        return clusters

    def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
        """Return the full detail dict for one cluster, or ``None`` if absent.

        ``summary_text`` and ``key_facts`` are taken from the payload's
        ``summary_payload`` JSON string (keys ``mergedSummary`` and
        ``keyFacts``); they default to ``""`` and ``[]`` when that value is
        missing or cannot be decoded to a JSON object.
        """
        with self._store._conn() as conn:
            row = conn.execute(
                "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
            ).fetchone()
        if not row:
            return None

        c = json.loads(row[0])
        summary = _load_summary(c.get("summary_payload"))
        articles = c.get("articles", [])

        return {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline", ""),
            "summary": c.get("summary", ""),
            "topic": c.get("topic", ""),
            "sentiment": c.get("sentiment", "neutral"),
            "sentimentScore": c.get("sentimentScore"),
            "importance": c.get("importance", 0),
            "entities": c.get("entities", []),
            "entityResolutions": c.get("entityResolutions", []),
            "keywords": c.get("keywords", []),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp", ""),
            "first_seen": c.get("first_seen", ""),
            "last_updated": c.get("last_updated", ""),
            # `or []` also covers an explicit JSON null.
            "article_count": len(articles or []),
            "articles": articles,
            "summary_text": summary.get("mergedSummary", ""),
            "key_facts": summary.get("keyFacts", []),
        }

    # ── Sentiment Series ────────────────────────────────────────────

    def get_sentiment_series(
        self,
        topic: str | None = None,
        hours: float = 24,
        bucket_hours: float = 1,
    ) -> list[dict[str, Any]]:
        """Return sentiment statistics per time bucket, oldest bucket first.

        Args:
            topic: Restrict to this topic; ``None``, ``""`` or ``"all"``
                disables the filter.
            hours: Only clusters whose ``updated_at`` falls within this many
                hours before now are considered.
            bucket_hours: Bucket width in hours; truncated to an integer and
                clamped to at least 1.

        Returns:
            One dict per non-empty bucket with ``time`` (bucket start, ISO
            UTC), ``avg_sentiment``, ``count``, ``min`` and ``max``. Clusters
            are bucketed by their payload ``timestamp``; those without a
            parseable timestamp or a numeric ``sentimentScore`` are skipped.
        """
        query, params = _recent_payload_query(topic, hours)
        query += " ORDER BY updated_at ASC"

        with self._store._conn() as conn:
            rows = conn.execute(query, params).fetchall()

        step_hours = max(1, int(bucket_hours))
        buckets: defaultdict[datetime, list[float]] = defaultdict(list)
        for (payload_text,) in rows:
            c = json.loads(payload_text)
            moment = _parse_utc(c.get("timestamp"))
            raw_score = c.get("sentimentScore")
            if moment is None or raw_score is None:
                continue
            try:
                score = float(raw_score)
            except (TypeError, ValueError):
                # One malformed score must not abort the whole series.
                continue
            buckets[_bucket_start(moment, step_hours)].append(score)

        series: list[dict[str, Any]] = []
        for bucket_key in sorted(buckets):
            scores = buckets[bucket_key]
            series.append(
                {
                    "time": bucket_key.isoformat(),
                    "avg_sentiment": round(sum(scores) / len(scores), 3),
                    "count": len(scores),
                    "min": round(min(scores), 3),
                    "max": round(max(scores), 3),
                }
            )
        return series

    # ── Entity Frequencies ──────────────────────────────────────────

    def get_entity_frequencies(
        self,
        hours: float = 24,
        limit: int = 30,
    ) -> list[dict[str, Any]]:
        """Return the most frequently mentioned entities in recent clusters.

        Only the 500 most recently updated clusters inside the ``hours``
        window are scanned. Each entity counts once per occurrence in a
        cluster's ``entities`` list.

        Returns:
            Up to ``limit`` dicts, highest count first, with ``label``,
            ``count``, ``canonical_label`` and ``mid``. The last two come
            from the store's entity metadata and fall back to the label
            itself and ``None`` when no metadata exists.
        """
        with self._store._conn() as conn:
            rows = conn.execute(
                "SELECT payload FROM clusters WHERE updated_at >= ? "
                "ORDER BY updated_at DESC LIMIT 500",
                (_cutoff_iso(hours),),
            ).fetchall()

        counter: Counter[str] = Counter()
        for (payload_text,) in rows:
            # `or []` also covers an explicit JSON null.
            counter.update(json.loads(payload_text).get("entities") or [])

        result: list[dict[str, Any]] = []
        # most_common() is a stable descending sort, so ties keep first-seen
        # order; slicing afterwards keeps list-slice semantics for `limit`.
        for label, count in counter.most_common()[:limit]:
            meta = self._store.get_entity_metadata(label)
            result.append(
                {
                    "label": label,
                    "count": count,
                    "canonical_label": meta["canonical_label"] if meta else label,
                    "mid": meta["mid"] if meta else None,
                }
            )
        return result