| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- """
- pipeline.py — orchestrates the full ingestion flow for a single PDF.
- Calls detector → chunker → summarizer → mem0_writer → manifest in order.
- """
- from __future__ import annotations
- import logging
- import shutil
- from pathlib import Path
- from rich.console import Console
- from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
- from .config import cfg
- from .detector import detect_structure, DocumentStructure
- from .chunker import chunk_section
- from .summarizer import summarize_book, summarize_chapter, summarize_flat_document
- from .mem0_writer import (
- write_book_summary,
- write_chapter_summary,
- write_content_chunks_batch,
- )
- from .manifest import compute_file_hash, already_ingested, build_manifest, save_manifest
- log = logging.getLogger(__name__)
- console = Console()
def ingest(pdf_path: str | Path) -> bool:
    """
    Full ingestion pipeline for a single PDF.

    Moves the file through inbox → processing → done. On failure the file
    is moved back to the inbox so the user can retry.

    Args:
        pdf_path: Path to the PDF currently sitting in the inbox.

    Returns:
        True on success (or when the file's hash was already ingested),
        False when the pipeline failed.
    """
    pdf_path = Path(pdf_path)
    processing_path = Path(cfg.books_processing) / pdf_path.name
    done_path = Path(cfg.books_done) / pdf_path.name

    # ── Deduplication check ──────────────────────────────────────────────────
    console.print(f"\n[bold cyan]📚 Ingesting:[/] {pdf_path.name}")
    file_hash = compute_file_hash(pdf_path)
    if already_ingested(file_hash):
        console.print("[yellow]⚠ Already ingested (hash match). Skipping.[/]")
        return True

    # ── Move to processing ───────────────────────────────────────────────────
    Path(cfg.books_processing).mkdir(parents=True, exist_ok=True)
    shutil.move(str(pdf_path), processing_path)
    log.info("Moved to processing: %s", processing_path)

    try:
        result = _run_pipeline(processing_path, file_hash)
    except Exception as e:
        log.exception("Pipeline failed for %s: %s", pdf_path.name, e)
        console.print(f"[red]✗ Pipeline failed: {e}[/]")
        # Best-effort: move back to inbox so the user can retry. Guarded so a
        # failed restore (inbox dir gone, permissions) does not mask the
        # original pipeline error — we still return False either way.
        try:
            shutil.move(str(processing_path), pdf_path)
        except OSError:
            log.exception("Could not return %s to inbox", processing_path)
        return False

    # ── Archive ──────────────────────────────────────────────────────────────
    Path(cfg.books_done).mkdir(parents=True, exist_ok=True)
    shutil.move(str(processing_path), done_path)
    log.info("Archived to done: %s", done_path)
    return result
def _run_pipeline(pdf_path: Path, file_hash: str) -> bool:
    """Inner pipeline — assumes the file already lives in processing/.

    Detects document structure, degrades to flat processing when the
    section count exceeds ``cfg.max_sections``, then dispatches to the
    structured or flat sub-pipeline.
    """
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
        transient=True,
    ) as progress:
        # ── Step 1: Detect structure ─────────────────────────────────────────
        detect_task = progress.add_task("Detecting structure...", total=None)
        structure = detect_structure(pdf_path)
        progress.update(detect_task, completed=True)

        console.print(
            f" [green]✓[/] [bold]{structure.doc_type.upper()}[/] document — "
            f"[dim]{structure.title}[/]"
        )

        is_structured = structure.doc_type == "structured"
        if is_structured:
            console.print(f" [green]✓[/] {len(structure.sections)} sections detected")

            # Sanity check: an absurd section count means the detected
            # headings are OCR noise — flatten everything and carry on.
            if len(structure.sections) > cfg.max_sections:
                log.warning(
                    "%s yielded %d sections (max=%d) — looks like OCR noise, "
                    "falling back to flat processing",
                    structure.source_file, len(structure.sections), cfg.max_sections
                )
                console.print(
                    f" [yellow]⚠ {len(structure.sections)} sections detected — "
                    f"exceeds MAX_SECTIONS={cfg.max_sections}, treating as flat[/]"
                )
                structure.doc_type = "flat"
                structure.full_text = "\n\n".join(s.raw_text for s in structure.sections)
                structure.sections = []
                is_structured = False

        # ── Route to the appropriate sub-pipeline ────────────────────────────
        if is_structured:
            return _structured_pipeline(pdf_path, structure, file_hash, progress)
        return _flat_pipeline(pdf_path, structure, file_hash, progress)
