| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- from __future__ import annotations
- """One-off maintenance: enforce ENTITY_BLACKLIST patterns inside stored clusters.
- This script rewrites clusters in news-mcp's SQLite DB so that:
- - payload.entities: remove entities matching ENTITY_BLACKLIST patterns
- - payload.keywords: remove keywords matching ENTITY_BLACKLIST patterns
- - payload.topic: if topic matches ENTITY_BLACKLIST patterns, set topic='other'
- It mirrors the matching approach used by news-mcp (case-insensitive fnmatch).
- Usage examples:
- ./.venv/bin/python scripts/enforce_news_blacklist.py --dry-run --limit 200
- ./.venv/bin/python scripts/enforce_news_blacklist.py --limit 1000
- """
- import argparse
- import fnmatch
- import json
- import sys
- from pathlib import Path
- from typing import Any
- ROOT = Path(__file__).resolve().parents[1]
- sys.path.insert(0, str(ROOT))
- from news_mcp.config import DB_PATH, NEWS_ENTITY_BLACKLIST
- from news_mcp.storage.sqlite_store import SQLiteClusterStore
- def _matches_blacklist(value: str, blacklist: list[str]) -> bool:
- key = str(value).strip().lower()
- if not key:
- return True
- return any(fnmatch.fnmatchcase(key, pattern) for pattern in blacklist)
def enforce(cluster: dict, blacklist: list[str]) -> tuple[dict, bool]:
    """Return a blacklist-filtered copy of *cluster* and whether it differs.

    The input mapping is not mutated; a shallow copy is returned in which:

    - ``entities`` and ``keywords`` are rebuilt without any item flagged by
      ``_matches_blacklist`` (a missing or ``None`` value becomes ``[]``);
    - ``topic`` is replaced by ``"other"`` when the current topic is flagged.

    Args:
        cluster: Cluster payload as a plain dict.
        blacklist: Patterns forwarded to ``_matches_blacklist``.

    Returns:
        ``(new_cluster, changed)``. ``changed`` is ``True`` only when an
        entity or keyword was dropped or the topic was actually rewritten,
        so running the function on its own output reports no change.
    """
    result = dict(cluster)
    changed = False

    for field in ("entities", "keywords"):
        items = cluster.get(field, []) or []
        kept = [
            item for item in items
            if not _matches_blacklist(str(item), blacklist)
        ]
        if len(kept) != len(items):
            changed = True
        result[field] = kept

    topic = cluster.get("topic") or "other"
    # A topic that is already "other" (or missing) has nothing to neutralize.
    # Skipping it keeps the operation idempotent even when a pattern happens
    # to match the literal "other".
    if topic != "other" and _matches_blacklist(str(topic), blacklist):
        result["topic"] = "other"
        changed = True

    return result, changed
def main() -> None:
    """Scan stored clusters and rewrite those that violate the blacklist.

    Command-line options:
        --db       Path to the SQLite database (defaults to ``DB_PATH``).
        --limit    Maximum number of clusters to scan, ordered by ascending
                   ``updated_at``; omitted or ``0`` scans every row.
                   Negative values are rejected.
        --dry-run  Compute and count changes without writing them.

    A progress line is printed every 25 changed clusters and a summary dict
    (``total_scanned``, ``updated``, ``dry_run``) is printed at the end.
    Exits early when ``NEWS_ENTITY_BLACKLIST`` is empty.
    """
    parser = argparse.ArgumentParser(description="Enforce news-mcp ENTITY_BLACKLIST patterns in stored clusters")
    parser.add_argument("--db", type=Path, default=DB_PATH, help="Path to the news sqlite DB")
    parser.add_argument("--limit", type=int, default=None, help="Optional max number of clusters to scan")
    parser.add_argument("--dry-run", action="store_true", help="Compute changes without writing them")
    args = parser.parse_args()

    if args.limit is not None and args.limit < 0:
        # A negative slice bound would silently drop rows from the *end* of
        # the result set instead of capping how many are scanned.
        parser.error("--limit must be a non-negative integer")

    if not NEWS_ENTITY_BLACKLIST:
        print("ENTITY_BLACKLIST is empty; nothing to do.")
        return

    store = SQLiteClusterStore(args.db)
    with store._conn() as conn:  # noqa: SLF001
        cur = conn.execute(
            "SELECT cluster_id, topic, payload FROM clusters ORDER BY updated_at ASC"
        )
        rows = cur.fetchall()
    if args.limit:
        rows = rows[: args.limit]

    total = 0
    updated = 0
    print(f"starting blacklist enforcement: clusters={len(rows)} dry_run={args.dry_run}")

    for _cluster_id, topic, payload_json in rows:
        total += 1
        cluster = json.loads(payload_json)
        new_cluster, changed = enforce(cluster, NEWS_ENTITY_BLACKLIST)
        if not changed:
            continue

        if not args.dry_run:
            if new_cluster.get("topic") != cluster.get("topic"):
                # enforce() rewrote the payload topic; pass the rewritten
                # value so the stored row does not keep the blacklisted one.
                # NOTE(review): assumes upsert_clusters() persists `topic`
                # as the row's topic column -- confirm against the store.
                row_topic = new_cluster.get("topic") or "other"
            else:
                row_topic = topic or new_cluster.get("topic", "other")
            store.upsert_clusters([new_cluster], topic=row_topic)
        # Counted in dry-run mode too, so the summary shows what would change.
        updated += 1

        # mild progress
        if updated % 25 == 0:
            print(f"updated={updated} (processed={total})")

    print({"total_scanned": total, "updated": updated, "dry_run": args.dry_run})
# Run the maintenance pass only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|