| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- """
- watchdog_runner.py — watches books/inbox/ and triggers the pipeline.
- Entry point: python -m book_ingestor.watchdog_runner
- Uses the watchdog library for filesystem events.
- Rich provides a live terminal dashboard when running interactively.
- In headless/container environments (no TTY), falls back to plain log output.
- """
- from __future__ import annotations
- import logging
- import signal
- import sys
- import time
- from pathlib import Path
- from rich.console import Console
- from rich.live import Live
- from rich.panel import Panel
- from rich.table import Table
- from rich.text import Text
- from watchdog.events import FileSystemEvent, FileSystemEventHandler
- from watchdog.observers import Observer
- from .config import cfg
- from .pipeline import ingest
# ── Logging setup ──────────────────────────────────────────────────────────────
# The level name comes from cfg.log_level (case-insensitive); names that are not
# attributes of the logging module fall back to INFO.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
    level=getattr(logging, cfg.log_level.upper(), logging.INFO),
)
log = logging.getLogger(__name__)
console = Console()

# Interactive terminal → Rich dashboard; Docker/headless (no TTY) → plain logs.
IS_TTY = sys.stdout.isatty()

# Only PDFs for now; .md, .txt and .epub are planned for phase 2.
SUPPORTED_EXTENSIONS = {".pdf"}
# ── Stats tracker ──────────────────────────────────────────────────────────────
class Stats:
    """Counters and last-file status rendered by the dashboard.

    The module-level instance is mutated by ``_run_ingest`` and read by
    ``_build_dashboard``.
    """

    def __init__(self):
        # Absolute inbox path, resolved once for display.
        self.watched = str(Path(cfg.books_inbox).resolve())
        self.processed = 0
        self.failed = 0
        # NOTE(review): nothing in this module increments `skipped` — confirm
        # whether it is meant to count ignored files.
        self.skipped = 0
        self.last_file: str = "—"
        self.last_status: str = "—"
        # Wall-clock start time, kept for callers that want to display it.
        self.start_time = time.time()
        # Monotonic reference for uptime(): unaffected by system clock
        # adjustments (NTP sync, manual changes), unlike time.time().
        self._start_monotonic = time.monotonic()

    def uptime(self) -> str:
        """Return time elapsed since construction as ``HH:MM:SS``.

        Hours are not wrapped, so runs of 100 h or more show three digits.
        """
        elapsed = int(time.monotonic() - self._start_monotonic)
        minutes, seconds = divmod(elapsed, 60)
        hours, minutes = divmod(minutes, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
# Single module-level Stats instance: _run_ingest updates it and
# _build_dashboard renders it. It is built at import time, so cfg.books_inbox is
# resolved when this module is imported.
_stats = Stats()
# ── Dashboard (TTY only) ───────────────────────────────────────────────────────
def _build_dashboard() -> Panel:
    """Render the current ``_stats`` snapshot as a Rich ``Panel``.

    Used by the interactive (TTY) mode of ``run()``; a fresh panel is built on
    every call. The status line is green when it contains "✓", yellow otherwise.
    """
    status_text = _stats.last_status
    status_style = "green" if "✓" in status_text else "yellow"
    rows = (
        ("📂 Watching", _stats.watched),
        ("⏱ Uptime", _stats.uptime()),
        ("✅ Processed", str(_stats.processed)),
        ("⚠️ Failed", str(_stats.failed)),
        ("⏭ Skipped", str(_stats.skipped)),
        ("📄 Last file", _stats.last_file),
        (" Status", Text(status_text, style=status_style)),
    )

    table = Table.grid(padding=(0, 2))
    table.add_column(style="bold cyan", min_width=18)
    table.add_column()
    for label, cell in rows:
        table.add_row(label, cell)

    return Panel(
        table,
        border_style="cyan",
        title="[bold white]📚 book-ingestor[/]",
        subtitle="[dim]Drop a PDF into inbox/ to ingest · Ctrl+C to stop[/]",
    )
# ── Shared ingest logic ────────────────────────────────────────────────────────
def _run_ingest(pdf: Path) -> None:
    """Ingest *pdf* and record the outcome in ``_stats``.

    Works in both TTY and headless mode. Per-file progress lines are logged only
    in headless mode, where there is no dashboard to display them.

    An exception escaping ``ingest`` is logged with its traceback and counted as
    a failure instead of being propagated. This function is called from the
    watchdog handler, and ``run()`` stops waiting once the observer thread is no
    longer alive. Letting one bad file raise here would presumably end that
    thread and silently stop watching. (Assumes watchdog does not catch handler
    exceptions — verify for the installed version.)

    Args:
        pdf: File to ingest.
    """
    _stats.last_file = pdf.name
    _stats.last_status = "⏳ Processing..."
    if not IS_TTY:
        log.info("Processing: %s", pdf.name)

    try:
        success = ingest(pdf)
    except Exception:
        # Worker boundary: keep the watcher alive, but never hide the error.
        log.exception("Unhandled error while ingesting %s", pdf.name)
        success = False

    if success:
        _stats.processed += 1
        _stats.last_status = "✓ Complete"
        if not IS_TTY:
            log.info("Complete: %s (processed=%d)", pdf.name, _stats.processed)
    else:
        _stats.failed += 1
        _stats.last_status = "✗ Failed"
        if not IS_TTY:
            log.error("Failed: %s (failed=%d)", pdf.name, _stats.failed)
# ── Watchdog handler ───────────────────────────────────────────────────────────
class InboxHandler(FileSystemEventHandler):
    """Watchdog handler that ingests supported files appearing in the inbox.

    A file is ingested when it is created with a supported extension, or when
    it is renamed in place from an unsupported name to a supported one. Before
    ingesting, the handler waits until the file's size stops changing. This is
    a heuristic for "the writer has finished", not a guarantee.
    """

    # Seconds between size checks while waiting for a write to finish.
    settle_interval: float = 1.0
    # Stop waiting after this many seconds and ingest anyway.
    settle_timeout: float = 300.0

    def __init__(self, live: Live | None = None):
        super().__init__()
        # Optional Rich Live display to refresh after each ingest (TTY mode).
        self._live = live

    def on_created(self, event: FileSystemEvent) -> None:
        """Handle a new file in the inbox."""
        if event.is_directory:
            return
        self._handle(Path(event.src_path))

    def on_moved(self, event: FileSystemEvent) -> None:
        """Handle a rename that turns a temporary name into a supported one.

        Many tools write to a temporary name (e.g. ``book.pdf.part``) and rename
        it when the write is done. That shows up as a move, not a create.
        Renames of a file that already had a supported name are ignored, because
        that file was handled when it was created. Moves into another folder are
        ignored as well.
        """
        if event.is_directory:
            return
        src = Path(event.src_path)
        dest = Path(event.dest_path)
        if dest.parent != src.parent or src.suffix.lower() in SUPPORTED_EXTENSIONS:
            return
        self._handle(dest)

    def _handle(self, path: Path) -> None:
        """Filter by extension, wait for the write to settle, then ingest *path*."""
        if path.suffix.lower() not in SUPPORTED_EXTENSIONS:
            log.debug("Ignoring unsupported file type: %s", path.name)
            return
        if not self._wait_until_stable(path):
            # The file vanished (or became unreadable) while we were waiting.
            return
        _run_ingest(path)
        if self._live is not None:
            self._live.refresh()

    @staticmethod
    def _size(path: Path) -> int | None:
        """Return *path*'s size in bytes, or None if it can no longer be stat'ed."""
        try:
            return path.stat().st_size
        except OSError:
            return None

    def _wait_until_stable(self, path: Path) -> bool:
        """Block until *path*'s size is unchanged across one settle interval.

        Returns False if the file disappears while waiting. Returns True once
        the size is stable, or once ``settle_timeout`` elapses (a warning is
        logged and the caller proceeds anyway).
        """
        size = self._size(path)
        if size is None:
            return False
        deadline = time.monotonic() + self.settle_timeout
        while True:
            time.sleep(self.settle_interval)
            new_size = self._size(path)
            if new_size is None:
                return False
            if new_size == size:
                return True
            if time.monotonic() >= deadline:
                log.warning(
                    "%s still growing after %.0fs — ingesting anyway",
                    path.name,
                    self.settle_timeout,
                )
                return True
            size = new_size
# ── Setup ──────────────────────────────────────────────────────────────────────
def _ensure_folders() -> None:
    """Create the inbox, processing, done and manifests folders if missing.

    Parent directories are created as needed; existing folders are left as-is.
    """
    required = (
        cfg.books_inbox,
        cfg.books_processing,
        cfg.books_done,
        cfg.books_manifests,
    )
    for folder in map(Path, required):
        folder.mkdir(parents=True, exist_ok=True)
def _process_existing(live: Live | None = None) -> None:
    """Ingest supported files already sitting in the inbox at startup.

    Uses the same case-insensitive ``SUPPORTED_EXTENSIONS`` check as the
    watchdog handler, so e.g. ``BOOK.PDF`` is not skipped at startup on
    case-sensitive filesystems. Only regular files are considered, and they are
    processed one at a time in name order.

    Args:
        live: Optional Rich ``Live`` display to refresh after each file.
    """
    inbox = Path(cfg.books_inbox)
    if not inbox.is_dir():
        return
    existing = sorted(
        entry
        for entry in inbox.iterdir()
        if entry.is_file() and entry.suffix.lower() in SUPPORTED_EXTENSIONS
    )
    if not existing:
        return
    log.info("Found %d existing file(s) in inbox — processing...", len(existing))
    for pdf in existing:
        # The batch can take a while; the file may have been removed meanwhile.
        if not pdf.exists():
            continue
        _run_ingest(pdf)
        if live:
            live.refresh()
# ── Entry point ────────────────────────────────────────────────────────────────
def _shutdown(observer: Observer) -> None:
    """Stop *observer*, wait for its thread to exit, then end the process.

    Installed as the SIGINT/SIGTERM handler. Raising ``SystemExit(0)`` here
    breaks the main thread out of its wait loop and unwinds any enclosing
    ``with`` blocks.
    """
    log.info("Shutting down...")
    observer.stop()  # ask the observer thread to finish
    observer.join()  # block until it actually has
    raise SystemExit(0)
def _start_observer(observer: Observer, handler: InboxHandler) -> None:
    """Schedule *handler* on the inbox, start *observer*, install signal handlers.

    The signal handlers are installed only after ``observer.start()``, because
    ``_shutdown`` joins the observer thread, and joining a thread that was never
    started raises ``RuntimeError``.
    """
    observer.schedule(handler, cfg.books_inbox, recursive=False)
    observer.start()
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, lambda s, f: _shutdown(observer))


