""" pipeline.py — orchestrates the full ingestion flow for a single PDF. Calls detector → chunker → summarizer → mem0_writer → manifest in order. """ from __future__ import annotations import logging import shutil from pathlib import Path from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn from .config import cfg from .detector import detect_structure, DocumentStructure from .chunker import chunk_section from .summarizer import summarize_book, summarize_chapter, summarize_flat_document from .mem0_writer import ( write_book_summary, write_chapter_summary, write_content_chunks_batch, ) from .manifest import compute_file_hash, already_ingested, build_manifest, save_manifest log = logging.getLogger(__name__) console = Console() def ingest(pdf_path: str | Path) -> bool: """ Full ingestion pipeline for a single PDF. Moves file through inbox → processing → done. Returns True on success, False on failure. """ pdf_path = Path(pdf_path) processing_path = Path(cfg.books_processing) / pdf_path.name done_path = Path(cfg.books_done) / pdf_path.name # ── Deduplication check ────────────────────────────────────────────────── console.print(f"\n[bold cyan]📚 Ingesting:[/] {pdf_path.name}") file_hash = compute_file_hash(pdf_path) if already_ingested(file_hash): console.print("[yellow]⚠ Already ingested (hash match). Skipping.[/]") return True # ── Move to processing ─────────────────────────────────────────────────── Path(cfg.books_processing).mkdir(parents=True, exist_ok=True) shutil.move(str(pdf_path), processing_path) log.info("Moved to processing: %s", processing_path) try: result = _run_pipeline(processing_path, file_hash) except Exception as e: log.exception("Pipeline failed for %s: %s", pdf_path.name, e) console.print(f"[red]✗ Pipeline failed: {e}[/]") # Move back to inbox so user can retry shutil.move(str(processing_path), pdf_path) return False # ── Archive ────────────────────────────────────────────────────────────── Path(cfg.books_done).mkdir(parents=True, exist_ok=True) shutil.move(str(processing_path), done_path) log.info("Archived to done: %s", done_path) return result def _run_pipeline(pdf_path: Path, file_hash: str) -> bool: """Inner pipeline — assumes file is already in processing/.""" with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TaskProgressColumn(), console=console, transient=True, ) as progress: # ── Step 1: Detect structure ───────────────────────────────────────── task = progress.add_task("Detecting structure...", total=None) structure = detect_structure(pdf_path) progress.update(task, completed=True) console.print( f" [green]✓[/] [bold]{structure.doc_type.upper()}[/] document — " f"[dim]{structure.title}[/]" ) if structure.doc_type == "structured": console.print(f" [green]✓[/] {len(structure.sections)} sections detected") # ── Route to appropriate sub-pipeline ─────────────────────────────── if structure.doc_type == "structured": return _structured_pipeline(pdf_path, structure, file_hash, progress) else: return _flat_pipeline(pdf_path, structure, file_hash, progress) def _structured_pipeline( pdf_path: Path, structure: DocumentStructure, file_hash: str, progress: Progress, ) -> bool: chapter_summary_ids: list[str] = [] chapter_summaries: list[str] = [] all_content_ids: list[str] = [] # ── Step 2: Summarize chapters ─────────────────────────────────────────── task = progress.add_task("Summarizing chapters...", total=len(structure.sections)) for section in structure.sections: summary = summarize_chapter( book_title=structure.title, chapter_title=section.title, chapter_text=section.raw_text, ) chapter_summaries.append(summary) mem_id = write_chapter_summary( title=structure.title, chapter_title=section.title, chapter_number=section.chapter_number, summary=summary, source_file=structure.source_file, page_start=section.page_start, page_end=section.page_end, ) if mem_id: chapter_summary_ids.append(mem_id) progress.advance(task) # ── Step 3: Book-level summary ─────────────────────────────────────────── task2 = progress.add_task("Generating book summary...", total=None) book_summary = summarize_book(structure.title, chapter_summaries) book_summary_id = write_book_summary( title=structure.title, summary=book_summary, source_file=structure.source_file, ) progress.update(task2, completed=True) # ── Step 4: Chunk and store content ───────────────────────────────────── task3 = progress.add_task("Chunking content...", total=len(structure.sections)) for section in structure.sections: chunks = chunk_section( text=section.raw_text, source_file=structure.source_file, section_title=section.title, chapter_number=section.chapter_number, page_start=section.page_start, page_end=section.page_end, ) ids = write_content_chunks_batch(chunks, doc_title=structure.title) all_content_ids.extend(ids) progress.advance(task3) # ── Step 5: Save manifest ──────────────────────────────────────────────── manifest = build_manifest( source_file=structure.source_file, file_hash=file_hash, doc_type="structured", doc_title=structure.title, chapters_detected=len(structure.sections), book_summary_id=book_summary_id, chapter_summary_ids=chapter_summary_ids, content_ids=all_content_ids, ) save_manifest(manifest) _print_summary(manifest.memories, structure.title) return True def _flat_pipeline( pdf_path: Path, structure: DocumentStructure, file_hash: str, progress: Progress, ) -> bool: # ── Step 2: Summarize whole doc ────────────────────────────────────────── task = progress.add_task("Summarizing document...", total=None) doc_summary = summarize_flat_document(structure.title, structure.full_text) book_summary_id = write_book_summary( title=structure.title, summary=doc_summary, source_file=structure.source_file, ) progress.update(task, completed=True) # ── Step 3: Chunk and store content ───────────────────────────────────── task2 = progress.add_task("Chunking content...", total=None) chunks = chunk_section( text=structure.full_text, source_file=structure.source_file, ) content_ids = write_content_chunks_batch(chunks, doc_title=structure.title) progress.update(task2, completed=True) # ── Step 4: Save manifest ──────────────────────────────────────────────── manifest = build_manifest( source_file=structure.source_file, file_hash=file_hash, doc_type="flat", doc_title=structure.title, chapters_detected=0, book_summary_id=book_summary_id, chapter_summary_ids=[], content_ids=content_ids, ) save_manifest(manifest) _print_summary(manifest.memories, structure.title) return True def _print_summary(memories: dict, title: str) -> None: total = sum(memories.values()) console.print( f"\n[bold green]✓ Done![/] [dim]{title}[/]\n" f" Memories stored: " f"[cyan]{memories.get('book_summary', 0)}[/] book · " f"[cyan]{memories.get('chapter_summary', 0)}[/] chapters · " f"[cyan]{memories.get('content', 0)}[/] chunks · " f"[bold]{total}[/] total\n" )