def _structured_pipeline(
    pdf_path: Path,
    structure: DocumentStructure,
    file_hash: str,
    progress: Progress,
) -> bool:
    """Handle a chaptered document: summarize each chapter, roll the chapter
    summaries up into a book summary, chunk the raw content, and persist a
    manifest recording every memory id that was written."""
    summary_ids: list[str] = []
    summaries: list[str] = []
    content_ids: list[str] = []

    # ── Step 2: per-chapter summaries ────────────────────────────────────────
    summarize_task = progress.add_task(
        "Summarizing chapters...", total=len(structure.sections)
    )
    for sec in structure.sections:
        chapter_summary = summarize_chapter(
            book_title=structure.title,
            chapter_title=sec.title,
            chapter_text=sec.raw_text,
        )
        summaries.append(chapter_summary)
        mem_id = write_chapter_summary(
            title=structure.title,
            chapter_title=sec.title,
            chapter_number=sec.chapter_number,
            summary=chapter_summary,
            source_file=structure.source_file,
            page_start=sec.page_start,
            page_end=sec.page_end,
        )
        # Writers may return a falsy id on failure — only record real ids.
        if mem_id:
            summary_ids.append(mem_id)
        progress.advance(summarize_task)

    # ── Step 3: book-level summary ───────────────────────────────────────────
    book_task = progress.add_task("Generating book summary...", total=None)
    book_summary_id = write_book_summary(
        title=structure.title,
        summary=summarize_book(structure.title, summaries),
        source_file=structure.source_file,
    )
    progress.update(book_task, completed=True)

    # ── Step 4: chunk and store raw content ──────────────────────────────────
    chunk_task = progress.add_task("Chunking content...", total=len(structure.sections))
    for sec in structure.sections:
        section_chunks = chunk_section(
            text=sec.raw_text,
            source_file=structure.source_file,
            section_title=sec.title,
            chapter_number=sec.chapter_number,
            page_start=sec.page_start,
            page_end=sec.page_end,
        )
        content_ids.extend(
            write_content_chunks_batch(section_chunks, doc_title=structure.title)
        )
        progress.advance(chunk_task)

    # ── Step 5: persist manifest ─────────────────────────────────────────────
    manifest = build_manifest(
        source_file=structure.source_file,
        file_hash=file_hash,
        doc_type="structured",
        doc_title=structure.title,
        chapters_detected=len(structure.sections),
        book_summary_id=book_summary_id,
        chapter_summary_ids=summary_ids,
        content_ids=content_ids,
    )
    save_manifest(manifest)
    _print_summary(manifest.memories, structure.title)
    return True
def _flat_pipeline(
    pdf_path: Path,
    structure: DocumentStructure,
    file_hash: str,
    progress: Progress,
) -> bool:
    """Handle a document with no usable chapter structure: one whole-document
    summary, one chunking pass over the full text, then a manifest."""
    # ── Step 2: whole-document summary ───────────────────────────────────────
    summary_task = progress.add_task("Summarizing document...", total=None)
    doc_summary = summarize_flat_document(structure.title, structure.full_text)
    summary_id = write_book_summary(
        title=structure.title,
        summary=doc_summary,
        source_file=structure.source_file,
    )
    progress.update(summary_task, completed=True)

    # ── Step 3: chunk and store content ──────────────────────────────────────
    chunk_task = progress.add_task("Chunking content...", total=None)
    chunk_ids = write_content_chunks_batch(
        chunk_section(text=structure.full_text, source_file=structure.source_file),
        doc_title=structure.title,
    )
    progress.update(chunk_task, completed=True)

    # ── Step 4: persist manifest ─────────────────────────────────────────────
    manifest = build_manifest(
        source_file=structure.source_file,
        file_hash=file_hash,
        doc_type="flat",
        doc_title=structure.title,
        chapters_detected=0,
        book_summary_id=summary_id,
        chapter_summary_ids=[],
        content_ids=chunk_ids,
    )
    save_manifest(manifest)
    _print_summary(manifest.memories, structure.title)
    return True
def _print_summary(memories: dict, title: str) -> None:
    """Print the per-document ingestion tally (book/chapter/chunk counts)
    to the console.  *memories* maps memory-kind name → count."""
    book_count = memories.get("book_summary", 0)
    chapter_count = memories.get("chapter_summary", 0)
    chunk_count = memories.get("content", 0)
    total = sum(memories.values())
    console.print(
        f"\n[bold green]✓ Done![/] [dim]{title}[/]\n"
        f" Memories stored: "
        f"[cyan]{book_count}[/] book · "
        f"[cyan]{chapter_count}[/] chapters · "
        f"[cyan]{chunk_count}[/] chunks · "
        f"[bold]{total}[/] total\n"
    )
|