backfill_junction_tables.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #!/usr/bin/env python3
"""Backfill: populate cluster_entities and cluster_keywords junction tables.

Reads every cluster payload from the DB, extracts entities and keywords,
and inserts them into the junction tables. Idempotent — safe to re-run.

Usage:
    python3 scripts/backfill_junction_tables.py
    python3 scripts/backfill_junction_tables.py --db /path/to/news.sqlite

The script uses NEWS_MCP_DB_PATH env var or --db arg to locate the database.
In the Docker container, NEWS_MCP_DB_PATH is set to ./data/news.sqlite.
"""
  11. from __future__ import annotations
  12. import argparse
  13. import json
  14. import sqlite3
  15. import sys
  16. from pathlib import Path
  17. ROOT = Path(__file__).resolve().parents[1]
  18. sys.path.insert(0, str(ROOT))
  19. from news_mcp.config import DB_PATH
  20. def main() -> None:
  21. parser = argparse.ArgumentParser(
  22. description="Populate cluster_entities and cluster_keywords from existing payloads"
  23. )
  24. parser.add_argument("--db", type=Path, default=DB_PATH)
  25. args = parser.parse_args()
  26. db_path = str(args.db)
  27. if not Path(db_path).exists():
  28. print(f"ERROR: database file not found: {db_path}")
  29. print("Check NEWS_MCP_DB_PATH env var or pass --db /path/to/db")
  30. sys.exit(1)
  31. conn = sqlite3.connect(db_path)
  32. conn.execute("PRAGMA journal_mode=WAL")
  33. # Ensure junction tables exist (same DDL as _init_db)
  34. conn.execute(
  35. """
  36. CREATE TABLE IF NOT EXISTS cluster_entities (
  37. cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
  38. entity TEXT NOT NULL,
  39. PRIMARY KEY (cluster_id, entity)
  40. )
  41. """
  42. )
  43. conn.execute(
  44. "CREATE INDEX IF NOT EXISTS idx_cluster_entities_entity ON cluster_entities(entity)"
  45. )
  46. conn.execute(
  47. """
  48. CREATE TABLE IF NOT EXISTS cluster_keywords (
  49. cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
  50. keyword TEXT NOT NULL,
  51. PRIMARY KEY (cluster_id, keyword)
  52. )
  53. """
  54. )
  55. conn.execute(
  56. "CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword)"
  57. )
  58. # Ensure payload_ts generated column exists
  59. try:
  60. conn.execute(
  61. "ALTER TABLE clusters ADD COLUMN payload_ts "
  62. "GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL"
  63. )
  64. except sqlite3.OperationalError:
  65. pass # column already exists
  66. rows = conn.execute("SELECT cluster_id, payload FROM clusters").fetchall()
  67. total = len(rows)
  68. entities_inserted = 0
  69. keywords_inserted = 0
  70. for i, (cluster_id, payload_text) in enumerate(rows):
  71. try:
  72. payload = json.loads(payload_text)
  73. except Exception:
  74. continue
  75. # Clear stale entries (idempotent re-run handles re-enrichment)
  76. conn.execute("DELETE FROM cluster_entities WHERE cluster_id = ?", (cluster_id,))
  77. conn.execute("DELETE FROM cluster_keywords WHERE cluster_id = ?", (cluster_id,))
  78. for entity in payload.get("entities", []):
  79. ent_norm = str(entity).strip().lower()
  80. if ent_norm:
  81. conn.execute(
  82. "INSERT OR IGNORE INTO cluster_entities(cluster_id, entity) VALUES(?, ?)",
  83. (cluster_id, ent_norm),
  84. )
  85. entities_inserted += 1
  86. for kw in payload.get("keywords", []):
  87. kw_norm = str(kw).strip().lower()
  88. if kw_norm:
  89. conn.execute(
  90. "INSERT OR IGNORE INTO cluster_keywords(cluster_id, keyword) VALUES(?, ?)",
  91. (cluster_id, kw_norm),
  92. )
  93. keywords_inserted += 1
  94. if (i + 1) % 500 == 0 or i == total - 1:
  95. print(f" Processed {i+1}/{total}...", flush=True)
  96. conn.commit()
  97. # Report
  98. final_entities = conn.execute("SELECT COUNT(*) FROM cluster_entities").fetchone()[0]
  99. final_keywords = conn.execute("SELECT COUNT(*) FROM cluster_keywords").fetchone()[0]
  100. conn.close()
  101. print(f"\nBackfill complete:")
  102. print(f" Clusters processed: {total}")
  103. print(f" Entity rows inserted this run: {entities_inserted}")
  104. print(f" Keyword rows inserted this run: {keywords_inserted}")
  105. print(f" Total entity rows in DB: {final_entities}")
  106. print(f" Total keyword rows in DB: {final_keywords}")
  107. if __name__ == "__main__":
  108. main()