"""One-off maintenance: enforce ENTITY_BLACKLIST patterns inside stored clusters.

This script rewrites clusters in news-mcp's SQLite DB so that:

- payload.entities: remove entities matching ENTITY_BLACKLIST patterns
- payload.keywords: remove keywords matching ENTITY_BLACKLIST patterns
- payload.topic: if topic matches ENTITY_BLACKLIST patterns, set topic='other'

It mirrors the matching approach used by news-mcp (case-insensitive fnmatch):
both the candidate value and each pattern are lower-cased before comparison.
Blank values are treated as blacklisted and are removed as well.

Usage examples:

    ./.venv/bin/python scripts/enforce_news_blacklist.py --dry-run --limit 200
    ./.venv/bin/python scripts/enforce_news_blacklist.py --limit 1000
"""

from __future__ import annotations

import argparse
import fnmatch
import json
import sys
from pathlib import Path
from typing import Any

# Make the repository root importable so the script can be run directly from
# a checkout without installing the package.
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))

from news_mcp.config import DB_PATH, NEWS_ENTITY_BLACKLIST  # noqa: E402
from news_mcp.storage.sqlite_store import SQLiteClusterStore  # noqa: E402


def _matches_blacklist(value: str, blacklist: list[str]) -> bool:
    """Return True if *value* should be dropped according to *blacklist*.

    The value is stripped and lower-cased, and every pattern is lower-cased
    too, so the comparison is case-insensitive regardless of how the patterns
    were written. A value that is empty after stripping counts as a match.
    """
    key = str(value).strip().lower()
    if not key:
        return True
    # fnmatchcase never normalises case itself, so both sides are folded here.
    return any(fnmatch.fnmatchcase(key, str(pattern).lower()) for pattern in blacklist)


def _drop_blacklisted(values: Any, blacklist: list[str]) -> tuple[list[Any], bool]:
    """Return the items of *values* that survive the blacklist, plus a "removed any" flag."""
    items = list(values or [])
    kept = [item for item in items if not _matches_blacklist(str(item), blacklist)]
    return kept, len(kept) != len(items)


def enforce(cluster: dict[str, Any], blacklist: list[str]) -> tuple[dict[str, Any], bool]:
    """Apply *blacklist* to one cluster payload.

    Args:
        cluster: Decoded cluster payload. It is not mutated; a shallow copy is
            returned instead.
        blacklist: fnmatch-style patterns to enforce.

    Returns:
        A ``(new_cluster, changed)`` tuple. ``new_cluster`` always carries
        ``entities`` and ``keywords`` lists. ``changed`` is True only when an
        entity or keyword was removed or the topic was actually rewritten.
    """
    result = dict(cluster)

    result["entities"], entities_changed = _drop_blacklisted(cluster.get("entities"), blacklist)
    result["keywords"], keywords_changed = _drop_blacklisted(cluster.get("keywords"), blacklist)
    changed = entities_changed or keywords_changed

    topic = cluster.get("topic") or "other"
    # Rewriting "other" to "other" is a no-op; reporting it as a change would
    # make every run re-upsert the same clusters.
    if topic != "other" and _matches_blacklist(topic, blacklist):
        result["topic"] = "other"
        changed = True

    return result, changed


def main() -> None:
    """Parse CLI arguments, scan stored clusters and rewrite those that changed.

    Clusters are read oldest-updated first. With ``--dry-run`` nothing is
    written, but would-be updates are still counted and reported.
    """
    parser = argparse.ArgumentParser(
        description="Enforce news-mcp ENTITY_BLACKLIST patterns in stored clusters"
    )
    parser.add_argument("--db", type=Path, default=DB_PATH, help="Path to the news sqlite DB")
    parser.add_argument(
        "--limit", type=int, default=None, help="Optional max number of clusters to scan"
    )
    parser.add_argument(
        "--dry-run", action="store_true", help="Compute changes without writing them"
    )
    args = parser.parse_args()

    # A negative value used to slice rows off the end of the result set.
    if args.limit is not None and args.limit < 0:
        parser.error("--limit must be a non-negative integer")

    if not NEWS_ENTITY_BLACKLIST:
        print("ENTITY_BLACKLIST is empty; nothing to do.")
        return

    # SQLite treats a negative LIMIT as "no upper bound"; an omitted or zero
    # --limit keeps its previous meaning of "scan everything".
    sql_limit = args.limit if args.limit else -1

    store = SQLiteClusterStore(args.db)
    with store._conn() as conn:  # noqa: SLF001
        cur = conn.execute(
            "SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC LIMIT ?",
            (sql_limit,),
        )
        rows = cur.fetchall()

    total = 0
    updated = 0
    print(f"starting blacklist enforcement: clusters={len(rows)} dry_run={args.dry_run}")

    for cluster_id, topic, payload_json in rows:
        total += 1

        # One unreadable row should not abort a run that has already written
        # earlier clusters; report it and move on.
        try:
            cluster = json.loads(payload_json)
        except (TypeError, ValueError) as exc:
            print(f"skipping cluster {cluster_id}: unreadable payload ({exc})", file=sys.stderr)
            continue
        if not isinstance(cluster, dict):
            print(f"skipping cluster {cluster_id}: payload is not a JSON object", file=sys.stderr)
            continue

        new_cluster, changed = enforce(cluster, NEWS_ENTITY_BLACKLIST)
        if not changed:
            continue

        if not args.dry_run:
            # NOTE(review): the row's own topic column is passed through
            # unchanged even when the payload topic was reset to "other";
            # presumably it identifies the existing row - confirm against
            # SQLiteClusterStore.upsert_clusters.
            store.upsert_clusters(
                [new_cluster], topic=topic or new_cluster.get("topic", "other")
            )
        updated += 1

        # mild progress
        if updated % 25 == 0:
            print(f"updated={updated} (processed={total})")

    print({"total_scanned": total, "updated": updated, "dry_run": args.dry_run})


if __name__ == "__main__":
    main()