backfill_junction_tables.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. #!/usr/bin/env python3
  2. """Backfill: populate cluster_entities and cluster_keywords junction tables.
  3. Reads every cluster payload from the DB, extracts entities and keywords,
  4. and inserts them into the junction tables. Idempotent — safe to re-run.
  5. Usage:
  6. python3 scripts/backfill_junction_tables.py
  7. python3 scripts/backfill_junction_tables.py --db /path/to/news.sqlite
  8. """
  9. from __future__ import annotations
  10. import argparse
  11. import json
  12. import sqlite3
  13. import sys
  14. from pathlib import Path
  15. def main() -> None:
  16. parser = argparse.ArgumentParser(
  17. description="Populate cluster_entities and cluster_keywords from existing payloads"
  18. )
  19. parser.add_argument(
  20. "--db",
  21. default=str(Path(__file__).resolve().parent.parent / "news_mcp" / "data" / "news.sqlite"),
  22. help="Path to news.sqlite (default: dev DB)",
  23. )
  24. args = parser.parse_args()
  25. db_path = args.db
  26. conn = sqlite3.connect(db_path)
  27. # Ensure junction tables exist (same DDL as _init_db)
  28. conn.execute(
  29. """
  30. CREATE TABLE IF NOT EXISTS cluster_entities (
  31. cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
  32. entity TEXT NOT NULL,
  33. PRIMARY KEY (cluster_id, entity)
  34. )
  35. """
  36. )
  37. conn.execute(
  38. "CREATE INDEX IF NOT EXISTS idx_cluster_entities_entity ON cluster_entities(entity)"
  39. )
  40. conn.execute(
  41. """
  42. CREATE TABLE IF NOT EXISTS cluster_keywords (
  43. cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
  44. keyword TEXT NOT NULL,
  45. PRIMARY KEY (cluster_id, keyword)
  46. )
  47. """
  48. )
  49. conn.execute(
  50. "CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword)"
  51. )
  52. # Ensure payload_ts generated column exists
  53. try:
  54. conn.execute(
  55. "ALTER TABLE clusters ADD COLUMN payload_ts "
  56. "GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL"
  57. )
  58. except sqlite3.OperationalError:
  59. pass # already exists
  60. rows = conn.execute("SELECT cluster_id, payload FROM clusters").fetchall()
  61. total = len(rows)
  62. entities_count = 0
  63. keywords_count = 0
  64. for cluster_id, payload_text in rows:
  65. payload = json.loads(payload_text)
  66. # Clear stale entries (idempotent re-run)
  67. conn.execute("DELETE FROM cluster_entities WHERE cluster_id = ?", (cluster_id,))
  68. conn.execute("DELETE FROM cluster_keywords WHERE cluster_id = ?", (cluster_id,))
  69. for entity in payload.get("entities", []):
  70. ent_norm = str(entity).strip().lower()
  71. if ent_norm:
  72. conn.execute(
  73. "INSERT OR IGNORE INTO cluster_entities(cluster_id, entity) VALUES(?, ?)",
  74. (cluster_id, ent_norm),
  75. )
  76. entities_count += 1
  77. for kw in payload.get("keywords", []):
  78. kw_norm = str(kw).strip().lower()
  79. if kw_norm:
  80. conn.execute(
  81. "INSERT OR IGNORE INTO cluster_keywords(cluster_id, keyword) VALUES(?, ?)",
  82. (cluster_id, kw_norm),
  83. )
  84. keywords_count += 1
  85. conn.commit()
  86. # Report
  87. final_entities = conn.execute("SELECT COUNT(*) FROM cluster_entities").fetchone()[0]
  88. final_keywords = conn.execute("SELECT COUNT(*) FROM cluster_keywords").fetchone()[0]
  89. conn.close()
  90. print(f"Backfill complete:")
  91. print(f" Clusters processed: {total}")
  92. print(f" Entity rows inserted this run: {entities_count}")
  93. print(f" Keyword rows inserted this run: {keywords_count}")
  94. print(f" Total entity rows in DB: {final_entities}")
  95. print(f" Total keyword rows in DB: {final_keywords}")
  96. if __name__ == "__main__":
  97. main()