from __future__ import annotations import json from datetime import datetime, timedelta, timezone from typing import Any from news_mcp.config import ( NEWS_PRUNE_INTERVAL_HOURS, NEWS_PRUNING_ENABLED, NEWS_REFRESH_INTERVAL_SECONDS, NEWS_RETENTION_DAYS, DEFAULT_TOPICS, ) from news_mcp.storage.sqlite_store import SQLiteClusterStore class DashboardStore: """Read-only query layer for the dashboard.""" def __init__(self, store=None): if store is not None: self._store = store else: from news_mcp.config import DB_PATH self._store = SQLiteClusterStore(DB_PATH) # ── Health & Stats ────────────────────────────────────────────── def get_dashboard_stats(self) -> dict[str, Any]: with self._store._conn() as conn: total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0] total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0] cluster_entities = conn.execute( "SELECT COUNT(DISTINCT e.value) " "FROM clusters, json_each(clusters.payload, '$.entities') AS e" ).fetchone()[0] topic_counts = dict(conn.execute( "SELECT topic, COUNT(*) FROM clusters GROUP BY topic" ).fetchall()) last_refresh = self._store.get_meta("last_refresh_at") last_prune = self._store.get_meta("last_prune_at") # Freshness: did a refresh happen recently? (within 2x the configured interval) fresh = False if last_refresh: try: dt = datetime.fromisoformat(last_refresh.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) age_hours = (datetime.now(timezone.utc) - dt).total_seconds() / 3600 fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2 except Exception: pass feeds = {} with self._store._conn() as conn: for row in conn.execute("SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC"): feeds[row[0]] = {"last_hash": row[1], "last_item_count": row[2], "enabled": bool(row[3]), "updated_at": row[4]} return { "total_clusters": total_clusters, "total_entities": total_entities, "cluster_entities": cluster_entities, "clusters_by_topic": topic_counts, "last_refresh_at": last_refresh, "last_prune_at": last_prune, "data_fresh": fresh, "feeds": feeds, "feed_count": len(feeds), "pruning": { "enabled": NEWS_PRUNING_ENABLED, "retention_days": NEWS_RETENTION_DAYS, "interval_hours": NEWS_PRUNE_INTERVAL_HOURS, "last_prune_at": last_prune, }, } # ── Clusters ──────────────────────────────────────────────────── def get_clusters_page( self, topic: str | None = None, hours: float = 24, limit: int = 20, offset: int = 0, ) -> dict[str, Any]: """Paginated cluster listing filtered by SQL payload_ts index. Returns {"clusters": [...], "total": int}. """ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() query = "SELECT payload FROM clusters WHERE payload_ts >= ?" params: list = [cutoff] if topic and topic != "all": query += " AND topic = ?" params.append(topic) # Get total count before pagination total = self._store._conn().execute( f"SELECT COUNT(*) FROM ({query})", params ).fetchone()[0] query += " ORDER BY payload_ts DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) with self._store._conn() as conn: rows = conn.execute(query, params).fetchall() return { "clusters": [ { "cluster_id": c.get("cluster_id", ""), "headline": c.get("headline", ""), "topic": c.get("topic", ""), "sentiment": c.get("sentiment", "neutral"), "sentimentScore": c.get("sentimentScore"), "importance": c.get("importance", 0), "entities": c.get("entities", []), "sources": c.get("sources", []), "timestamp": c.get("timestamp", ""), "keywords": c.get("keywords", []), "article_count": len(c.get("articles", [])), } for c in [json.loads(r[0]) for r in rows] ], "total": total, } def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None: with self._store._conn() as conn: cur = conn.execute( "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,) ) row = cur.fetchone() if not row: return None c = json.loads(row[0]) summary = None if c.get("summary_payload"): try: summary = json.loads(c["summary_payload"]) except Exception: pass return { "cluster_id": c.get("cluster_id"), "headline": c.get("headline", ""), "summary": c.get("summary", ""), "topic": c.get("topic", ""), "sentiment": c.get("sentiment", "neutral"), "sentimentScore": c.get("sentimentScore"), "importance": c.get("importance", 0), "entities": c.get("entities", []), "entityResolutions": c.get("entityResolutions", []), "keywords": c.get("keywords", []), "sources": c.get("sources", []), "timestamp": c.get("timestamp", ""), "first_seen": c.get("first_seen", ""), "last_updated": c.get("last_updated", ""), "article_count": len(c.get("articles", [])), "articles": c.get("articles", []), "summary_text": summary.get("mergedSummary", "") if summary else "", "key_facts": summary.get("keyFacts", []) if summary else [], } # ── Sentiment Series ──────────────────────────────────────────── def get_sentiment_series( self, topic: str | None = None, hours: float = 24, bucket_hours: float = 1, ) -> list[dict[str, Any]]: """Sentiment score averaged per time bucket. Filters by payload_ts SQL index. """ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() query = "SELECT payload FROM clusters WHERE payload_ts >= ?" params: list = [cutoff] if topic and topic != "all": query += " AND topic = ?" params.append(topic) query += " ORDER BY payload_ts ASC" with self._store._conn() as conn: rows = conn.execute(query, params).fetchall() buckets: dict[datetime, list[float]] = {} for (payload_text,) in rows: c = json.loads(payload_text) ts_str = c.get("timestamp") score = c.get("sentimentScore") if not ts_str or score is None: continue dt = datetime.fromisoformat(str(ts_str).strip()) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) bucket_key = dt.replace(minute=0, second=0, microsecond=0) if bucket_hours > 1: bucket_key = bucket_key.replace( hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours) ) buckets.setdefault(bucket_key, []).append(float(score)) return [ { "time": bucket_key.isoformat(), "avg_sentiment": round(sum(scores) / len(scores), 3), "count": len(scores), "min": round(min(scores), 3), "max": round(max(scores), 3), } for bucket_key, scores in sorted(buckets.items()) ] # ── Entity Frequencies ────────────────────────────────────────── def get_entity_frequencies( self, hours: float = 24, limit: int = 30, ) -> list[dict[str, Any]]: """Top entities by mention count, using SQL junction table + payload_ts index.""" cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() with self._store._conn() as conn: rows = conn.execute( """ SELECT ce.entity, COUNT(*) as cnt FROM cluster_entities ce JOIN clusters c ON c.cluster_id = ce.cluster_id WHERE c.payload_ts >= ? GROUP BY ce.entity ORDER BY cnt DESC LIMIT ? """, (cutoff, limit), ).fetchall() result: list[dict[str, Any]] = [] for label, count in rows: meta = self._store.get_entity_metadata(label) result.append({ "label": label, "count": count, "canonical_label": meta["canonical_label"] if meta else label, "mid": meta["mid"] if meta else None, }) return result # ── Keyword Frequencies ───────────────────────────────────────── def get_keyword_frequencies( self, hours: float = 24, limit: int = 30, ) -> list[dict[str, Any]]: """Top keywords by mention count, using SQL junction table + payload_ts index. Excludes DEFAULT_TOPICS labels (crypto, macro, regulation, ai, other). """ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() _topic_labels = {t.lower() for t in DEFAULT_TOPICS} with self._store._conn() as conn: rows = conn.execute( """ SELECT ck.keyword, COUNT(*) as cnt FROM cluster_keywords ck JOIN clusters c ON c.cluster_id = ck.cluster_id WHERE c.payload_ts >= ? GROUP BY ck.keyword ORDER BY cnt DESC LIMIT ? """, (cutoff, limit), ).fetchall() return [ {"label": label, "count": count} for label, count in rows if label.lower() not in _topic_labels ] # ── Entity/Keyword Cluster Search ──────────────────────────────── def get_clusters_by_entity( self, entity: str, hours: float = 168, limit: int = 50, offset: int = 0, ) -> dict[str, Any]: """Return clusters matching an entity, SQL-level filter via junction table.""" cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() entity_norm = entity.strip().lower() with self._store._conn() as conn: # Total count total = conn.execute( "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c " "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id " "WHERE c.payload_ts >= ? AND ce.entity = ?", (cutoff, entity_norm), ).fetchone()[0] # Paginated results rows = conn.execute( "SELECT DISTINCT c.payload FROM clusters c " "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id " "WHERE c.payload_ts >= ? AND ce.entity = ? " "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?", (cutoff, entity_norm, limit, offset), ).fetchall() return { "entity": entity_norm, "clusters": [json.loads(r[0]) for r in rows], "total": total, "hours": hours, } def get_clusters_by_keyword( self, keyword: str, hours: float = 168, limit: int = 50, offset: int = 0, ) -> dict[str, Any]: """Return clusters matching a keyword, SQL-level filter via junction table.""" cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() kw_norm = keyword.strip().lower() with self._store._conn() as conn: total = conn.execute( "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c " "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id " "WHERE c.payload_ts >= ? AND ck.keyword = ?", (cutoff, kw_norm), ).fetchone()[0] rows = conn.execute( "SELECT DISTINCT c.payload FROM clusters c " "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id " "WHERE c.payload_ts >= ? AND ck.keyword = ? " "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?", (cutoff, kw_norm, limit, offset), ).fetchall() return { "keyword": kw_norm, "clusters": [json.loads(r[0]) for r in rows], "total": total, "hours": hours, }