""" watchdog_runner.py — watches books/inbox/ and triggers the pipeline. Entry point: python -m book_ingestor.watchdog_runner Uses the watchdog library for filesystem events. Rich provides a live terminal dashboard when running interactively. In headless/container environments (no TTY), falls back to plain log output. """ from __future__ import annotations import logging import signal import sys import time from pathlib import Path from rich.console import Console from rich.live import Live from rich.panel import Panel from rich.table import Table from rich.text import Text from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers import Observer from .config import cfg from .pipeline import ingest # ── Logging setup ────────────────────────────────────────────────────────────── logging.basicConfig( level=getattr(logging, cfg.log_level.upper(), logging.INFO), format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%H:%M:%S", ) log = logging.getLogger(__name__) console = Console() # True when running in a real interactive terminal, False in Docker/headless IS_TTY = sys.stdout.isatty() SUPPORTED_EXTENSIONS = {".pdf"} # .md, .txt, .epub coming in phase 2 # ── Stats tracker ────────────────────────────────────────────────────────────── class Stats: def __init__(self): self.watched = str(Path(cfg.books_inbox).resolve()) self.processed = 0 self.failed = 0 self.skipped = 0 self.last_file: str = "—" self.last_status: str = "—" self.start_time = time.time() def uptime(self) -> str: elapsed = int(time.time() - self.start_time) h, m, s = elapsed // 3600, (elapsed % 3600) // 60, elapsed % 60 return f"{h:02d}:{m:02d}:{s:02d}" _stats = Stats() # ── Dashboard (TTY only) ─────────────────────────────────────────────────────── def _build_dashboard() -> Panel: grid = Table.grid(padding=(0, 2)) grid.add_column(style="bold cyan", min_width=18) grid.add_column() grid.add_row("📂 Watching", _stats.watched) grid.add_row("⏱ Uptime", _stats.uptime()) grid.add_row("✅ Processed", str(_stats.processed)) grid.add_row("⚠️ Failed", str(_stats.failed)) grid.add_row("⏭ Skipped", str(_stats.skipped)) grid.add_row("📄 Last file", _stats.last_file) grid.add_row( " Status", Text(_stats.last_status, style="green" if "✓" in _stats.last_status else "yellow"), ) return Panel( grid, title="[bold white]📚 book-ingestor[/]", subtitle="[dim]Drop a PDF into inbox/ to ingest · Ctrl+C to stop[/]", border_style="cyan", ) # ── Shared ingest logic ──────────────────────────────────────────────────────── def _run_ingest(pdf: Path) -> None: """Run ingestion and update stats. Works in both TTY and headless mode.""" _stats.last_file = pdf.name _stats.last_status = "⏳ Processing..." if not IS_TTY: log.info("Processing: %s", pdf.name) success = ingest(pdf) if success: _stats.processed += 1 _stats.last_status = "✓ Complete" if not IS_TTY: log.info("Complete: %s (processed=%d)", pdf.name, _stats.processed) else: _stats.failed += 1 _stats.last_status = "✗ Failed" if not IS_TTY: log.error("Failed: %s (failed=%d)", pdf.name, _stats.failed) # ── Watchdog handler ─────────────────────────────────────────────────────────── class InboxHandler(FileSystemEventHandler): def __init__(self, live: Live | None = None): self._live = live def on_created(self, event: FileSystemEvent) -> None: if event.is_directory: return path = Path(event.src_path) if path.suffix.lower() not in SUPPORTED_EXTENSIONS: log.debug("Ignoring unsupported file type: %s", path.name) return # Brief wait to ensure file write is complete time.sleep(1.0) if not path.exists(): return _run_ingest(path) if self._live: self._live.refresh() # ── Setup ────────────────────────────────────────────────────────────────────── def _ensure_folders() -> None: for folder in [ cfg.books_inbox, cfg.books_processing, cfg.books_done, cfg.books_manifests, ]: Path(folder).mkdir(parents=True, exist_ok=True) def _process_existing(live: Live | None = None) -> None: """Process any PDFs already sitting in inbox/ at startup.""" inbox = Path(cfg.books_inbox) existing = list(inbox.glob("*.pdf")) if not existing: return log.info("Found %d existing file(s) in inbox — processing...", len(existing)) for pdf in existing: _run_ingest(pdf) if live: live.refresh() # ── Entry point ──────────────────────────────────────────────────────────────── def _shutdown(observer: Observer) -> None: log.info("Shutting down...") observer.stop() observer.join() sys.exit(0) def run() -> None: _ensure_folders() observer = Observer() if IS_TTY: # ── Interactive mode: Rich live dashboard ────────────────────────────── with Live(_build_dashboard(), console=console, refresh_per_second=2) as live: _process_existing(live) handler = InboxHandler(live) observer.schedule(handler, cfg.books_inbox, recursive=False) observer.start() log.info("Watching: %s", Path(cfg.books_inbox).resolve()) signal.signal(signal.SIGINT, lambda s, f: _shutdown(observer)) signal.signal(signal.SIGTERM, lambda s, f: _shutdown(observer)) try: while observer.is_alive(): live.update(_build_dashboard()) time.sleep(1) except Exception: observer.stop() observer.join() raise else: # ── Headless mode: plain structured logging (Docker / no TTY) ───────── log.info("Headless mode — Rich dashboard disabled") log.info("Watching: %s", Path(cfg.books_inbox).resolve()) _process_existing() handler = InboxHandler() observer.schedule(handler, cfg.books_inbox, recursive=False) observer.start() signal.signal(signal.SIGINT, lambda s, f: _shutdown(observer)) signal.signal(signal.SIGTERM, lambda s, f: _shutdown(observer)) try: while observer.is_alive(): time.sleep(1) except Exception: observer.stop() observer.join() raise if __name__ == "__main__": run()