| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- #!/usr/bin/env python3
- """Backfill: populate cluster_entities and cluster_keywords junction tables.
- Reads every cluster payload from the DB, extracts entities and keywords,
- and inserts them into the junction tables. Idempotent — safe to re-run.
- Usage:
- python3 scripts/backfill_junction_tables.py
- python3 scripts/backfill_junction_tables.py --db /path/to/news.sqlite
- """
- from __future__ import annotations
- import argparse
- import json
- import sqlite3
- import sys
- from pathlib import Path
# Payload key -> (statement clearing one cluster's rows, statement inserting one row).
# The SQL is literal, so no payload data is ever interpolated into a statement.
_JUNCTIONS = (
    (
        "entities",
        "DELETE FROM cluster_entities WHERE cluster_id = ?",
        "INSERT OR IGNORE INTO cluster_entities(cluster_id, entity) VALUES(?, ?)",
    ),
    (
        "keywords",
        "DELETE FROM cluster_keywords WHERE cluster_id = ?",
        "INSERT OR IGNORE INTO cluster_keywords(cluster_id, keyword) VALUES(?, ?)",
    ),
)


def _ensure_schema(conn: sqlite3.Connection) -> None:
    """Create the junction tables, their lookup indexes and ``clusters.payload_ts``.

    Every statement is a no-op when the object already exists. The DDL is
    meant to match the application's ``_init_db`` (defined elsewhere) -- keep
    the two in sync.
    """
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS cluster_entities (
            cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
            entity TEXT NOT NULL,
            PRIMARY KEY (cluster_id, entity)
        )
        """
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_cluster_entities_entity ON cluster_entities(entity)"
    )
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS cluster_keywords (
            cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
            keyword TEXT NOT NULL,
            PRIMARY KEY (cluster_id, keyword)
        )
        """
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword)"
    )
    # SQLite's ALTER TABLE has no "ADD COLUMN IF NOT EXISTS", so try it and inspect the error.
    try:
        conn.execute(
            "ALTER TABLE clusters ADD COLUMN payload_ts "
            "GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL"
        )
    except sqlite3.OperationalError as exc:
        # "duplicate column name" is the expected re-run case. Any other failure
        # (e.g. an SQLite build without generated-column support) is reported
        # instead of hidden; the junction backfill itself does not need the column.
        if "duplicate column name" not in str(exc).lower():
            print(f"warning: could not add clusters.payload_ts: {exc}", file=sys.stderr)


def _normalized_terms(payload: dict, key: str) -> list[str]:
    """Return the distinct normalized terms stored under ``payload[key]``.

    Each term is ``str()``-converted, stripped and lowercased. Empty results
    are dropped and duplicates removed; the first occurrence wins, so insert
    order is deterministic. A missing key, JSON ``null``, or any value that is
    neither a list nor a string yields ``[]``. A bare string counts as a
    single term instead of being iterated character by character.
    """
    raw = payload.get(key)
    if isinstance(raw, str):
        raw = [raw]
    elif not isinstance(raw, list):
        return []
    cleaned = (str(term).strip().lower() for term in raw)
    return list(dict.fromkeys(term for term in cleaned if term))


def _backfill(conn: sqlite3.Connection) -> dict[str, int]:
    """Rebuild the junction rows of every cluster from its JSON payload.

    A cluster's existing junction rows are deleted before re-inserting, so
    re-runs converge to the same state. Clusters whose payload is NULL, is
    not valid JSON, or is not a JSON object are skipped with a warning on
    stderr and keep whatever junction rows they already had.

    Returns counters: ``clusters`` (rows read), ``skipped``, and the number of
    rows actually inserted under ``entities`` and ``keywords``.
    """
    rows = conn.execute("SELECT cluster_id, payload FROM clusters").fetchall()
    stats = {"clusters": len(rows), "skipped": 0, "entities": 0, "keywords": 0}
    for cluster_id, payload_text in rows:
        try:
            payload = json.loads(payload_text)
        except (TypeError, ValueError):  # NULL payload / malformed JSON (JSONDecodeError)
            payload = None
        if not isinstance(payload, dict):
            print(
                f"warning: skipping cluster {cluster_id!r}: unreadable payload",
                file=sys.stderr,
            )
            stats["skipped"] += 1
            continue
        for payload_key, delete_sql, insert_sql in _JUNCTIONS:
            # Clear stale entries first so a re-run reflects the current payload.
            conn.execute(delete_sql, (cluster_id,))
            terms = _normalized_terms(payload, payload_key)
            if terms:
                # For executemany(), rowcount is the sum of per-row changes, so
                # an ignored duplicate is not counted as an insert.
                cursor = conn.executemany(insert_sql, [(cluster_id, term) for term in terms])
                stats[payload_key] += cursor.rowcount
    return stats


def main() -> None:
    """Populate ``cluster_entities``/``cluster_keywords`` from every cluster payload.

    Parses ``--db`` from the command line, ensures the schema exists, rebuilds
    the junction rows inside a single transaction and prints a summary.
    Exits with status 2 (via ``argparse``) if the database file or its
    ``clusters`` table is missing.
    """
    parser = argparse.ArgumentParser(
        description="Populate cluster_entities and cluster_keywords from existing payloads"
    )
    parser.add_argument(
        "--db",
        default=str(Path(__file__).resolve().parent.parent / "news_mcp" / "data" / "news.sqlite"),
        help="Path to news.sqlite (default: dev DB)",
    )
    args = parser.parse_args()
    db_path = args.db
    # sqlite3.connect() silently creates a missing file; refuse instead of
    # leaving an empty database behind at a mistyped path.
    if not Path(db_path).is_file():
        parser.error(f"database not found: {db_path}")

    conn = sqlite3.connect(db_path)
    try:
        has_clusters = conn.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'clusters'"
        ).fetchone()
        if has_clusters is None:
            parser.error(f"no 'clusters' table in {db_path}")
        _ensure_schema(conn)
        # The connection context manager commits on success and rolls back on
        # any exception, so a failed run leaves existing junction rows untouched.
        with conn:
            stats = _backfill(conn)
        final_entities = conn.execute("SELECT COUNT(*) FROM cluster_entities").fetchone()[0]
        final_keywords = conn.execute("SELECT COUNT(*) FROM cluster_keywords").fetchone()[0]
    finally:
        conn.close()

    print("Backfill complete:")
    print(f" Clusters processed: {stats['clusters']}")
    if stats["skipped"]:
        print(f" Clusters skipped (unreadable payload): {stats['skipped']}")
    print(f" Entity rows inserted this run: {stats['entities']}")
    print(f" Keyword rows inserted this run: {stats['keywords']}")
    print(f" Total entity rows in DB: {final_entities}")
    print(f" Total keyword rows in DB: {final_keywords}")
if __name__ == "__main__":
    # Script entry point only; importing this module runs nothing.
    # main() returns None, so this exits with status 0 on success.
    raise SystemExit(main())
|