def _wait_for_observer(observer: Observer, on_tick=None) -> None:
    """Block while the observer thread runs, calling *on_tick* about once a second.

    A clean shutdown never returns from here: ``_shutdown`` runs on this (main)
    thread from the SIGINT/SIGTERM handler and raises ``SystemExit`` out of the
    loop. Falling out of the loop therefore means the observer thread died on
    its own. That is logged, and the process exits with status 1 so a
    supervisor can notice instead of seeing a clean exit.

    Args:
        observer: A started watchdog observer.
        on_tick: Optional zero-argument callable, e.g. a dashboard refresh.
    """
    try:
        while observer.is_alive():
            if on_tick is not None:
                on_tick()
            time.sleep(1)
    except Exception:
        observer.stop()
        observer.join()
        raise
    log.error("Observer thread stopped unexpectedly — exiting")
    sys.exit(1)


def run() -> None:
    """Create folders, process the current inbox, then watch it until shutdown.

    Interactive terminals get a Rich live dashboard; headless runs (no TTY,
    e.g. Docker) log plain lines instead. Both modes share the same observer
    startup and wait loop.

    NOTE(review): files already in the inbox are processed *before* the
    observer starts, so a file dropped during that startup pass is not picked
    up until the next restart. Starting the observer first would require
    guarding against the same file being ingested twice concurrently.
    """
    _ensure_folders()
    observer = Observer()
    watched = Path(cfg.books_inbox).resolve()

    if IS_TTY:
        # ── Interactive mode: Rich live dashboard ──────────────────────────────
        with Live(_build_dashboard(), console=console, refresh_per_second=2) as live:
            _process_existing(live)
            _start_observer(observer, InboxHandler(live))
            log.info("Watching: %s", watched)
            _wait_for_observer(
                observer, on_tick=lambda: live.update(_build_dashboard())
            )
    else:
        # ── Headless mode: plain structured logging (Docker / no TTY) ─────────
        log.info("Headless mode — Rich dashboard disabled")
        log.info("Watching: %s", watched)
        _process_existing()
        _start_observer(observer, InboxHandler())
        _wait_for_observer(observer)
# Script entry point; lets the module be run with
# `python -m book_ingestor.watchdog_runner`, as the module docstring describes.
if __name__ == "__main__":
    run()
|