watchdog_runner.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. """
  2. watchdog_runner.py — watches books/inbox/ and triggers the pipeline.
  3. Entry point: python -m book_ingestor.watchdog_runner
  4. Uses the watchdog library for filesystem events.
  5. Rich provides a live terminal dashboard when running interactively.
  6. In headless/container environments (no TTY), falls back to plain log output.
  7. """
  8. from __future__ import annotations
  9. import logging
  10. import signal
  11. import sys
  12. import time
  13. from pathlib import Path
  14. from rich.console import Console
  15. from rich.live import Live
  16. from rich.panel import Panel
  17. from rich.table import Table
  18. from rich.text import Text
  19. from watchdog.events import FileSystemEvent, FileSystemEventHandler
  20. from watchdog.observers import Observer
  21. from .config import cfg
  22. from .pipeline import ingest
  23. # ── Logging setup ──────────────────────────────────────────────────────────────
  24. logging.basicConfig(
  25. level=getattr(logging, cfg.log_level.upper(), logging.INFO),
  26. format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
  27. datefmt="%H:%M:%S",
  28. )
  29. log = logging.getLogger(__name__)
  30. console = Console()
  31. # True when running in a real interactive terminal, False in Docker/headless
  32. IS_TTY = sys.stdout.isatty()
  33. SUPPORTED_EXTENSIONS = {".pdf"} # .md, .txt, .epub coming in phase 2
  34. # ── Stats tracker ──────────────────────────────────────────────────────────────
  35. class Stats:
  36. def __init__(self):
  37. self.watched = str(Path(cfg.books_inbox).resolve())
  38. self.processed = 0
  39. self.failed = 0
  40. self.skipped = 0
  41. self.last_file: str = "—"
  42. self.last_status: str = "—"
  43. self.start_time = time.time()
  44. def uptime(self) -> str:
  45. elapsed = int(time.time() - self.start_time)
  46. h, m, s = elapsed // 3600, (elapsed % 3600) // 60, elapsed % 60
  47. return f"{h:02d}:{m:02d}:{s:02d}"
# Module-wide stats instance: read by _build_dashboard and updated by _run_ingest.
_stats = Stats()
  49. # ── Dashboard (TTY only) ───────────────────────────────────────────────────────
  50. def _build_dashboard() -> Panel:
  51. grid = Table.grid(padding=(0, 2))
  52. grid.add_column(style="bold cyan", min_width=18)
  53. grid.add_column()
  54. grid.add_row("📂 Watching", _stats.watched)
  55. grid.add_row("⏱ Uptime", _stats.uptime())
  56. grid.add_row("✅ Processed", str(_stats.processed))
  57. grid.add_row("⚠️ Failed", str(_stats.failed))
  58. grid.add_row("⏭ Skipped", str(_stats.skipped))
  59. grid.add_row("📄 Last file", _stats.last_file)
  60. grid.add_row(
  61. " Status",
  62. Text(_stats.last_status, style="green" if "✓" in _stats.last_status else "yellow"),
  63. )
  64. return Panel(
  65. grid,
  66. title="[bold white]📚 book-ingestor[/]",
  67. subtitle="[dim]Drop a PDF into inbox/ to ingest · Ctrl+C to stop[/]",
  68. border_style="cyan",
  69. )
  70. # ── Shared ingest logic ────────────────────────────────────────────────────────
  71. def _run_ingest(pdf: Path) -> None:
  72. """Run ingestion and update stats. Works in both TTY and headless mode."""
  73. _stats.last_file = pdf.name
  74. _stats.last_status = "⏳ Processing..."
  75. if not IS_TTY:
  76. log.info("Processing: %s", pdf.name)
  77. success = ingest(pdf)
  78. if success:
  79. _stats.processed += 1
  80. _stats.last_status = "✓ Complete"
  81. if not IS_TTY:
  82. log.info("Complete: %s (processed=%d)", pdf.name, _stats.processed)
  83. else:
  84. _stats.failed += 1
  85. _stats.last_status = "✗ Failed"
  86. if not IS_TTY:
  87. log.error("Failed: %s (failed=%d)", pdf.name, _stats.failed)
  88. # ── Watchdog handler ───────────────────────────────────────────────────────────
  89. class InboxHandler(FileSystemEventHandler):
  90. def __init__(self, live: Live | None = None):
  91. self._live = live
  92. def on_created(self, event: FileSystemEvent) -> None:
  93. if event.is_directory:
  94. return
  95. path = Path(event.src_path)
  96. if path.suffix.lower() not in SUPPORTED_EXTENSIONS:
  97. log.debug("Ignoring unsupported file type: %s", path.name)
  98. return
  99. # Brief wait to ensure file write is complete
  100. time.sleep(1.0)
  101. if not path.exists():
  102. return
  103. _run_ingest(path)
  104. if self._live:
  105. self._live.refresh()
  106. # ── Setup ──────────────────────────────────────────────────────────────────────
  107. def _ensure_folders() -> None:
  108. for folder in [
  109. cfg.books_inbox,
  110. cfg.books_processing,
  111. cfg.books_done,
  112. cfg.books_manifests,
  113. ]:
  114. Path(folder).mkdir(parents=True, exist_ok=True)
  115. def _process_existing(live: Live | None = None) -> None:
  116. """Process any PDFs already sitting in inbox/ at startup."""
  117. inbox = Path(cfg.books_inbox)
  118. existing = list(inbox.glob("*.pdf"))
  119. if not existing:
  120. return
  121. log.info("Found %d existing file(s) in inbox — processing...", len(existing))
  122. for pdf in existing:
  123. _run_ingest(pdf)
  124. if live:
  125. live.refresh()
  126. # ── Entry point ────────────────────────────────────────────────────────────────
  127. def _shutdown(observer: Observer) -> None:
  128. log.info("Shutting down...")
  129. observer.stop()
  130. observer.join()
  131. sys.exit(0)
  132. def run() -> None:
  133. _ensure_folders()
  134. observer = Observer()
  135. if IS_TTY:
  136. # ── Interactive mode: Rich live dashboard ──────────────────────────────
  137. with Live(_build_dashboard(), console=console, refresh_per_second=2) as live:
  138. _process_existing(live)
  139. handler = InboxHandler(live)
  140. observer.schedule(handler, cfg.books_inbox, recursive=False)
  141. observer.start()
  142. log.info("Watching: %s", Path(cfg.books_inbox).resolve())
  143. signal.signal(signal.SIGINT, lambda s, f: _shutdown(observer))
  144. signal.signal(signal.SIGTERM, lambda s, f: _shutdown(observer))
  145. try:
  146. while observer.is_alive():
  147. live.update(_build_dashboard())
  148. time.sleep(1)
  149. except Exception:
  150. observer.stop()
  151. observer.join()
  152. raise
  153. else:
  154. # ── Headless mode: plain structured logging (Docker / no TTY) ─────────
  155. log.info("Headless mode — Rich dashboard disabled")
  156. log.info("Watching: %s", Path(cfg.books_inbox).resolve())
  157. _process_existing()
  158. handler = InboxHandler()
  159. observer.schedule(handler, cfg.books_inbox, recursive=False)
  160. observer.start()
  161. signal.signal(signal.SIGINT, lambda s, f: _shutdown(observer))
  162. signal.signal(signal.SIGTERM, lambda s, f: _shutdown(observer))
  163. try:
  164. while observer.is_alive():
  165. time.sleep(1)
  166. except Exception:
  167. observer.stop()
  168. observer.join()
  169. raise
if __name__ == "__main__":
    # Reached both by direct execution and by the documented entry point
    # `python -m book_ingestor.watchdog_runner`.
    run()