from __future__ import annotations import json import sqlite3 from dataclasses import dataclass from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Any from email.utils import parsedate_to_datetime from news_mcp.article_identity import article_key from news_mcp.config import ( NEWS_PRUNE_INTERVAL_HOURS, NEWS_PRUNING_ENABLED, NEWS_REFRESH_INTERVAL_SECONDS, NEWS_RETENTION_DAYS, ) from news_mcp.entity_normalize import normalize_entities from news_mcp.trends_resolution import resolve_entity_via_trends def _normalize_ts(ts: Any) -> str: """Parse any timestamp string and return ISO 8601 UTC. Handles ISO 8601, RFC 2822/HTTP-date, and unix epoch seconds. Returns empty string if unparseable. """ if ts is None: return "" if isinstance(ts, (int, float)): try: dt = datetime.fromtimestamp(float(ts), tz=timezone.utc) return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00") except Exception: return "" text = str(ts).strip() if not text: return "" # Try ISO 8601 try: dt = datetime.fromisoformat(text.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00") except Exception: pass # Try RFC 2822 / HTTP-date try: dt = parsedate_to_datetime(text) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00") except Exception: pass # Return original if we can't parse — better than losing data return text def _read_ts(ts: Any) -> float | None: """Parse a stored, already-normalized ISO 8601 UTC timestamp to a unix float. All payload.timestamp / payload.first_seen / payload.last_updated values are guaranteed YYYY-MM-DDTHH:MM:SS+00:00 at write time (enforced by sanitize_cluster_payload → _normalize_ts). Only datetime.fromisoformat is needed here. Do NOT add RFC 2822 / parsedate_to_datetime fallbacks — if this function can't parse a stored timestamp it means the normalization pipeline has a bug that should be fixed there, not papered over here. """ if not ts: return None try: dt = datetime.fromisoformat(str(ts).strip()) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc).timestamp() except Exception: return None @dataclass class ClusterRow: cluster_id: str topic: str payload: dict updated_at: datetime META_LAST_PRUNE_AT = "last_prune_at" # For internal use — canonical name is article_key(article) from article_identity _article_key = article_key def _dedup_articles(articles: list[dict[str, Any]]) -> list[dict[str, Any]]: seen: set[str] = set() out: list[dict[str, Any]] = [] for article in articles: key = _article_key(article) if key in seen: continue seen.add(key) out.append(article) return out def _has_valid_entity_resolutions(resolutions: Any, entities: list[str]) -> bool: if not isinstance(resolutions, list): return False if len(resolutions) != len(entities): return False for res in resolutions: if not isinstance(res, dict): return False if not res.get("normalized") or not res.get("canonical_label"): return False return True def sanitize_cluster_payload(cluster: dict[str, Any], *, include_resolutions: bool = True) -> dict[str, Any]: """Normalize cluster payload so every stored payload is internally consistent.""" out = dict(cluster) raw_articles = out.get("articles", []) or [] articles = [a for a in raw_articles if isinstance(a, dict)] # Normalize article timestamps, clamping future dates to now. now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00") for a in articles: if "timestamp" in a: a["timestamp"] = _normalize_ts(a["timestamp"]) if a["timestamp"] > now_str: a["timestamp"] = now_str out["articles"] = _dedup_articles(articles) raw_entities = out.get("entities", []) or [] entities = normalize_entities(raw_entities) out["entities"] = entities # Normalize cluster-level timestamps, clamping future dates to now. now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00") for field in ("timestamp", "last_updated", "first_seen"): if field in out and out[field]: ts = _normalize_ts(out[field]) if ts > now_str: ts = now_str out[field] = ts # Ensure timestamp is always present for the generated column index. # Prefer existing timestamp, then first_seen, then last_updated, then now. for src in ("timestamp", "first_seen", "last_updated"): if out.get(src): out.setdefault("timestamp", out[src]) break if not out.get("timestamp"): out["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00") # Preserve enrichment metadata across sanitization so the # poller's enriched_at cache check works on DB round-trips. for _fld in ("enriched_at", "enrichment_failed_at", "enrichment_retry_count"): if _fld in cluster: out.setdefault(_fld, cluster[_fld]) if not include_resolutions: return out resolutions = out.get("entityResolutions", None) if entities: if not _has_valid_entity_resolutions(resolutions, entities): out["entityResolutions"] = [resolve_entity_via_trends(e) for e in entities] else: # Keep the empty case explicit and stable. out["entityResolutions"] = [] return out class SQLiteClusterStore: def __init__(self, db_path: str | Path): self.db_path = str(db_path) self._init_db() def _conn(self) -> sqlite3.Connection: return sqlite3.connect(self.db_path) def _init_db(self) -> None: Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) with self._conn() as conn: conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") conn.execute("PRAGMA busy_timeout=5000") conn.execute( """ CREATE TABLE IF NOT EXISTS clusters ( cluster_id TEXT PRIMARY KEY, topic TEXT NOT NULL, payload TEXT NOT NULL, updated_at TEXT NOT NULL, summary_payload TEXT, summary_updated_at TEXT ) """ ) # If the table already exists without the summary columns, # add them (SQLite-friendly incremental migrations). for col_def in [ "summary_payload TEXT", "summary_updated_at TEXT", ]: col = col_def.split()[0] try: conn.execute(f"ALTER TABLE clusters ADD COLUMN {col_def}") except sqlite3.OperationalError: pass conn.execute( "CREATE INDEX IF NOT EXISTS idx_clusters_topic ON clusters(topic)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_clusters_updated_at ON clusters(updated_at)" ) # Generated column for indexed event-time filtering (VIRTUAL for compatibility) try: conn.execute( "ALTER TABLE clusters ADD COLUMN payload_ts " "GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL" ) except sqlite3.OperationalError: pass # column already exists conn.execute( "CREATE INDEX IF NOT EXISTS idx_clusters_payload_ts ON clusters(payload_ts)" ) # Junction tables for SQL-level entity/keyword search conn.execute( """ CREATE TABLE IF NOT EXISTS cluster_entities ( cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE, entity TEXT NOT NULL, PRIMARY KEY (cluster_id, entity) ) """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_cluster_entities_entity ON cluster_entities(entity)" ) conn.execute( """ CREATE TABLE IF NOT EXISTS cluster_keywords ( cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE, keyword TEXT NOT NULL, PRIMARY KEY (cluster_id, keyword) ) """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword)" ) # Seen-articles table: tracks every article_key that has been # clustered, so the poller can skip already-processed articles # entirely (no re-clustering, no re-enrichment). conn.execute( """ CREATE TABLE IF NOT EXISTS seen_articles ( article_key TEXT PRIMARY KEY, cluster_id TEXT NOT NULL, first_seen TEXT NOT NULL, url TEXT NOT NULL DEFAULT '' ) """ ) try: cur = conn.execute("PRAGMA table_info(entity_metadata)") cols = [row[1] for row in cur.fetchall()] if cols and "entity_id" not in cols: conn.execute("DROP TABLE entity_metadata") except sqlite3.OperationalError: pass conn.execute( """ CREATE TABLE IF NOT EXISTS entity_metadata ( entity_id TEXT PRIMARY KEY, normalized_label TEXT NOT NULL, canonical_label TEXT, mid TEXT, sources_json TEXT, updated_at TEXT, last_requested_at TEXT ) """ ) conn.execute( "CREATE UNIQUE INDEX IF NOT EXISTS idx_entity_metadata_mid ON entity_metadata(mid) WHERE mid IS NOT NULL" ) conn.execute( """ CREATE TABLE IF NOT EXISTS feed_state ( feed_key TEXT PRIMARY KEY, last_hash TEXT NOT NULL, last_item_count INTEGER, enabled INTEGER DEFAULT 1, updated_at TEXT NOT NULL ) """ ) try: conn.execute("ALTER TABLE feed_state ADD COLUMN enabled INTEGER DEFAULT 1") except sqlite3.OperationalError: pass conn.execute( """ CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL ) """ ) # Seed site_config from .env / defaults (no-op if already populated) from news_mcp.site_config import seed_site_config seeded = seed_site_config(conn) if seeded: import logging logging.getLogger(__name__).info("site_config: seeded %d rows from env/defaults", seeded) def upsert_clusters(self, clusters: list[dict], topic: str) -> None: now = datetime.now(timezone.utc) with self._conn() as conn: for c in clusters: c = sanitize_cluster_payload(c) cluster_id = c["cluster_id"] payload = json.dumps(c, ensure_ascii=False) conn.execute( "INSERT INTO clusters(cluster_id, topic, payload, updated_at) VALUES(?,?,?,?) " "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, payload=excluded.payload, updated_at=excluded.updated_at", (cluster_id, topic, payload, now.isoformat()), ) # Populate junction tables for SQL-level entity/keyword search. # DELETE first so re-enrichment replaces stale entries. conn.execute("DELETE FROM cluster_entities WHERE cluster_id=?", (cluster_id,)) conn.execute("DELETE FROM cluster_keywords WHERE cluster_id=?", (cluster_id,)) for entity in c.get("entities", []): ent_norm = str(entity).strip().lower() if ent_norm: conn.execute( "INSERT OR IGNORE INTO cluster_entities(cluster_id, entity) VALUES(?, ?)", (cluster_id, ent_norm), ) for kw in c.get("keywords", []): kw_norm = str(kw).strip().lower() if kw_norm: conn.execute( "INSERT OR IGNORE INTO cluster_keywords(cluster_id, keyword) VALUES(?, ?)", (cluster_id, kw_norm), ) # Record every article in seen_articles so the poller can # skip already-processed articles on future cycles. for art in c.get("articles", []): akey = _article_key(art) if akey: art_url = str(art.get("url") or "").strip() conn.execute( "INSERT OR IGNORE INTO seen_articles(article_key, cluster_id, first_seen, url) VALUES(?,?,?,?)", (akey, cluster_id, now.isoformat(), art_url), ) def upsert_cluster_summary( self, cluster_id: str, summary_payload: dict, ) -> None: now = datetime.now(timezone.utc).isoformat() with self._conn() as conn: conn.execute( "UPDATE clusters SET summary_payload=?, summary_updated_at=? WHERE cluster_id=?", ( json.dumps(summary_payload, ensure_ascii=False), now, cluster_id, ), ) def get_cluster_summary(self, cluster_id: str, ttl_hours: float) -> dict | None: cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours) cutoff_iso = cutoff.isoformat() with self._conn() as conn: cur = conn.execute( "SELECT summary_payload, summary_updated_at FROM clusters " "WHERE cluster_id=? AND summary_updated_at >= ?", (cluster_id, cutoff_iso), ) row = cur.fetchone() if not row or not row[0]: return None return json.loads(row[0]) def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]: """Return newest clusters by event timestamp, filtered via SQL payload_ts index.""" cutoff = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).isoformat() with self._conn() as conn: cur = conn.execute( "SELECT payload FROM clusters WHERE topic=? AND payload_ts >= ? ORDER BY payload_ts DESC LIMIT ?", (topic, cutoff, int(limit)), ) return [json.loads(r[0]) for r in cur.fetchall()] def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]: """Return newest clusters across all topics, filtered via SQL payload_ts index.""" cutoff = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).isoformat() with self._conn() as conn: cur = conn.execute( "SELECT payload FROM clusters WHERE payload_ts >= ? ORDER BY payload_ts DESC LIMIT ?", (cutoff, int(limit)), ) return [json.loads(r[0]) for r in cur.fetchall()] def get_cluster_by_id(self, cluster_id: str) -> dict | None: with self._conn() as conn: cur = conn.execute( "SELECT payload FROM clusters WHERE cluster_id=?", (cluster_id,), ) row = cur.fetchone() return json.loads(row[0]) if row else None def get_feed_hash(self, feed_key: str) -> str | None: with self._conn() as conn: cur = conn.execute( "SELECT last_hash FROM feed_state WHERE feed_key=?", (feed_key,), ) row = cur.fetchone() return row[0] if row else None def set_feed_hash(self, feed_key: str, last_hash: str) -> None: self.set_feed_state(feed_key, last_hash, None) def set_feed_state(self, feed_key: str, last_hash: str, last_item_count: int | None = None) -> None: now = datetime.now(timezone.utc).isoformat() with self._conn() as conn: conn.execute( "INSERT INTO feed_state(feed_key, last_hash, last_item_count, updated_at) VALUES(?,?,?,?) " "ON CONFLICT(feed_key) DO UPDATE SET last_hash=excluded.last_hash, last_item_count=excluded.last_item_count, updated_at=excluded.updated_at", (feed_key, last_hash, last_item_count, now), ) def get_feed_state(self, feed_key: str) -> dict | None: with self._conn() as conn: cur = conn.execute( "SELECT last_hash, updated_at FROM feed_state WHERE feed_key=?", (feed_key,), ) row = cur.fetchone() if not row: return None return {"last_hash": row[0], "updated_at": row[1]} def get_all_feed_states(self) -> list[dict[str, Any]]: """All feed_state rows.""" with self._conn() as conn: cur = conn.execute( "SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC" ) return [ { "feed_key": row[0], "last_hash": row[1], "last_item_count": row[2], "enabled": bool(row[3]), "updated_at": row[4], } for row in cur.fetchall() ] def feed_ensure_seeded(self, feed_urls: list[str]) -> None: """Insert any feed URLs not yet present in feed_state (enabled by default).""" if not feed_urls: return with self._conn() as conn: for url in feed_urls: conn.execute( "INSERT OR IGNORE INTO feed_state(feed_key, last_hash, last_item_count, enabled, updated_at) VALUES(?, '', 0, 1, '')", (url,), ) def get_feed_state_list(self) -> list[dict[str, Any]]: """Return all feeds with enabled/disabled status for the dashboard.""" return self.get_all_feed_states() def set_feed_enabled(self, feed_url: str, enabled: bool) -> bool: """Toggle a feed's enabled state. Returns True if the feed existed and was updated.""" with self._conn() as conn: cur = conn.execute( "UPDATE feed_state SET enabled = ? WHERE feed_key = ?", (1 if enabled else 0, feed_url), ) return cur.rowcount > 0 def get_enabled_feed_urls(self, feed_urls: list[str]) -> list[str]: """From a list of configured feed URLs, return only those that are enabled in feed_state. URLs not yet present in feed_state are seeded as enabled. """ self.feed_ensure_seeded(feed_urls) with self._conn() as conn: placeholders = ",".join("?" for _ in feed_urls) if feed_urls else "" if placeholders: cur = conn.execute( f"SELECT feed_key FROM feed_state WHERE feed_key IN ({placeholders}) AND enabled = 1", feed_urls, ) else: cur = conn.execute("SELECT feed_key FROM feed_state WHERE enabled = 1") return [row[0] for row in cur.fetchall()] # ------------------------------------------------------------------ # # Seen-articles: skip already-processed articles at ingestion # ------------------------------------------------------------------ # def filter_already_seen(self, articles: list[dict]) -> tuple[list[dict], list[dict]]: """Split articles into (new, already_seen) based on seen_articles table. Uses _article_key (derived from URL) as the identity check. Returns two lists: articles never seen before, and articles already processed in a previous cycle. """ keys = [_article_key(a) for a in articles] if not keys: return [], [] with self._conn() as conn: placeholders = ",".join("?" for _ in keys) cur = conn.execute( f"SELECT article_key FROM seen_articles WHERE article_key IN ({placeholders})", keys, ) seen_set = {row[0] for row in cur.fetchall()} new_articles = [] seen_articles = [] for art, key in zip(articles, keys): if key in seen_set: seen_articles.append(art) else: new_articles.append(art) return new_articles, seen_articles def get_seen_article_count(self) -> int: """Total rows in seen_articles (for diagnostics).""" with self._conn() as conn: return conn.execute("SELECT count(*) FROM seen_articles").fetchone()[0] def get_meta(self, key: str) -> str | None: with self._conn() as conn: cur = conn.execute("SELECT value FROM meta WHERE key=?", (key,)) row = cur.fetchone() return row[0] if row else None def set_meta(self, key: str, value: str) -> None: with self._conn() as conn: conn.execute( "INSERT INTO meta(key, value) VALUES(?, ?) " "ON CONFLICT(key) DO UPDATE SET value=excluded.value", (key, value), ) def upsert_entity_metadata( self, normalized_label: str, canonical_label: str | None = None, mid: str | None = None, sources: list[str] | None = None, ) -> None: normalized_label = str(normalized_label or "").strip() if not normalized_label: return canonical_label = str(canonical_label).strip() if canonical_label else None mid = str(mid).strip() if mid else None entity_id = mid if mid else f"local:{normalized_label}" sources = sorted({s for s in (sources or []) if s}) sources_json = json.dumps(sources, ensure_ascii=False) now = datetime.now(timezone.utc).isoformat() with self._conn() as conn: conn.execute( """ INSERT INTO entity_metadata(entity_id, normalized_label, canonical_label, mid, sources_json, updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(entity_id) DO UPDATE SET canonical_label=excluded.canonical_label, mid=excluded.mid, sources_json=excluded.sources_json, updated_at=excluded.updated_at """, (entity_id, normalized_label, canonical_label, mid, sources_json, now), ) def get_entity_metadata(self, normalized_label: str) -> dict[str, Any] | None: normalized_label = str(normalized_label or "").strip() if not normalized_label: return None with self._conn() as conn: cur = conn.execute( "SELECT entity_id, canonical_label, mid, sources_json, updated_at, last_requested_at " "FROM entity_metadata " "WHERE normalized_label=? " "ORDER BY CASE WHEN mid IS NOT NULL THEN 0 ELSE 1 END, " "COALESCE(last_requested_at, updated_at) DESC, updated_at DESC " "LIMIT 1", (normalized_label,), ) row = cur.fetchone() if not row: return None sources = [] if row[2]: try: sources = json.loads(row[2]) except Exception: sources = [] return { "entity_id": row[0], "normalized_label": normalized_label, "canonical_label": row[1], "mid": row[2], "sources": sources, "updated_at": row[4], "last_requested_at": row[5], } def record_entity_request(self, normalized_label: str, mid: str | None = None) -> None: normalized_label = str(normalized_label or "").strip() if not normalized_label: return mid = str(mid).strip() if mid else None entity_id = mid if mid else f"local:{normalized_label}" now = datetime.now(timezone.utc).isoformat() with self._conn() as conn: conn.execute( """ INSERT INTO entity_metadata(entity_id, normalized_label, canonical_label, mid, sources_json, updated_at, last_requested_at) VALUES(?,?,?,?,?,?,?) ON CONFLICT(entity_id) DO UPDATE SET last_requested_at=excluded.last_requested_at """, (entity_id, normalized_label, None, mid, json.dumps([], ensure_ascii=False), now, now), ) def get_failed_enrichment_clusters(self, max_retries: int = 3) -> list[dict]: """Return clusters whose last enrichment failed and haven't exceeded max_retries. These are candidates for re-enrichment on the next polling cycle. """ with self._conn() as conn: cur = conn.execute( "SELECT payload FROM clusters " "WHERE json_extract(payload, '$.enrichment_failed_at') IS NOT NULL " "AND (json_extract(payload, '$.enrichment_retry_count') IS NULL " " OR json_extract(payload, '$.enrichment_retry_count') < ?) " "ORDER BY updated_at DESC LIMIT 500", (max_retries,), ) rows = cur.fetchall() return [json.loads(r[0]) for r in rows] def prune_clusters(self, retention_days: float) -> int: retention_days = float(retention_days) if retention_days <= 0: return 0 cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days) cutoff_iso = cutoff.isoformat() pruned_at = datetime.now(timezone.utc).isoformat() with self._conn() as conn: # Use payload_ts (event time from payload.timestamp) not updated_at # (row write time). updated_at is refreshed on every upsert, which # would keep re-ingested old articles alive forever. # Collect cluster_ids being pruned so we can clean seen_articles. pruned_ids = [ row[0] for row in conn.execute( "SELECT cluster_id FROM clusters WHERE payload_ts < ?", (cutoff_iso,) ).fetchall() ] cur = conn.execute("DELETE FROM clusters WHERE payload_ts < ?", (cutoff_iso,)) deleted = int(cur.rowcount or 0) # Clean up seen_articles rows pointing to pruned clusters if pruned_ids: placeholders = ",".join("?" for _ in pruned_ids) conn.execute( f"DELETE FROM seen_articles WHERE cluster_id IN ({placeholders})", pruned_ids, ) conn.execute( "INSERT INTO meta(key, value) VALUES(?, ?) " "ON CONFLICT(key) DO UPDATE SET value=excluded.value", (META_LAST_PRUNE_AT, pruned_at), ) return deleted def prune_if_due(self, pruning_enabled: bool, retention_days: float, interval_hours: float = 24.0) -> dict[str, Any]: retention_days = float(retention_days) interval_hours = float(interval_hours) if (not pruning_enabled) or retention_days <= 0: return { "enabled": bool(pruning_enabled), "deleted": 0, "due": False, "retention_days": retention_days, "interval_hours": interval_hours, "last_prune_at": self.get_meta(META_LAST_PRUNE_AT), } last_prune_at = self.get_meta(META_LAST_PRUNE_AT) now = datetime.now(timezone.utc) due = True if last_prune_at: try: last_dt = datetime.fromisoformat(last_prune_at) due = now - last_dt >= timedelta(hours=max(1.0, interval_hours)) except Exception: due = True if not due: return { "enabled": True, "deleted": 0, "due": False, "retention_days": retention_days, "interval_hours": interval_hours, "last_prune_at": last_prune_at, } deleted = self.prune_clusters(retention_days) last_prune_at = self.get_meta(META_LAST_PRUNE_AT) return { "enabled": True, "deleted": deleted, "due": True, "retention_days": retention_days, "interval_hours": interval_hours, "last_prune_at": last_prune_at, } def get_prune_state(self, pruning_enabled: bool, retention_days: float, interval_hours: float = 24.0) -> dict[str, Any]: return { "enabled": bool(pruning_enabled), "retention_days": float(retention_days), "interval_hours": float(interval_hours), "last_prune_at": self.get_meta(META_LAST_PRUNE_AT), } # ------------------------------------------------------------------ # Dashboard query helpers # ------------------------------------------------------------------ def get_dashboard_stats(self) -> dict[str, Any]: """Aggregate status numbers for the health panel.""" with self._conn() as conn: total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0] total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0] cluster_entities = conn.execute( "SELECT COUNT(DISTINCT e.value) " "FROM clusters, json_each(clusters.payload, '$.entities') AS e" ).fetchone()[0] topic_counts = dict(conn.execute( "SELECT topic, COUNT(*) FROM clusters GROUP BY topic" ).fetchall()) last_refresh = self.get_meta("last_refresh_at") feeds = {} for row in conn.execute( "SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC" ): feeds[row[0]] = { "last_hash": row[1], "last_item_count": row[2], "enabled": bool(row[3]), "updated_at": row[4], } # Freshness: did a refresh happen recently? (within 2x the configured interval) fresh = False if last_refresh: try: dt = datetime.fromisoformat(last_refresh.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) age_hours = (datetime.now(timezone.utc) - dt).total_seconds() / 3600 fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2 except Exception: pass last_prune = self.get_meta(META_LAST_PRUNE_AT) prune_state = self.get_prune_state( pruning_enabled=NEWS_PRUNING_ENABLED, retention_days=NEWS_RETENTION_DAYS, interval_hours=NEWS_PRUNE_INTERVAL_HOURS, ) return { "total_clusters": total_clusters, "total_entities": total_entities, "cluster_entities": cluster_entities, "clusters_by_topic": topic_counts, "last_refresh_at": last_refresh, "last_prune_at": last_prune, "data_fresh": fresh, "feeds": feeds, "feed_count": len(feeds), "seen_article_count": self.get_seen_article_count(), "prune_state": prune_state, } def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None: """Dashboard-optimized cluster detail fetch.""" with self._conn() as conn: cur = conn.execute( "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,) ) row = cur.fetchone() if not row: return None c = json.loads(row[0]) summary = None if c.get("summary_payload"): try: summary = json.loads(c["summary_payload"]) except Exception: pass return { "cluster_id": c.get("cluster_id"), "headline": c.get("headline", ""), "summary": c.get("summary", ""), "topic": c.get("topic", ""), "sentiment": c.get("sentiment", "neutral"), "sentimentScore": c.get("sentimentScore"), "importance": c.get("importance", 0), "entities": c.get("entities", []), "entityResolutions": c.get("entityResolutions", []), "keywords": c.get("keywords", []), "sources": c.get("sources", []), "timestamp": c.get("timestamp", ""), "first_seen": c.get("first_seen", ""), "last_updated": c.get("last_updated", ""), "article_count": len(c.get("articles", [])), "articles": c.get("articles", []), "summary_text": summary.get("mergedSummary", "") if summary else "", "key_facts": summary.get("keyFacts", []) if summary else [], } # ── Paginated Clusters ──────────────────────────────────────────── def get_clusters_page( self, topic: str | None = None, hours: float = 24, limit: int = 20, offset: int = 0, ) -> dict[str, Any]: """Paginated cluster listing filtered by SQL payload_ts index. Returns {"clusters": [...], "total": int}. """ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() query = "SELECT payload FROM clusters WHERE payload_ts >= ?" params: list = [cutoff] if topic and topic != "all": query += " AND topic = ?" params.append(topic) total = self._conn().execute( f"SELECT COUNT(*) FROM ({query})", params ).fetchone()[0] query += " ORDER BY payload_ts DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) with self._conn() as conn: rows = conn.execute(query, params).fetchall() return { "clusters": [ { "cluster_id": c.get("cluster_id", ""), "headline": c.get("headline", ""), "topic": c.get("topic", ""), "sentiment": c.get("sentiment", "neutral"), "sentimentScore": c.get("sentimentScore"), "importance": c.get("importance", 0), "entities": c.get("entities", []), "sources": c.get("sources", []), "timestamp": c.get("timestamp", ""), "keywords": c.get("keywords", []), "article_count": len(c.get("articles", [])), } for c in [json.loads(r[0]) for r in rows] ], "total": total, } # ── Sentiment Series ────────────────────────────────────────────── def get_sentiment_series( self, topic: str | None = None, hours: float = 24, bucket_hours: float = 1, ) -> list[dict[str, Any]]: """Sentiment score averaged per time bucket. Filters by payload_ts SQL index.""" cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() query = "SELECT payload FROM clusters WHERE payload_ts >= ?" params: list = [cutoff] if topic and topic != "all": query += " AND topic = ?" params.append(topic) query += " ORDER BY payload_ts ASC" with self._conn() as conn: rows = conn.execute(query, params).fetchall() buckets: dict[datetime, list[float]] = {} for (payload_text,) in rows: c = json.loads(payload_text) ts_str = c.get("timestamp") score = c.get("sentimentScore") if not ts_str or score is None: continue dt = datetime.fromisoformat(str(ts_str).strip()) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) bucket_key = dt.replace(minute=0, second=0, microsecond=0) if bucket_hours > 1: bucket_key = bucket_key.replace( hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours) ) buckets.setdefault(bucket_key, []).append(float(score)) return [ { "time": bucket_key.isoformat(), "avg_sentiment": round(sum(scores) / len(scores), 3), "count": len(scores), "min": round(min(scores), 3), "max": round(max(scores), 3), } for bucket_key, scores in sorted(buckets.items()) ] # ── Entity / Keyword Frequencies ────────────────────────────────── def get_entity_frequencies( self, hours: float = 24, limit: int = 30, ) -> list[dict[str, Any]]: """Top entities by mention count, using SQL junction table + payload_ts index.""" cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() with self._conn() as conn: rows = conn.execute( """\ SELECT ce.entity, COUNT(*) as cnt FROM cluster_entities ce JOIN clusters c ON c.cluster_id = ce.cluster_id WHERE c.payload_ts >= ? GROUP BY ce.entity ORDER BY cnt DESC LIMIT ? """, (cutoff, limit), ).fetchall() result: list[dict[str, Any]] = [] for label, count in rows: meta = self.get_entity_metadata(label) result.append({ "label": label, "count": count, "canonical_label": meta["canonical_label"] if meta else label, "mid": meta["mid"] if meta else None, }) return result def get_keyword_frequencies( self, hours: float = 24, limit: int = 30, ) -> list[dict[str, Any]]: """Top keywords by mention count, using SQL junction table + payload_ts index. Excludes DEFAULT_TOPICS labels (crypto, macro, regulation, ai, other). """ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() _topic_labels = {"crypto", "macro", "regulation", "ai", "other"} with self._conn() as conn: rows = conn.execute( """\ SELECT ck.keyword, COUNT(*) as cnt FROM cluster_keywords ck JOIN clusters c ON c.cluster_id = ck.cluster_id WHERE c.payload_ts >= ? GROUP BY ck.keyword ORDER BY cnt DESC LIMIT ? """, (cutoff, limit), ).fetchall() return [ {"label": label, "count": count} for label, count in rows if label.lower() not in _topic_labels ] # ── Junction-Table Entity / Keyword Cluster Search ──────────────── def get_clusters_by_entity( self, entity: str, hours: float = 168, limit: int = 50, offset: int = 0, ) -> dict[str, Any]: """Return clusters matching an entity, SQL-level filter via junction table. Returns {"entity": ..., "clusters": [...], "total": int, "hours": float}. """ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() entity_norm = entity.strip().lower() with self._conn() as conn: total = conn.execute( "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c " "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id " "WHERE c.payload_ts >= ? AND ce.entity = ?", (cutoff, entity_norm), ).fetchone()[0] rows = conn.execute( "SELECT DISTINCT c.payload FROM clusters c " "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id " "WHERE c.payload_ts >= ? AND ce.entity = ? " "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?", (cutoff, entity_norm, limit, offset), ).fetchall() return { "entity": entity_norm, "clusters": [json.loads(r[0]) for r in rows], "total": total, "hours": hours, } def get_clusters_by_keyword( self, keyword: str, hours: float = 168, limit: int = 50, offset: int = 0, ) -> dict[str, Any]: """Return clusters matching a keyword, SQL-level filter via junction table.""" cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() kw_norm = keyword.strip().lower() with self._conn() as conn: total = conn.execute( "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c " "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id " "WHERE c.payload_ts >= ? AND ck.keyword = ?", (cutoff, kw_norm), ).fetchone()[0] rows = conn.execute( "SELECT DISTINCT c.payload FROM clusters c " "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id " "WHERE c.payload_ts >= ? AND ck.keyword = ? " "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?", (cutoff, kw_norm, limit, offset), ).fetchall() return { "keyword": kw_norm, "clusters": [json.loads(r[0]) for r in rows], "total": total, "hours": hours, } def get_clusters_by_entity_or_keyword( self, query_terms: set[str], hours: float, limit: int, ) -> list[dict]: """Search clusters by matching ANY query term against entities OR keywords. Uses SQL-level junction-table filtering — no row-limit blind spot. Returns clusters sorted by recency. """ terms = [q.strip().lower() for q in query_terms if q.strip()] if not terms: return [] cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() placeholders = ",".join("?" for _ in terms) with self._conn() as conn: rows = conn.execute( f"SELECT DISTINCT c.payload FROM clusters c " f"LEFT JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id " f"LEFT JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id " f"WHERE c.payload_ts >= ? " f" AND (ce.entity IN ({placeholders}) OR ck.keyword IN ({placeholders})) " f"ORDER BY c.payload_ts DESC LIMIT ?", (cutoff, *terms, *terms, limit), ).fetchall() return [json.loads(r[0]) for r in rows]