| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
from __future__ import annotations

import json
from collections import Counter
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
from typing import Any

from news_mcp.config import (
    NEWS_PRUNE_INTERVAL_HOURS,
    NEWS_PRUNING_ENABLED,
    NEWS_REFRESH_INTERVAL_SECONDS,
    NEWS_RETENTION_DAYS,
)
from news_mcp.storage.sqlite_store import SQLiteClusterStore
class DashboardStore:
    """Read-only query layer for the dashboard.

    All reads go through the wrapped store's ``_conn()`` context manager and
    its ``get_meta`` / ``get_entity_metadata`` helpers; results are returned
    as plain dicts and lists.
    """

    # Fixed origin for time buckets so every bucket has the same width,
    # independent of calendar-day boundaries.
    _EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

    def __init__(self, store=None):
        """Use *store* if given, otherwise open a ``SQLiteClusterStore`` at ``DB_PATH``."""
        if store is not None:
            self._store = store
        else:
            # Imported here so DB_PATH is only read when no store is injected.
            from news_mcp.config import DB_PATH

            self._store = SQLiteClusterStore(DB_PATH)

    # ── Shared helpers ──────────────────────────────────────────────

    @staticmethod
    def _parse_timestamp(value: Any) -> datetime | None:
        """Parse an ISO-8601 or RFC 2822 timestamp into an aware UTC datetime.

        A trailing ``Z`` is accepted for ISO input and naive results are
        treated as UTC. Returns ``None`` for empty or unparseable input.
        """
        if not value:
            return None
        text = str(value)
        try:
            parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
        except ValueError:
            try:
                # Feed-style dates, e.g. "Mon, 01 Jan 2024 03:00:00 +0000".
                parsed = parsedate_to_datetime(text)
            except (TypeError, ValueError):
                # ValueError on 3.10+, TypeError on older Pythons.
                return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    @staticmethod
    def _parse_summary_payload(raw: Any) -> dict[str, Any]:
        """Decode a cluster's ``summary_payload`` into a dict.

        Accepts an already-decoded dict or a JSON string/bytes. Anything
        missing, malformed, or not decoding to an object yields ``{}``.
        """
        if isinstance(raw, dict):
            return raw
        if isinstance(raw, (str, bytes, bytearray)) and raw:
            try:
                decoded = json.loads(raw)
            except ValueError:  # includes json.JSONDecodeError / UnicodeDecodeError
                return {}
            if isinstance(decoded, dict):
                return decoded
        return {}

    def _recent_payloads(
        self,
        hours: float,
        topic: str | None = None,
        *,
        newest_first: bool = True,
        limit: int | None = None,
        offset: int = 0,
    ) -> list[dict[str, Any]]:
        """Return decoded payloads of clusters whose ``updated_at`` is within *hours*.

        A *topic* of ``None``, ``""`` or ``"all"`` disables the topic filter.
        Rows are ordered by ``updated_at`` (descending if *newest_first*).
        ``LIMIT``/``OFFSET`` are applied only when *limit* is given.
        """
        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
        # NOTE(review): this is a string comparison — it assumes updated_at is
        # stored as an ISO-8601 UTC string in the same format as ``cutoff``.
        query = "SELECT payload FROM clusters WHERE updated_at >= ?"
        params: list[Any] = [cutoff]
        if topic and topic != "all":
            query += " AND topic = ?"
            params.append(topic)
        query += " ORDER BY updated_at DESC" if newest_first else " ORDER BY updated_at ASC"
        if limit is not None:
            query += " LIMIT ? OFFSET ?"
            params.extend([limit, offset])
        with self._store._conn() as conn:
            rows = conn.execute(query, params).fetchall()
        return [json.loads(payload_text) for (payload_text,) in rows]

    # ── Health & Stats ──────────────────────────────────────────────

    def get_dashboard_stats(self) -> dict[str, Any]:
        """Return headline counts, refresh/prune metadata and per-feed state.

        ``data_fresh`` is True when ``last_refresh_at`` parses and is younger
        than twice the configured refresh interval (floored at one hour).
        ``feeds`` maps ``feed_key`` to its ``feed_state`` row, most recently
        updated first.
        """
        with self._store._conn() as conn:
            total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
            total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
            # Distinct values found in the "entities" array of stored payloads.
            cluster_entities = conn.execute(
                "SELECT COUNT(DISTINCT e.value) "
                "FROM clusters, json_each(clusters.payload, '$.entities') AS e"
            ).fetchone()[0]
            topic_counts = dict(conn.execute(
                "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
            ).fetchall())
            feeds = {
                feed_key: {
                    "last_hash": last_hash,
                    "last_item_count": last_item_count,
                    "enabled": bool(enabled),
                    "updated_at": updated_at,
                }
                for feed_key, last_hash, last_item_count, enabled, updated_at in conn.execute(
                    "SELECT feed_key, last_hash, last_item_count, enabled, updated_at "
                    "FROM feed_state ORDER BY updated_at DESC"
                )
            }
        last_refresh = self._store.get_meta("last_refresh_at")
        last_prune = self._store.get_meta("last_prune_at")

        # Freshness: did a refresh happen recently? (within 2x the configured interval)
        fresh = False
        refreshed_at = self._parse_timestamp(last_refresh)
        if refreshed_at is not None:
            age_hours = (datetime.now(timezone.utc) - refreshed_at).total_seconds() / 3600
            fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2

        return {
            "total_clusters": total_clusters,
            "total_entities": total_entities,
            "cluster_entities": cluster_entities,
            "clusters_by_topic": topic_counts,
            "last_refresh_at": last_refresh,
            "last_prune_at": last_prune,
            "data_fresh": fresh,
            "feeds": feeds,
            "feed_count": len(feeds),
            "pruning": {
                "enabled": NEWS_PRUNING_ENABLED,
                "retention_days": NEWS_RETENTION_DAYS,
                "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
                "last_prune_at": last_prune,
            },
        }

    # ── Clusters ────────────────────────────────────────────────────

    def get_clusters_page(
        self,
        topic: str | None = None,
        hours: float = 24,
        limit: int = 20,
        offset: int = 0,
    ) -> list[dict[str, Any]]:
        """Return one page of cluster summaries, most recently updated first.

        Args:
            topic: Topic filter; ``None``/``""``/``"all"`` means every topic.
            hours: Only clusters with ``updated_at`` in the last *hours*.
            limit: Page size (passed to SQL ``LIMIT``).
            offset: Rows to skip (passed to SQL ``OFFSET``).
        """
        return [
            {
                "cluster_id": c.get("cluster_id", ""),
                "headline": c.get("headline", ""),
                "topic": c.get("topic", ""),
                "sentiment": c.get("sentiment", "neutral"),
                "sentimentScore": c.get("sentimentScore"),
                "importance": c.get("importance", 0),
                "entities": c.get("entities", []),
                "sources": c.get("sources", []),
                "timestamp": c.get("timestamp", ""),
                "keywords": c.get("keywords", []),
                # `or []` also covers an explicit JSON null.
                "article_count": len(c.get("articles") or []),
            }
            for c in self._recent_payloads(hours, topic, limit=limit, offset=offset)
        ]

    def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
        """Return the full view of one cluster, or ``None`` if it does not exist.

        ``summary_text`` / ``key_facts`` come from the payload's
        ``summary_payload`` (``mergedSummary`` / ``keyFacts``). They fall back
        to ``""`` / ``[]`` when that is missing or unusable.
        """
        with self._store._conn() as conn:
            row = conn.execute(
                "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
            ).fetchone()
        if not row:
            return None
        c = json.loads(row[0])
        articles = c.get("articles") or []
        summary = self._parse_summary_payload(c.get("summary_payload"))
        return {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline", ""),
            "summary": c.get("summary", ""),
            "topic": c.get("topic", ""),
            "sentiment": c.get("sentiment", "neutral"),
            "sentimentScore": c.get("sentimentScore"),
            "importance": c.get("importance", 0),
            "entities": c.get("entities", []),
            "entityResolutions": c.get("entityResolutions", []),
            "keywords": c.get("keywords", []),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp", ""),
            "first_seen": c.get("first_seen", ""),
            "last_updated": c.get("last_updated", ""),
            "article_count": len(articles),
            "articles": articles,
            "summary_text": summary.get("mergedSummary", ""),
            "key_facts": summary.get("keyFacts", []),
        }

    # ── Sentiment Series ────────────────────────────────────────────

    def get_sentiment_series(
        self,
        topic: str | None = None,
        hours: float = 24,
        bucket_hours: float = 1,
    ) -> list[dict[str, Any]]:
        """Return average ``sentimentScore`` per time bucket, oldest bucket first.

        Clusters are selected by ``updated_at`` (last *hours*, optional
        *topic*) but bucketed by their payload ``timestamp``. *bucket_hours*
        is truncated to whole hours (minimum 1). Buckets are aligned to the
        Unix epoch in UTC, so each spans exactly that many hours, including
        widths that do not divide 24 or exceed a day. Clusters with an
        unparseable timestamp or a missing/non-numeric score are skipped.
        """
        step = timedelta(hours=max(1, int(bucket_hours)))
        buckets: dict[datetime, list[float]] = {}
        for c in self._recent_payloads(hours, topic, newest_first=False):
            dt = self._parse_timestamp(c.get("timestamp"))
            raw_score = c.get("sentimentScore")
            if dt is None or raw_score is None:
                continue
            try:
                score = float(raw_score)
            except (TypeError, ValueError):
                continue  # one malformed score must not break the whole series
            # timedelta // timedelta is exact integer floor division.
            bucket_start = self._EPOCH + ((dt - self._EPOCH) // step) * step
            buckets.setdefault(bucket_start, []).append(score)

        return [
            {
                "time": start.isoformat(),
                "avg_sentiment": round(sum(scores) / len(scores), 3),
                "count": len(scores),
                "min": round(min(scores), 3),
                "max": round(max(scores), 3),
            }
            for start, scores in sorted(buckets.items())
        ]

    # ── Entity Frequencies ──────────────────────────────────────────

    def get_entity_frequencies(
        self,
        hours: float = 24,
        limit: int = 30,
        max_clusters: int = 500,
    ) -> list[dict[str, Any]]:
        """Return the most frequently mentioned entities in recent clusters.

        Counts every occurrence in the ``entities`` lists of up to
        *max_clusters* clusters updated in the last *hours*, newest first.
        Returns the top *limit* labels (ties keep first-seen order), each
        enriched with ``canonical_label`` / ``mid`` from the store's entity
        metadata when available.
        """
        counter: Counter[str] = Counter()
        for c in self._recent_payloads(hours, limit=max_clusters):
            counter.update(c.get("entities") or [])
        top = counter.most_common(limit) if limit > 0 else []

        result: list[dict[str, Any]] = []
        for label, count in top:
            # NOTE(review): one metadata lookup per returned label.
            meta = self._store.get_entity_metadata(label)
            result.append({
                "label": label,
                "count": count,
                "canonical_label": meta["canonical_label"] if meta else label,
                "mid": meta["mid"] if meta else None,
            })
        return result
|