
feat: junction tables + payload_ts for SQL-level entity/keyword/time search

Root cause of "entity shows 34x but click shows 2 clusters":
get_clusters_page returned only 100 most recent clusters regardless of
time window. Entity mentions beyond top 100 were invisible.

Solution: proper relational indexing instead of JSON parsing at query time.

Schema:
- payload_ts GENERATED ALWAYS AS (json_extract(payload,'$.timestamp')) VIRTUAL
  (indexed timestamp column, auto-maintained by SQLite, no write-path code needed)
- cluster_entities(cluster_id, entity) junction table with index on entity
- cluster_keywords(cluster_id, keyword) junction table with index on keyword
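
A minimal sketch of the generated column, assuming an in-memory database and a cut-down `clusters` table (the real table has more columns). It only illustrates that `payload_ts` is derived from the JSON payload without any application code setting it; it needs SQLite 3.31+ with the JSON functions available.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the real clusters table (assumption for this sketch).
conn.execute("CREATE TABLE clusters (cluster_id TEXT PRIMARY KEY, topic TEXT, payload TEXT)")

# ALTER TABLE can add a VIRTUAL generated column, but not a STORED one.
conn.execute(
    "ALTER TABLE clusters ADD COLUMN payload_ts "
    "GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL"
)
conn.execute("CREATE INDEX idx_clusters_payload_ts ON clusters(payload_ts)")

payload = {"timestamp": "2026-05-01T12:00:00+00:00", "headline": "example"}
conn.execute(
    "INSERT INTO clusters(cluster_id, topic, payload) VALUES (?, ?, ?)",
    ("c1", "crypto", json.dumps(payload)),
)

# payload_ts is computed by SQLite from the stored JSON.
row = conn.execute("SELECT payload_ts FROM clusters WHERE cluster_id = 'c1'").fetchone()
print(row[0])
```

The index on `payload_ts` is still updated by SQLite on each write, so the saving is in application code rather than in raw write cost.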

Write path (upsert_clusters):
- DELETE + INSERT OR IGNORE into junction tables within existing transaction
- Normalized (lowercased) entities and keywords
- Handles re-enrichment correctly (DELETE before INSERT)
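
The DELETE-then-INSERT pattern above can be sketched as follows. `replace_entities` is a hypothetical helper written for this example, not a function from the repo, and the table is reduced to the two columns that matter.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cluster_entities ("
    "cluster_id TEXT NOT NULL, entity TEXT NOT NULL, "
    "PRIMARY KEY (cluster_id, entity))"
)


def replace_entities(conn, cluster_id, entities):
    """Hypothetical helper: swap a cluster's entity rows in one transaction."""
    with conn:  # commits on success, rolls back on error
        conn.execute("DELETE FROM cluster_entities WHERE cluster_id = ?", (cluster_id,))
        for entity in entities:
            norm = str(entity).strip().lower()
            if norm:
                # OR IGNORE absorbs duplicates that collapse after normalization.
                conn.execute(
                    "INSERT OR IGNORE INTO cluster_entities(cluster_id, entity) VALUES (?, ?)",
                    (cluster_id, norm),
                )


replace_entities(conn, "c1", ["Bitcoin", " bitcoin ", "SEC"])
replace_entities(conn, "c1", ["SEC", "Ethereum"])  # re-enrichment replaces the old set

rows = [
    r[0]
    for r in conn.execute(
        "SELECT entity FROM cluster_entities WHERE cluster_id = 'c1' ORDER BY entity"
    )
]
print(rows)
```

After the second call the stale `bitcoin` row is gone, which is the point of deleting before inserting.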

Read paths — all SQL-level, no JSON parsing:
- get_latest_clusters / get_latest_clusters_all_topics: WHERE payload_ts >= ?
- get_clusters_page: WHERE payload_ts >= ? ORDER BY payload_ts DESC LIMIT ? OFFSET ?
- get_entity_frequencies: JOIN cluster_entities GROUP BY entity
- get_keyword_frequencies: JOIN cluster_keywords GROUP BY keyword
- get_clusters_by_entity: JOIN cluster_entities WHERE entity = ?
- get_clusters_by_keyword: JOIN cluster_keywords WHERE keyword = ?
- get_sentiment_series: WHERE payload_ts >= ?
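
A rough sketch of why the frequency count and the detail view now agree: both run against the same junction table and the same `payload_ts` cutoff. The schema is simplified and the sample rows are invented for illustration.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE clusters (
    cluster_id TEXT PRIMARY KEY,
    payload    TEXT NOT NULL,
    payload_ts TEXT GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL
);
CREATE INDEX idx_clusters_payload_ts ON clusters(payload_ts);
CREATE TABLE cluster_entities (
    cluster_id TEXT NOT NULL,
    entity     TEXT NOT NULL,
    PRIMARY KEY (cluster_id, entity)
);
CREATE INDEX idx_cluster_entities_entity ON cluster_entities(entity);
""")

# Invented sample data: c3 is older than the cutoff used below.
sample = [
    ("c1", "2026-05-01T10:00:00+00:00", ["sec", "bitcoin"]),
    ("c2", "2026-05-02T10:00:00+00:00", ["sec"]),
    ("c3", "2026-04-01T10:00:00+00:00", ["sec"]),
]
for cid, ts, ents in sample:
    conn.execute(
        "INSERT INTO clusters(cluster_id, payload) VALUES (?, ?)",
        (cid, json.dumps({"cluster_id": cid, "timestamp": ts})),
    )
    conn.executemany(
        "INSERT INTO cluster_entities(cluster_id, entity) VALUES (?, ?)",
        [(cid, e) for e in ents],
    )

cutoff = "2026-05-01T00:00:00+00:00"

# Frequency view: one row per entity inside the time window.
freqs = conn.execute(
    "SELECT ce.entity, COUNT(*) AS cnt FROM cluster_entities ce "
    "JOIN clusters c ON c.cluster_id = ce.cluster_id "
    "WHERE c.payload_ts >= ? GROUP BY ce.entity ORDER BY cnt DESC, ce.entity",
    (cutoff,),
).fetchall()

# Detail view: the clusters behind one of those counts, newest first.
matches = [
    r[0]
    for r in conn.execute(
        "SELECT c.cluster_id FROM clusters c "
        "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
        "WHERE c.payload_ts >= ? AND ce.entity = ? ORDER BY c.payload_ts DESC",
        (cutoff, "sec"),
    )
]
print(freqs, matches)
```

The cutoff comparison is a plain string comparison, so it relies on every stored timestamp being normalized to the same ISO 8601 UTC format.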

Removed dead code:
- 3 duplicate methods from sqlite_store.py (get_clusters_page, get_sentiment_series,
  get_entity_frequencies) — were not called by any code path
- _read_ts() no longer used in read paths (kept as exported helper for dashboard_store
  compatibility only)

REST API:
- /api/v1/clusters — uses payload_ts SQL filter, total matches actual results
- /api/v1/entities — SQL GROUP BY via junction table
- /api/v1/keywords — SQL GROUP BY via junction table
- NEW /api/v1/clusters/by-entity?entity=X&hours=Y — SQL entity search
- NEW /api/v1/clusters/by-keyword?keyword=X&hours=Y — SQL keyword search

Dashboard JS:
- showEntityDetail — calls /api/v1/clusters/by-entity instead of fetching all clusters
- showKeywordDetail — calls /api/v1/clusters/by-keyword instead of fetching all clusters

Backfill script (scripts/backfill_junction_tables.py):
- Same pattern as normalize_cluster_timestamps.py
- Populates junction tables from existing payload JSON
- Idempotent (INSERT OR IGNORE + transactions)
- Run once on live server after deploy
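
The idempotency claim can be illustrated with a small sketch. `backfill` here is a hypothetical reimplementation against simplified tables, not the actual script, and it leaves out the `--db` argument handling.

```python
import json
import sqlite3


def backfill(conn):
    """Hypothetical backfill: rebuild junction rows from stored payload JSON."""
    rows = conn.execute("SELECT cluster_id, payload FROM clusters").fetchall()
    with conn:  # one transaction for the whole pass
        for cluster_id, payload_text in rows:
            payload = json.loads(payload_text)
            for table, column, key in (
                ("cluster_entities", "entity", "entities"),
                ("cluster_keywords", "keyword", "keywords"),
            ):
                for value in payload.get(key) or []:
                    norm = str(value).strip().lower()
                    if norm:
                        conn.execute(
                            f"INSERT OR IGNORE INTO {table}(cluster_id, {column}) VALUES (?, ?)",
                            (cluster_id, norm),
                        )
    # Report (entity_rows, keyword_rows) after the pass.
    return tuple(
        conn.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0]
        for t in ("cluster_entities", "cluster_keywords")
    )


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE clusters (cluster_id TEXT PRIMARY KEY, payload TEXT NOT NULL);
CREATE TABLE cluster_entities (cluster_id TEXT NOT NULL, entity TEXT NOT NULL,
                               PRIMARY KEY (cluster_id, entity));
CREATE TABLE cluster_keywords (cluster_id TEXT NOT NULL, keyword TEXT NOT NULL,
                               PRIMARY KEY (cluster_id, keyword));
""")
conn.execute(
    "INSERT INTO clusters(cluster_id, payload) VALUES (?, ?)",
    ("c1", json.dumps({"entities": ["SEC", "sec"], "keywords": ["ETF"]})),
)

first = backfill(conn)
second = backfill(conn)  # running again adds nothing
print(first, second)
```

Because the primary key covers `(cluster_id, entity)` and `(cluster_id, keyword)`, a second run finds every row already present and leaves the counts unchanged.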

sanitize_cluster_payload: ensure timestamp is always present
- Falls back to first_seen → last_updated → now
- Prevents NULL payload_ts which would exclude clusters from time-range queries
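
A minimal sketch of the fallback chain, assuming the payload is a plain dict. `ensure_timestamp` is a hypothetical stand-in for the relevant step inside `sanitize_cluster_payload`, not the function itself.

```python
from datetime import datetime, timezone


def ensure_timestamp(payload: dict) -> dict:
    """Hypothetical stand-in: guarantee a non-empty 'timestamp' field."""
    out = dict(payload)
    for src in ("timestamp", "first_seen", "last_updated"):
        if out.get(src):
            # Direct assignment, so an empty or None timestamp key is replaced.
            out["timestamp"] = out[src]
            break
    else:
        # No usable source field: fall back to the current UTC time.
        out["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
    return out


a = ensure_timestamp({"timestamp": None, "first_seen": "2026-05-01T08:00:00+00:00"})
b = ensure_timestamp({"last_updated": "2026-05-02T09:00:00+00:00"})
c = ensure_timestamp({})
print(a["timestamp"], b["timestamp"], c["timestamp"])
```

Case `a` matters most: a payload whose `timestamp` key exists but is empty still picks up `first_seen` instead of silently becoming "now".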
Lukas Goldschmidt 1 week ago
parent
commit
8d0cf9ec4c

+ 82 - 0
PROJECT.md

@@ -191,3 +191,85 @@ After normalization, all read paths still contained defensive RFC 2822 / `parsed
 - Write paths: `sanitize_cluster_payload()` in `sqlite_store.py` is the single normalization point. All writes go through `upsert_clusters()` which calls it.
 - This repo is a **dev machine** copy. The live server is on thinkcenter-2 (192.168.0.200). Never query the dev DB to verify live data — the dev DB is stale/empty.
 
+## Junction Tables + Indexed Timestamp (May 2026)
+
+### Problem
+All read paths deserialize every JSON payload to filter by entity/keyword/time. With 6000+ clusters, `get_clusters_page` returns only the 100 newest — clicking an entity that appears 34x shows only 2 clusters because the other 32 are outside the LIMIT. `get_entity_frequencies` counts correctly but the detail view can't find them. Every query does a full table scan with JSON parsing.
+
+### Solution: junction tables + generated timestamp column
+
+**Schema (migrated in `_init_db`, incremental-safe):**
+
+```sql
+-- Indexed event timestamp (generated column; VIRTUAL because ALTER TABLE cannot add STORED)
+ALTER TABLE clusters ADD COLUMN payload_ts
+    GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL;
+CREATE INDEX IF NOT EXISTS idx_clusters_payload_ts ON clusters(payload_ts);
+
+-- Entity junction table for SQL-level entity search
+CREATE TABLE IF NOT EXISTS cluster_entities (
+    cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
+    entity     TEXT NOT NULL,
+    PRIMARY KEY (cluster_id, entity)
+);
+CREATE INDEX IF NOT EXISTS idx_cluster_entities_entity ON cluster_entities(entity);
+
+-- Keyword junction table for SQL-level keyword search
+CREATE TABLE IF NOT EXISTS cluster_keywords (
+    cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
+    keyword    TEXT NOT NULL,
+    PRIMARY KEY (cluster_id, keyword)
+);
+CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword);
+```
+
+**Write path (`upsert_clusters`):** Within the existing transaction, after sanitizing the payload and upserting the cluster row:
+1. `DELETE FROM cluster_entities WHERE cluster_id = ?`  (handles re-enrichment)
+2. `DELETE FROM cluster_keywords WHERE cluster_id = ?`
+3. `INSERT OR IGNORE INTO cluster_entities VALUES (?, ?)` for each entity
+4. `INSERT OR IGNORE INTO cluster_keywords VALUES (?, ?)` for each keyword
+5. `payload_ts` is auto-maintained by SQLite's generated column — no code needed
+
+**Read paths — all SQL-level, no JSON parsing at query time:**
+
+- `get_clusters_page`: `WHERE payload_ts >= ? ORDER BY payload_ts DESC LIMIT ? OFFSET ?`
+- `get_entity_frequencies`: `JOIN cluster_entities ... WHERE payload_ts >= ? GROUP BY entity ORDER BY cnt DESC`
+- `get_keyword_frequencies`: `JOIN cluster_keywords ... WHERE payload_ts >= ? GROUP BY keyword ORDER BY cnt DESC`
+- New `get_clusters_by_entity`: `JOIN cluster_entities WHERE payload_ts >= ? AND entity = ?`
+- New `get_clusters_by_keyword`: `JOIN cluster_keywords WHERE payload_ts >= ? AND keyword = ?`
+
+**Backfill script (`scripts/backfill_junction_tables.py`):**
+- Same pattern as `normalize_cluster_timestamps.py`
+- Accepts `--db` arg, defaults to config DB_PATH
+- Reads all cluster payloads, populates `cluster_entities` and `cluster_keywords`
+- `payload_ts` is auto-populated by SQLite's generated column
+- Idempotent (`INSERT OR IGNORE` + transaction)
+- Reports entity/keyword counts after completion
+- Run once on live server: `docker exec -it <container> python3 scripts/backfill_junction_tables.py`
+
+**REST API changes:**
+- `GET /api/v1/clusters` — now uses SQL `payload_ts` filter, consistent total
+- `GET /api/v1/entities` — SQL `COUNT(*) ... GROUP BY` via junction table
+- `GET /api/v1/keywords` — SQL `COUNT(*) ... GROUP BY` via junction table
+- **New `GET /api/v1/clusters/by-entity?entity=X&hours=Y&limit=Z`** — SQL entity search
+- **New `GET /api/v1/clusters/by-keyword?keyword=X&hours=Y&limit=Z`** — SQL keyword search
+
+**Dashboard JS changes:**
+- `showEntityDetail(label)` — calls `/api/v1/clusters/by-entity` instead of fetching all clusters
+- `showKeywordDetail(label)` — calls `/api/v1/clusters/by-keyword` instead of fetching all clusters
+
+**Files changed:**
+| File | Change |
+|---|---|
+| `news_mcp/storage/sqlite_store.py` | Schema migration (generated column + junction tables), write-path junction population, new SQL-level read methods |
+| `news_mcp/mcp_server_fastmcp.py` | New REST endpoints for entity/keyword cluster search |
+| `news_mcp/dashboard/dashboard_store.py` | `get_entity_frequencies`, `get_keyword_frequencies` use SQL junction table counts |
+| `dashboard/dashboard.js` | `showEntityDetail`, `showKeywordDetail` call new endpoints |
+| `scripts/backfill_junction_tables.py` | New backfill script (same pattern as normalize_cluster_timestamps.py) |
+
+**Migration safety:**
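+- `CREATE TABLE` / `CREATE INDEX` use `IF NOT EXISTS`; SQLite has no `ADD COLUMN IF NOT EXISTS`, so the `ADD COLUMN` is wrapped in `try/except sqlite3.OperationalError`. The migration is safe to re-run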
+- Backfill script is idempotent (`INSERT OR IGNORE` in transactions)
+- Generated column requires no write-path code changes
+- Old query methods can coexist during transition (removed after verification)
+

+ 8 - 23
dashboard/dashboard.js

@@ -367,22 +367,14 @@ function renderEntityChart() {
 async function showEntityDetail(label) {
   if (!label) return;
   var el = $('entity-detail'); if (!el) return;
-  el.innerHTML = '<div class="loading">Fetching clusters mentioning ' + esc(label) + '...</div>';
+  el.innerHTML = '<div class=\"loading\">Fetching clusters mentioning ' + esc(label) + '...</div>';
   var hours = ($('entity-hours') || {}).value || 24;
   try {
-    var res = await fetch(API + '/clusters?topic=all&hours=' + hours + '&limit=100');
+    var res = await fetch(API + '/clusters/by-entity?entity=' + encodeURIComponent(label) + '&hours=' + hours + '&limit=200');
     var d = await res.json();
-    var matched = (d.clusters || []).filter(function(c) {
-      return (c.entities||[]).some(function(e) { return (e||'').toLowerCase() === label.toLowerCase(); });
-    });
-    // Sort by timestamp descending — newest first
-    matched.sort(function(a,b) {
-      var ta = new Date(a.timestamp || 0).getTime();
-      var tb = new Date(b.timestamp || 0).getTime();
-      return tb - ta;
-    });
-    if (!matched.length) { el.innerHTML = '<p class="muted">No clusters mention "' + esc(label) + '" in the current window.</p>'; return; }
-    var html = '<h4 style="font-size:.85rem;margin-bottom:.5rem">Clusters mentioning ' + esc(label) + ' (' + matched.length + ')</h4>';
+    var matched = d.clusters || [];
+    if (!matched.length) { el.innerHTML = '<p class=\"muted\">No clusters mention \"' + esc(label) + '\" in the current window.</p>'; return; }
+    var html = '<h4 style=\"font-size:.85rem;margin-bottom:.5rem">Clusters mentioning ' + esc(label) + ' (' + (d.total || matched.length) + ')</h4>';
     for (var i = 0; i < matched.length; i++) {
       var c = matched[i];
       html += '<div style="margin-bottom:.6rem;padding:.6rem;background:var(--surface2);border-radius:6px;font-size:.82rem;cursor:pointer" onclick="openClusterModal(\''+esc(c.cluster_id)+'\')">'+
@@ -441,18 +433,11 @@ async function showKeywordDetail(label) {
   el.innerHTML = '<div class="loading">Fetching clusters with keyword ' + esc(label) + '…</div>';
   var hours = ($('keyword-hours') || {}).value || 24;
   try {
-    var res = await fetch(API + '/clusters?topic=all&hours=' + hours + '&limit=200');
+    var res = await fetch(API + '/clusters/by-keyword?keyword=' + encodeURIComponent(label) + '&hours=' + hours + '&limit=200');
     var d = await res.json();
-    var matched = (d.clusters || []).filter(function(c) {
-      return (c.keywords||[]).some(function(k) { return (k||'').toLowerCase() === label.toLowerCase(); });
-    });
-    matched.sort(function(a,b) {
-      var ta = new Date(a.timestamp || 0).getTime();
-      var tb = new Date(b.timestamp || 0).getTime();
-      return tb - ta;
-    });
+    var matched = d.clusters || [];
     if (!matched.length) { el.innerHTML = '<p class="muted">No clusters have keyword "' + esc(label) + '" in the current window.</p>'; return; }
-    var html = '<h4 style="font-size:.85rem;margin-bottom:.5rem">Clusters with keyword ' + esc(label) + ' (' + matched.length + ')</h4>';
+    var html = '<h4 style="font-size:.85rem;margin-bottom:.5rem">Clusters with keyword ' + esc(label) + ' (' + (d.total || matched.length) + ')</h4>';
     for (var i = 0; i < matched.length; i++) {
       var c = matched[i];
       html += '<div style="margin-bottom:.6rem;padding:.6rem;background:var(--surface2);border-radius:6px;font-size:.82rem;cursor:pointer" onclick="openClusterModal(\''+esc(c.cluster_id)+'\')">'+

+ 130 - 57
news_mcp/dashboard/dashboard_store.py

@@ -11,7 +11,7 @@ from news_mcp.config import (
     NEWS_RETENTION_DAYS,
     DEFAULT_TOPICS,
 )
-from news_mcp.storage.sqlite_store import SQLiteClusterStore, _read_ts
+from news_mcp.storage.sqlite_store import SQLiteClusterStore
 
 
 class DashboardStore:
@@ -85,28 +85,27 @@ class DashboardStore:
         limit: int = 20,
         offset: int = 0,
     ) -> dict[str, Any]:
-        """Paginated cluster listing filtered by payload.timestamp (event time).
+        """Paginated cluster listing filtered by SQL payload_ts index.
 
-        payload.timestamp is guaranteed ISO 8601 UTC — uses _read_ts from
-        sqlite_store. Do NOT filter by updated_at (row mod time).
         Returns {"clusters": [...], "total": int}.
         """
-        cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
 
-        query = "SELECT payload FROM clusters"
-        params: list = []
+        query = "SELECT payload FROM clusters WHERE payload_ts >= ?"
+        params: list = [cutoff]
         if topic and topic != "all":
-            query += " WHERE topic = ?"
+            query += " AND topic = ?"
             params.append(topic)
+        # Build the count query before pagination is appended
+        count_query = f"SELECT COUNT(*) FROM ({query})"
+        count_params = list(params)
+        query += " ORDER BY payload_ts DESC LIMIT ? OFFSET ?"
+        params.extend([limit, offset])
 
         with self._store._conn() as conn:
+            total = conn.execute(count_query, count_params).fetchone()[0]
             rows = conn.execute(query, params).fetchall()
 
-        filtered = [json.loads(r[0]) for r in rows]
-        filtered = [c for c in filtered if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
-        filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
-        page = filtered[offset:offset + limit]
-
         return {
             "clusters": [
                 {
@@ -122,9 +121,9 @@ class DashboardStore:
                     "keywords": c.get("keywords", []),
                     "article_count": len(c.get("articles", [])),
                 }
-                for c in page
+                for c in [json.loads(r[0]) for r in rows]
             ],
-            "total": len(filtered),
+            "total": total,
         }
 
     def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
@@ -173,14 +172,15 @@ class DashboardStore:
         ) -> list[dict[str, Any]]:
         """Sentiment score averaged per time bucket.
 
-        Filters by payload.timestamp (event time, ISO 8601 UTC guaranteed).
+        Filters by payload_ts SQL index.
         """
-        cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
-        query = "SELECT payload FROM clusters"
-        params: list = []
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
+        query = "SELECT payload FROM clusters WHERE payload_ts >= ?"
+        params: list = [cutoff]
         if topic and topic != "all":
-            query += " WHERE topic = ?"
+            query += " AND topic = ?"
             params.append(topic)
+        query += " ORDER BY payload_ts ASC"
 
         with self._store._conn() as conn:
             rows = conn.execute(query, params).fetchall()
@@ -188,11 +188,14 @@ class DashboardStore:
         buckets: dict[datetime, list[float]] = {}
         for (payload_text,) in rows:
             c = json.loads(payload_text)
-            ts = _read_ts(c.get("timestamp"))
+            ts_str = c.get("timestamp")
             score = c.get("sentimentScore")
-            if ts is None or score is None or ts < cutoff_ts:
+            if not ts_str or score is None:
                 continue
-            dt = datetime.fromtimestamp(ts, tz=timezone.utc)
+            dt = datetime.fromisoformat(str(ts_str).strip())
+            if dt.tzinfo is None:
+                dt = dt.replace(tzinfo=timezone.utc)
+            dt = dt.astimezone(timezone.utc)
             bucket_key = dt.replace(minute=0, second=0, microsecond=0)
             if bucket_hours > 1:
                 bucket_key = bucket_key.replace(
@@ -218,22 +221,25 @@ class DashboardStore:
         hours: float = 24,
         limit: int = 30,
     ) -> list[dict[str, Any]]:
-        """Top entities by mention count filtered by payload.timestamp (ISO 8601 UTC guaranteed)."""
-        cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
+        """Top entities by mention count, using SQL junction table + payload_ts index."""
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
 
         with self._store._conn() as conn:
-            rows = conn.execute("SELECT payload FROM clusters").fetchall()
-
-        counter: dict[str, int] = {}
-        for (payload_text,) in rows:
-            c = json.loads(payload_text)
-            if (_read_ts(c.get("timestamp")) or 0.0) < cutoff_ts:
-                continue
-            for ent in c.get("entities", []):
-                counter[ent] = counter.get(ent, 0) + 1
+            rows = conn.execute(
+                """
+                SELECT ce.entity, COUNT(*) as cnt
+                FROM cluster_entities ce
+                JOIN clusters c ON c.cluster_id = ce.cluster_id
+                WHERE c.payload_ts >= ?
+                GROUP BY ce.entity
+                ORDER BY cnt DESC
+                LIMIT ?
+                """,
+                (cutoff, limit),
+            ).fetchall()
 
         result: list[dict[str, Any]] = []
-        for label, count in sorted(counter.items(), key=lambda x: -x[1])[:limit]:
+        for label, count in rows:
             meta = self._store.get_entity_metadata(label)
             result.append({
                 "label": label,
@@ -250,35 +256,102 @@ class DashboardStore:
         hours: float = 24,
         limit: int = 30,
     ) -> list[dict[str, Any]]:
-        """Top keywords by occurrence count filtered by payload.timestamp (ISO 8601 UTC guaranteed).
+        """Top keywords by mention count, using SQL junction table + payload_ts index.
 
-        Excludes keywords that are already entities in the same cluster,
-        and excludes DEFAULT_TOPICS labels (crypto, macro, regulation, ai, other).
+        Excludes DEFAULT_TOPICS labels (crypto, macro, regulation, ai, other).
         """
-        cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
         _topic_labels = {t.lower() for t in DEFAULT_TOPICS}
 
         with self._store._conn() as conn:
-            rows = conn.execute("SELECT payload FROM clusters").fetchall()
-
-        counter: dict[str, int] = {}
-        for (payload_text,) in rows:
-            c = json.loads(payload_text)
-            if (_read_ts(c.get("timestamp")) or 0.0) < cutoff_ts:
-                continue
-            ents_in_cluster = {str(e).strip().lower() for e in (c.get("entities", []) or []) if str(e).strip()}
-            for kw in c.get("keywords", []):
-                kw_str = str(kw).strip()
-                if not kw_str:
-                    continue
-                if kw_str.lower() in _topic_labels:
-                    continue
-                if kw_str.lower() in ents_in_cluster:
-                    continue
-                counter[kw_str] = counter.get(kw_str, 0) + 1
+            rows = conn.execute(
+                """
+                SELECT ck.keyword, COUNT(*) as cnt
+                FROM cluster_keywords ck
+                JOIN clusters c ON c.cluster_id = ck.cluster_id
+                WHERE c.payload_ts >= ?
+                GROUP BY ck.keyword
+                ORDER BY cnt DESC
+                LIMIT ?
+                """,
+                (cutoff, limit),
+            ).fetchall()
 
         return [
             {"label": label, "count": count}
-            for label, count in sorted(counter.items(), key=lambda x: -x[1])[:limit]
+            for label, count in rows
+            if label.lower() not in _topic_labels
         ]
 
+    # ── Entity/Keyword Cluster Search ────────────────────────────────
+
+    def get_clusters_by_entity(
+        self,
+        entity: str,
+        hours: float = 168,
+        limit: int = 50,
+        offset: int = 0,
+    ) -> dict[str, Any]:
+        """Return clusters matching an entity, SQL-level filter via junction table."""
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
+        entity_norm = entity.strip().lower()
+
+        with self._store._conn() as conn:
+            # Total count
+            total = conn.execute(
+                "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c "
+                "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
+                "WHERE c.payload_ts >= ? AND ce.entity = ?",
+                (cutoff, entity_norm),
+            ).fetchone()[0]
+
+            # Paginated results
+            rows = conn.execute(
+                "SELECT DISTINCT c.payload FROM clusters c "
+                "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
+                "WHERE c.payload_ts >= ? AND ce.entity = ? "
+                "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?",
+                (cutoff, entity_norm, limit, offset),
+            ).fetchall()
+
+        return {
+            "entity": entity_norm,
+            "clusters": [json.loads(r[0]) for r in rows],
+            "total": total,
+            "hours": hours,
+        }
+
+    def get_clusters_by_keyword(
+        self,
+        keyword: str,
+        hours: float = 168,
+        limit: int = 50,
+        offset: int = 0,
+    ) -> dict[str, Any]:
+        """Return clusters matching a keyword, SQL-level filter via junction table."""
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
+        kw_norm = keyword.strip().lower()
+
+        with self._store._conn() as conn:
+            total = conn.execute(
+                "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c "
+                "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
+                "WHERE c.payload_ts >= ? AND ck.keyword = ?",
+                (cutoff, kw_norm),
+            ).fetchone()[0]
+
+            rows = conn.execute(
+                "SELECT DISTINCT c.payload FROM clusters c "
+                "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
+                "WHERE c.payload_ts >= ? AND ck.keyword = ? "
+                "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?",
+                (cutoff, kw_norm, limit, offset),
+            ).fetchall()
+
+        return {
+            "keyword": kw_norm,
+            "clusters": [json.loads(r[0]) for r in rows],
+            "total": total,
+            "hours": hours,
+        }
+

+ 38 - 0
news_mcp/mcp_server_fastmcp.py

@@ -1157,6 +1157,44 @@ def api_keywords(
     except Exception as e:
         return _api_err(e, f"keywords(hours={hours})")
 
+@app.get("/api/v1/clusters/by-entity")
+def api_clusters_by_entity(
+    entity: str,
+    hours: int = 168,
+    limit: int = 50,
+    offset: int = 0,
+):
+    """Return clusters matching an entity, filtered by event time via SQL junction table."""
+    try:
+        store = DashboardStore(_shared_store)
+        return store.get_clusters_by_entity(
+            entity=entity.strip().lower(),
+            hours=hours,
+            limit=limit,
+            offset=offset,
+        )
+    except Exception as e:
+        return _api_err(e, f"by-entity(entity={entity},hours={hours})")
+
+@app.get("/api/v1/clusters/by-keyword")
+def api_clusters_by_keyword(
+    keyword: str,
+    hours: int = 168,
+    limit: int = 50,
+    offset: int = 0,
+):
+    """Return clusters matching a keyword, filtered by event time via SQL junction table."""
+    try:
+        store = DashboardStore(_shared_store)
+        return store.get_clusters_by_keyword(
+            keyword=keyword.strip().lower(),
+            hours=hours,
+            limit=limit,
+            offset=offset,
+        )
+    except Exception as e:
+        return _api_err(e, f"by-keyword(keyword={keyword},hours={hours})")
+
 @app.get("/api/v1/cluster/{cluster_id}")
 def api_cluster_detail(cluster_id: str):
     """Full cluster detail for drill-down."""

+ 76 - 150
news_mcp/storage/sqlite_store.py

@@ -146,6 +146,14 @@ def sanitize_cluster_payload(cluster: dict[str, Any], *, include_resolutions: bo
     for field in ("timestamp", "last_updated", "first_seen"):
         if field in out and out[field]:
             out[field] = _normalize_ts(out[field])
+    # Ensure timestamp is always present for the generated column index.
+    # Prefer existing timestamp, then first_seen, then last_updated, then now.
+    for src in ("timestamp", "first_seen", "last_updated"):
+        if out.get(src):
+            out["timestamp"] = out[src]
+            break
+    if not out.get("timestamp"):
+        out["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
 
     if not include_resolutions:
         return out
@@ -206,6 +214,44 @@ class SQLiteClusterStore:
                 "CREATE INDEX IF NOT EXISTS idx_clusters_updated_at ON clusters(updated_at)"
             )
 
+            # Generated column for indexed event-time filtering (VIRTUAL for compatibility)
+            try:
+                conn.execute(
+                    "ALTER TABLE clusters ADD COLUMN payload_ts "
+                    "GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL"
+                )
+            except sqlite3.OperationalError:
+                pass  # column already exists
+            conn.execute(
+                "CREATE INDEX IF NOT EXISTS idx_clusters_payload_ts ON clusters(payload_ts)"
+            )
+
+            # Junction tables for SQL-level entity/keyword search
+            conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS cluster_entities (
+                    cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
+                    entity     TEXT NOT NULL,
+                    PRIMARY KEY (cluster_id, entity)
+                )
+                """
+            )
+            conn.execute(
+                "CREATE INDEX IF NOT EXISTS idx_cluster_entities_entity ON cluster_entities(entity)"
+            )
+            conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS cluster_keywords (
+                    cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
+                    keyword    TEXT NOT NULL,
+                    PRIMARY KEY (cluster_id, keyword)
+                )
+                """
+            )
+            conn.execute(
+                "CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword)"
+            )
+
             try:
                 cur = conn.execute("PRAGMA table_info(entity_metadata)")
                 cols = [row[1] for row in cur.fetchall()]
@@ -267,6 +313,24 @@ class SQLiteClusterStore:
                     "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, payload=excluded.payload, updated_at=excluded.updated_at",
                     (cluster_id, topic, payload, now.isoformat()),
                 )
+                # Populate junction tables for SQL-level entity/keyword search.
+                # DELETE first so re-enrichment replaces stale entries.
+                conn.execute("DELETE FROM cluster_entities WHERE cluster_id=?", (cluster_id,))
+                conn.execute("DELETE FROM cluster_keywords WHERE cluster_id=?", (cluster_id,))
+                for entity in c.get("entities", []):
+                    ent_norm = str(entity).strip().lower()
+                    if ent_norm:
+                        conn.execute(
+                            "INSERT OR IGNORE INTO cluster_entities(cluster_id, entity) VALUES(?, ?)",
+                            (cluster_id, ent_norm),
+                        )
+                for kw in c.get("keywords", []):
+                    kw_norm = str(kw).strip().lower()
+                    if kw_norm:
+                        conn.execute(
+                            "INSERT OR IGNORE INTO cluster_keywords(cluster_id, keyword) VALUES(?, ?)",
+                            (cluster_id, kw_norm),
+                        )
 
     def upsert_cluster_summary(
         self,
@@ -299,39 +363,24 @@ class SQLiteClusterStore:
             return json.loads(row[0])
 
     def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
-        """Return newest clusters by their own event timestamp (payload.timestamp).
-
-        payload.timestamp is guaranteed ISO 8601 UTC — use _read_ts, not raw
-        JSON parsing with RFC 2822 fallbacks.
-        """
-        cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).timestamp()
-
+        """Return newest clusters by event timestamp, filtered via SQL payload_ts index."""
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).isoformat()
         with self._conn() as conn:
             cur = conn.execute(
-                "SELECT payload FROM clusters WHERE topic=? ORDER BY updated_at DESC",
-                (topic,),
+                "SELECT payload FROM clusters WHERE topic=? AND payload_ts >= ? ORDER BY payload_ts DESC LIMIT ?",
+                (topic, cutoff, int(limit)),
             )
-            candidates = [json.loads(r[0]) for r in cur.fetchall()]
-
-        filtered = [c for c in candidates if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
-        filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
-        return filtered[: int(limit)]
+            return [json.loads(r[0]) for r in cur.fetchall()]
 
     def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]:
-        """Return newest clusters across all topics by event timestamp.
-
-        payload.timestamp is guaranteed ISO 8601 UTC — use _read_ts, not raw
-        JSON parsing with RFC 2822 fallbacks.
-        """
-        cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).timestamp()
-
+        """Return newest clusters across all topics, filtered via SQL payload_ts index."""
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).isoformat()
         with self._conn() as conn:
-            cur = conn.execute("SELECT payload FROM clusters ORDER BY updated_at DESC")
-            candidates = [json.loads(r[0]) for r in cur.fetchall()]
-
-        filtered = [c for c in candidates if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
-        filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
-        return filtered[: int(limit)]
+            cur = conn.execute(
+                "SELECT payload FROM clusters WHERE payload_ts >= ? ORDER BY payload_ts DESC LIMIT ?",
+                (cutoff, int(limit)),
+            )
+            return [json.loads(r[0]) for r in cur.fetchall()]
 
     def get_cluster_by_id(self, cluster_id: str) -> dict | None:
         with self._conn() as conn:
@@ -645,129 +694,6 @@ class SQLiteClusterStore:
                 "feeds": feeds,
             }
 
-    def get_clusters_page(
-        self,
-        topic: str | None = None,
-        hours: float = 24,
-        limit: int = 20,
-        offset: int = 0,
-    ) -> list[dict[str, Any]]:
-        """Paginated cluster listing filtered by payload.timestamp (event time).
-
-        payload.timestamp is guaranteed ISO 8601 UTC — filtered and sorted
-        using _read_ts, not updated_at (row modification time).
-        """
-        cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
-
-        query = "SELECT payload FROM clusters"
-        params: list = []
-        if topic and topic != "all":
-            query += " WHERE topic = ?"
-            params.append(topic)
-
-        with self._conn() as conn:
-            rows = conn.execute(query, params).fetchall()
-
-        filtered = [json.loads(r[0]) for r in rows]
-        filtered = [c for c in filtered if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
-        filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
-        page = filtered[offset:offset + limit]
-
-        return [
-            {
-                "cluster_id": c.get("cluster_id", ""),
-                "headline": c.get("headline", ""),
-                "topic": c.get("topic", ""),
-                "sentiment": c.get("sentiment", "neutral"),
-                "sentimentScore": c.get("sentimentScore"),
-                "importance": c.get("importance", 0),
-                "entities": c.get("entities", []),
-                "sources": c.get("sources", []),
-                "timestamp": c.get("timestamp", ""),
-                "keywords": c.get("keywords", []),
-                "article_count": len(c.get("articles", [])),
-            }
-            for c in page
-        ]
-
-    def get_sentiment_series(
-            self,
-            topic: str | None = None,
-            hours: float = 24,
-            bucket_hours: float = 1,
-        ) -> list[dict[str, Any]]:
-            """Sentiment score averaged per time bucket.
-
-            Filters by payload.timestamp (event time, ISO 8601 UTC guaranteed).
-            """
-            cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
-            query = "SELECT payload FROM clusters"
-            params: list = []
-            if topic and topic != "all":
-                query += " WHERE topic = ?"
-                params.append(topic)
-
-            with self._conn() as conn:
-                rows = conn.execute(query, params).fetchall()
-
-            buckets: dict[datetime, list[float]] = {}
-            for (payload_text,) in rows:
-                c = json.loads(payload_text)
-                ts = _read_ts(c.get("timestamp"))
-                score = c.get("sentimentScore")
-                if ts is None or score is None:
-                    continue
-                if ts < cutoff_ts:
-                    continue
-                dt = datetime.fromtimestamp(ts, tz=timezone.utc)
-                bucket_key = dt.replace(minute=0, second=0, microsecond=0)
-                if bucket_hours > 1:
-                    bucket_key = bucket_key.replace(
-                        hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
-                    )
-                buckets.setdefault(bucket_key, []).append(float(score))
-
-            return [
-                {
-                    "time": bucket_key.isoformat(),
-                    "avg_sentiment": round(sum(scores) / len(scores), 3),
-                    "count": len(scores),
-                    "min": round(min(scores), 3),
-                    "max": round(max(scores), 3),
-                }
-                for bucket_key, scores in sorted(buckets.items())
-            ]
-
-    def get_entity_frequencies(
-        self,
-        hours: float = 24,
-        limit: int = 30,
-    ) -> list[dict[str, Any]]:
-        """Top entities by mention count filtered by payload.timestamp (ISO 8601 UTC guaranteed)."""
-        cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
-
-        with self._conn() as conn:
-            rows = conn.execute("SELECT payload FROM clusters").fetchall()
-
-        counter: dict[str, int] = {}
-        for (payload_text,) in rows:
-            c = json.loads(payload_text)
-            if (_read_ts(c.get("timestamp")) or 0.0) < cutoff_ts:
-                continue
-            for ent in c.get("entities", []):
-                counter[ent] = counter.get(ent, 0) + 1
-
-        result: list[dict[str, Any]] = []
-        for label, count in sorted(counter.items(), key=lambda x: -x[1])[:limit]:
-            meta = self.get_entity_metadata(label)
-            result.append({
-                "label": label,
-                "count": count,
-                "canonical_label": meta["canonical_label"] if meta else label,
-                "mid": meta["mid"] if meta else None,
-            })
-        return result
-
     def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
         """Dashboard-optimized cluster detail fetch."""
         with self._conn() as conn:

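Review note: the rewritten read path compares `payload_ts` to an ISO 8601 cutoff as text. The sketch below reproduces that pattern against an in-memory database. It assumes an SQLite build with generated columns (3.31+) and the JSON functions available. The index name `idx_demo_payload_ts`, the helper `recent_cluster_ids`, and the two-column `clusters` table are made up for illustration and are not taken from this PR.

```python
import json
import sqlite3
from datetime import datetime, timedelta, timezone


def recent_cluster_ids(conn, now, hours, limit):
    """Return cluster ids newer than the cutoff, newest first (hypothetical helper)."""
    cutoff = (now - timedelta(hours=hours)).isoformat()
    rows = conn.execute(
        "SELECT cluster_id FROM clusters "
        "WHERE payload_ts >= ? ORDER BY payload_ts DESC LIMIT ?",
        (cutoff, int(limit)),
    ).fetchall()
    return [r[0] for r in rows]


conn = sqlite3.connect(":memory:")
# Simplified stand-in for the real clusters table.
conn.execute("CREATE TABLE clusters (cluster_id TEXT PRIMARY KEY, payload TEXT NOT NULL)")
# Same column definition as the backfill script: derived from the JSON payload.
conn.execute(
    "ALTER TABLE clusters ADD COLUMN payload_ts "
    "GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL"
)
# Assumed index name; lets the range filter and ORDER BY use an index.
conn.execute("CREATE INDEX idx_demo_payload_ts ON clusters(payload_ts)")

# Fixed "now" so the example is deterministic.
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
for cid, age_hours in [("old", 30), ("mid", 5), ("new", 1)]:
    ts = (now - timedelta(hours=age_hours)).isoformat()
    conn.execute(
        "INSERT INTO clusters(cluster_id, payload) VALUES (?, ?)",
        (cid, json.dumps({"timestamp": ts})),
    )

print(recent_cluster_ids(conn, now, hours=24, limit=10))
```

Because the comparison is textual, it only orders correctly when every stored timestamp uses the same ISO 8601 shape as `datetime.isoformat()` produces here (UTC with a `+00:00` offset). Payloads that mix `Z` suffixes or other offsets would need normalizing first.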
+ 116 - 0
scripts/backfill_junction_tables.py

@@ -0,0 +1,116 @@
+#!/usr/bin/env python3
+"""Backfill: populate cluster_entities and cluster_keywords junction tables.
+
+Reads every cluster payload from the DB, extracts entities and keywords,
+and inserts them into the junction tables.  Idempotent — safe to re-run.
+
+Usage:
+    python3 scripts/backfill_junction_tables.py
+    python3 scripts/backfill_junction_tables.py --db /path/to/news.sqlite
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import sqlite3
+from pathlib import Path
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(
+        description="Populate cluster_entities and cluster_keywords from existing payloads"
+    )
+    parser.add_argument(
+        "--db",
+        default=str(Path(__file__).resolve().parent.parent / "news_mcp" / "data" / "news.sqlite"),
+        help="Path to news.sqlite (default: dev DB)",
+    )
+    args = parser.parse_args()
+
+    db_path = args.db
+    conn = sqlite3.connect(db_path)
+
+    # Ensure junction tables exist (same DDL as _init_db)
+    conn.execute(
+        """
+        CREATE TABLE IF NOT EXISTS cluster_entities (
+            cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
+            entity     TEXT NOT NULL,
+            PRIMARY KEY (cluster_id, entity)
+        )
+        """
+    )
+    conn.execute(
+        "CREATE INDEX IF NOT EXISTS idx_cluster_entities_entity ON cluster_entities(entity)"
+    )
+    conn.execute(
+        """
+        CREATE TABLE IF NOT EXISTS cluster_keywords (
+            cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
+            keyword    TEXT NOT NULL,
+            PRIMARY KEY (cluster_id, keyword)
+        )
+        """
+    )
+    conn.execute(
+        "CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword)"
+    )
+
+    # Ensure payload_ts generated column exists
+    try:
+        conn.execute(
+            "ALTER TABLE clusters ADD COLUMN payload_ts "
+            "GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL"
+        )
+    except sqlite3.OperationalError as exc:
+        # Only the "column already exists" case is expected; re-raise anything else
+        if "duplicate column name" not in str(exc).lower():
+            raise
+
+    rows = conn.execute("SELECT cluster_id, payload FROM clusters").fetchall()
+    total = len(rows)
+    entities_count = 0
+    keywords_count = 0
+
+    for cluster_id, payload_text in rows:
+        payload = json.loads(payload_text)
+
+        # Clear stale entries (idempotent re-run)
+        conn.execute("DELETE FROM cluster_entities WHERE cluster_id = ?", (cluster_id,))
+        conn.execute("DELETE FROM cluster_keywords WHERE cluster_id = ?", (cluster_id,))
+
+        # "or []" also covers payloads where the key is present but null
+        for entity in payload.get("entities") or []:
+            ent_norm = str(entity).strip().lower()
+            if ent_norm:
+                cur = conn.execute(
+                    "INSERT OR IGNORE INTO cluster_entities(cluster_id, entity) VALUES(?, ?)",
+                    (cluster_id, ent_norm),
+                )
+                # rowcount is 0 when the normalized value was already inserted
+                entities_count += cur.rowcount
+
+        for kw in payload.get("keywords") or []:
+            kw_norm = str(kw).strip().lower()
+            if kw_norm:
+                cur = conn.execute(
+                    "INSERT OR IGNORE INTO cluster_keywords(cluster_id, keyword) VALUES(?, ?)",
+                    (cluster_id, kw_norm),
+                )
+                keywords_count += cur.rowcount
+
+    conn.commit()
+
+    # Report
+    final_entities = conn.execute("SELECT COUNT(*) FROM cluster_entities").fetchone()[0]
+    final_keywords = conn.execute("SELECT COUNT(*) FROM cluster_keywords").fetchone()[0]
+    conn.close()
+
+    print("Backfill complete:")
+    print(f"  Clusters processed: {total}")
+    print(f"  Entity rows inserted this run: {entities_count}")
+    print(f"  Keyword rows inserted this run: {keywords_count}")
+    print(f"  Total entity rows in DB: {final_entities}")
+    print(f"  Total keyword rows in DB: {final_keywords}")
+
+
+if __name__ == "__main__":
+    main()
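
Review note: once the junction tables are backfilled, the entity read paths named in the commit message reduce to a JOIN plus GROUP BY. The sketch below is one way those queries could look, not the code from this PR. The function names `entity_frequencies` and `clusters_by_entity`, the tie-breaking sort, and the plain `payload_ts` TEXT column (standing in for the generated column) are assumptions made for illustration.

```python
import sqlite3


def entity_frequencies(conn, cutoff, limit=30):
    """Count clusters per entity inside the time window (hypothetical helper)."""
    return conn.execute(
        "SELECT ce.entity, COUNT(*) AS n "
        "FROM cluster_entities ce JOIN clusters c ON c.cluster_id = ce.cluster_id "
        "WHERE c.payload_ts >= ? "
        "GROUP BY ce.entity ORDER BY n DESC, ce.entity ASC LIMIT ?",
        (cutoff, int(limit)),
    ).fetchall()


def clusters_by_entity(conn, entity, cutoff):
    """Return ids of clusters mentioning the entity, newest first (hypothetical helper)."""
    # Normalize the lookup key the same way the write path does.
    key = str(entity).strip().lower()
    rows = conn.execute(
        "SELECT c.cluster_id "
        "FROM cluster_entities ce JOIN clusters c ON c.cluster_id = ce.cluster_id "
        "WHERE ce.entity = ? AND c.payload_ts >= ? "
        "ORDER BY c.payload_ts DESC",
        (key, cutoff),
    ).fetchall()
    return [r[0] for r in rows]


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE clusters (cluster_id TEXT PRIMARY KEY, payload_ts TEXT);
    CREATE TABLE cluster_entities (
        cluster_id TEXT NOT NULL,
        entity     TEXT NOT NULL,
        PRIMARY KEY (cluster_id, entity)
    );
    CREATE INDEX idx_cluster_entities_entity ON cluster_entities(entity);
    """
)
conn.executemany(
    "INSERT INTO clusters VALUES (?, ?)",
    [
        ("c1", "2024-01-02T10:00:00+00:00"),
        ("c2", "2024-01-02T11:00:00+00:00"),
        ("c3", "2023-12-01T09:00:00+00:00"),  # outside the window below
    ],
)
conn.executemany(
    "INSERT INTO cluster_entities VALUES (?, ?)",
    [("c1", "nato"), ("c1", "eu"), ("c2", "nato"), ("c3", "nato")],
)

cutoff = "2024-01-01T00:00:00+00:00"
print(entity_frequencies(conn, cutoff))
print(clusters_by_entity(conn, "NATO", cutoff))
```

Since both queries apply the same time filter, the count shown for an entity and the number of clusters returned for it agree, which is the mismatch the commit message describes fixing.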