#!/usr/bin/env python3
"""Backfill: populate cluster_entities and cluster_keywords junction tables.

Reads every cluster payload from the DB, extracts entities and keywords,
and inserts them into the junction tables. Idempotent — safe to re-run.

Usage:
    python3 scripts/backfill_junction_tables.py
    python3 scripts/backfill_junction_tables.py --db /path/to/news.sqlite

The script uses NEWS_MCP_DB_PATH env var or --db arg to locate the database.
In the Docker container, NEWS_MCP_DB_PATH is set to ./data/news.sqlite.
"""

from __future__ import annotations

import argparse
import json
import sqlite3
import sys
from contextlib import closing
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))

from news_mcp.config import DB_PATH

# Statements for the two junction tables.  Kept as constants so no SQL is
# ever assembled from strings at runtime.
_DELETE_ENTITIES = "DELETE FROM cluster_entities WHERE cluster_id = ?"
_INSERT_ENTITY = (
    "INSERT OR IGNORE INTO cluster_entities(cluster_id, entity) VALUES(?, ?)"
)
_DELETE_KEYWORDS = "DELETE FROM cluster_keywords WHERE cluster_id = ?"
_INSERT_KEYWORD = (
    "INSERT OR IGNORE INTO cluster_keywords(cluster_id, keyword) VALUES(?, ?)"
)

# Print a progress line every this many clusters (and after the last one).
_PROGRESS_EVERY = 500


def _ensure_schema(conn: sqlite3.Connection) -> None:
    """Create the junction tables, their indexes and ``payload_ts`` if missing.

    The DDL mirrors what the application creates in ``_init_db`` (per the
    original comment in this script); every statement is a no-op when the
    object already exists.
    """
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS cluster_entities (
            cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
            entity TEXT NOT NULL,
            PRIMARY KEY (cluster_id, entity)
        )
        """
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_cluster_entities_entity ON cluster_entities(entity)"
    )
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS cluster_keywords (
            cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
            keyword TEXT NOT NULL,
            PRIMARY KEY (cluster_id, keyword)
        )
        """
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword)"
    )

    # Ensure the payload_ts generated column exists.  SQLite has no
    # "ADD COLUMN IF NOT EXISTS", so attempt the ALTER and inspect the error.
    try:
        conn.execute(
            "ALTER TABLE clusters ADD COLUMN payload_ts "
            "GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL"
        )
    except sqlite3.OperationalError as exc:
        # "duplicate column name" is the expected outcome on a re-run.  Any
        # other failure is reported instead of being silently swallowed, but
        # the backfill still proceeds (the column is not needed by it).
        if "duplicate column name" not in str(exc).lower():
            print(
                f"WARNING: could not add payload_ts column: {exc}",
                file=sys.stderr,
            )


def _load_payload(payload_text: object) -> dict | None:
    """Decode a stored payload; return ``None`` unless it is a JSON object."""
    try:
        payload = json.loads(payload_text)
    except (TypeError, ValueError):
        # TypeError: NULL / non-text column value.
        # ValueError: malformed JSON (JSONDecodeError) or undecodable bytes.
        return None
    return payload if isinstance(payload, dict) else None


def _normalize_terms(values: object) -> list[str]:
    """Return the stripped, lower-cased, de-duplicated terms in *values*.

    Each element is normalized with ``str(x).strip().lower()``, exactly as
    before.  Empty results are dropped and first-seen order is kept.  A JSON
    ``null`` or other scalar yields no terms; a bare string is treated as a
    single term rather than being iterated character by character.
    """
    if isinstance(values, str):
        values = [values]
    elif not isinstance(values, (list, tuple, dict)):
        return []
    normalized = (str(value).strip().lower() for value in values)
    return list(dict.fromkeys(term for term in normalized if term))


def _replace_terms(
    conn: sqlite3.Connection,
    delete_sql: str,
    insert_sql: str,
    cluster_id: object,
    terms: list[str],
) -> int:
    """Replace one cluster's junction rows with *terms*; return rows inserted."""
    # Clear stale entries first so a re-run after re-enrichment converges.
    conn.execute(delete_sql, (cluster_id,))
    if not terms:
        return 0
    cursor = conn.executemany(insert_sql, [(cluster_id, term) for term in terms])
    # rowcount sums the rows actually written, so anything dropped by
    # INSERT OR IGNORE is not counted.
    return max(cursor.rowcount, 0)


def _backfill(conn: sqlite3.Connection) -> tuple[int, int, int, int]:
    """Rebuild the junction rows for every cluster.

    Returns ``(total, skipped, entities_inserted, keywords_inserted)``.
    Nothing is committed here; the caller owns the transaction.
    """
    rows = conn.execute("SELECT cluster_id, payload FROM clusters").fetchall()
    total = len(rows)
    skipped = 0
    entities_inserted = 0
    keywords_inserted = 0

    for done, (cluster_id, payload_text) in enumerate(rows, start=1):
        payload = _load_payload(payload_text)
        if payload is None:
            # Unusable payload: leave this cluster's existing rows untouched.
            skipped += 1
        else:
            entities_inserted += _replace_terms(
                conn,
                _DELETE_ENTITIES,
                _INSERT_ENTITY,
                cluster_id,
                _normalize_terms(payload.get("entities")),
            )
            keywords_inserted += _replace_terms(
                conn,
                _DELETE_KEYWORDS,
                _INSERT_KEYWORD,
                cluster_id,
                _normalize_terms(payload.get("keywords")),
            )

        # Report progress for every row, including skipped ones, so the
        # final "N/N" line is always printed.
        if done % _PROGRESS_EVERY == 0 or done == total:
            print(f"  Processed {done}/{total}...", flush=True)

    return total, skipped, entities_inserted, keywords_inserted


def main() -> None:
    """Parse arguments, run the backfill in one transaction and print a report.

    Exits with status 1 when the database file does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Populate cluster_entities and cluster_keywords from existing payloads"
    )
    parser.add_argument("--db", type=Path, default=DB_PATH)
    args = parser.parse_args()

    db_path = str(args.db)
    if not Path(db_path).exists():
        print(f"ERROR: database file not found: {db_path}", file=sys.stderr)
        print(
            "Check NEWS_MCP_DB_PATH env var or pass --db /path/to/db",
            file=sys.stderr,
        )
        sys.exit(1)

    # closing() guarantees the connection is released even if a statement
    # raises; uncommitted changes are then rolled back by SQLite.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute("PRAGMA journal_mode=WAL")
        _ensure_schema(conn)
        total, skipped, entities_inserted, keywords_inserted = _backfill(conn)
        conn.commit()

        # Report
        final_entities = conn.execute(
            "SELECT COUNT(*) FROM cluster_entities"
        ).fetchone()[0]
        final_keywords = conn.execute(
            "SELECT COUNT(*) FROM cluster_keywords"
        ).fetchone()[0]

    print("\nBackfill complete:")
    print(f"  Clusters processed: {total}")
    if skipped:
        print(f"  Clusters skipped (invalid payload): {skipped}")
    print(f"  Entity rows inserted this run: {entities_inserted}")
    print(f"  Keyword rows inserted this run: {keywords_inserted}")
    print(f"  Total entity rows in DB: {final_entities}")
    print(f"  Total keyword rows in DB: {final_keywords}")


if __name__ == "__main__":
    main()