|
|
@@ -7,6 +7,9 @@ and inserts them into the junction tables. Idempotent — safe to re-run.
|
|
|
Usage:
|
|
|
python3 scripts/backfill_junction_tables.py
|
|
|
python3 scripts/backfill_junction_tables.py --db /path/to/news.sqlite
|
|
|
+
|
|
|
+The script uses NEWS_MCP_DB_PATH env var or --db arg to locate the database.
|
|
|
+In the Docker container, NEWS_MCP_DB_PATH is set to ./data/news.sqlite.
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
|
@@ -17,20 +20,27 @@ import sqlite3
|
|
|
import sys
|
|
|
from pathlib import Path
|
|
|
|
|
|
+ROOT = Path(__file__).resolve().parents[1]
|
|
|
+sys.path.insert(0, str(ROOT))
|
|
|
+
|
|
|
+from news_mcp.config import DB_PATH
|
|
|
+
|
|
|
|
|
|
def main() -> None:
|
|
|
parser = argparse.ArgumentParser(
|
|
|
description="Populate cluster_entities and cluster_keywords from existing payloads"
|
|
|
)
|
|
|
- parser.add_argument(
|
|
|
- "--db",
|
|
|
- default=str(Path(__file__).resolve().parent.parent / "news_mcp" / "data" / "news.sqlite"),
|
|
|
- help="Path to news.sqlite (default: dev DB)",
|
|
|
- )
|
|
|
+ parser.add_argument("--db", type=Path, default=DB_PATH)
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
- db_path = args.db
|
|
|
+ db_path = str(args.db)
|
|
|
+ if not Path(db_path).exists():
|
|
|
+ print(f"ERROR: database file not found: {db_path}")
|
|
|
+ print("Check NEWS_MCP_DB_PATH env var or pass --db /path/to/db")
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
conn = sqlite3.connect(db_path)
|
|
|
+ conn.execute("PRAGMA journal_mode=WAL")
|
|
|
|
|
|
# Ensure junction tables exist (same DDL as _init_db)
|
|
|
conn.execute(
|
|
|
@@ -65,17 +75,20 @@ def main() -> None:
|
|
|
"GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL"
|
|
|
)
|
|
|
except sqlite3.OperationalError:
|
|
|
- pass # already exists
|
|
|
+ pass # column already exists
|
|
|
|
|
|
rows = conn.execute("SELECT cluster_id, payload FROM clusters").fetchall()
|
|
|
total = len(rows)
|
|
|
- entities_count = 0
|
|
|
- keywords_count = 0
|
|
|
+ entities_inserted = 0
|
|
|
+ keywords_inserted = 0
|
|
|
|
|
|
- for cluster_id, payload_text in rows:
|
|
|
- payload = json.loads(payload_text)
|
|
|
+ for i, (cluster_id, payload_text) in enumerate(rows):
|
|
|
+ try:
|
|
|
+ payload = json.loads(payload_text)
|
|
|
+ except Exception:
|
|
|
+ continue
|
|
|
|
|
|
- # Clear stale entries (idempotent re-run)
|
|
|
+ # Clear stale entries (idempotent re-run handles re-enrichment)
|
|
|
conn.execute("DELETE FROM cluster_entities WHERE cluster_id = ?", (cluster_id,))
|
|
|
conn.execute("DELETE FROM cluster_keywords WHERE cluster_id = ?", (cluster_id,))
|
|
|
|
|
|
@@ -86,7 +99,7 @@ def main() -> None:
|
|
|
"INSERT OR IGNORE INTO cluster_entities(cluster_id, entity) VALUES(?, ?)",
|
|
|
(cluster_id, ent_norm),
|
|
|
)
|
|
|
- entities_count += 1
|
|
|
+ entities_inserted += 1
|
|
|
|
|
|
for kw in payload.get("keywords", []):
|
|
|
kw_norm = str(kw).strip().lower()
|
|
|
@@ -95,7 +108,10 @@ def main() -> None:
|
|
|
"INSERT OR IGNORE INTO cluster_keywords(cluster_id, keyword) VALUES(?, ?)",
|
|
|
(cluster_id, kw_norm),
|
|
|
)
|
|
|
- keywords_count += 1
|
|
|
+ keywords_inserted += 1
|
|
|
+
|
|
|
+ if (i + 1) % 500 == 0 or i == total - 1:
|
|
|
+ print(f" Processed {i+1}/{total}...", flush=True)
|
|
|
|
|
|
conn.commit()
|
|
|
|
|
|
@@ -104,10 +120,10 @@ def main() -> None:
|
|
|
final_keywords = conn.execute("SELECT COUNT(*) FROM cluster_keywords").fetchone()[0]
|
|
|
conn.close()
|
|
|
|
|
|
- print(f"Backfill complete:")
|
|
|
+ print(f"\nBackfill complete:")
|
|
|
print(f" Clusters processed: {total}")
|
|
|
- print(f" Entity rows inserted this run: {entities_count}")
|
|
|
- print(f" Keyword rows inserted this run: {keywords_count}")
|
|
|
+ print(f" Entity rows inserted this run: {entities_inserted}")
|
|
|
+ print(f" Keyword rows inserted this run: {keywords_inserted}")
|
|
|
print(f" Total entity rows in DB: {final_entities}")
|
|
|
print(f" Total keyword rows in DB: {final_keywords}")
|
|
|
|