pipeline.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
"""
pipeline.py — orchestrates the full ingestion flow for a single PDF.
Calls detector → summarizer → mem0_writer (summaries) → chunker →
mem0_writer (content chunks) → manifest, in that order.
"""
from __future__ import annotations

import logging
import shutil
from pathlib import Path

from rich.console import Console
from rich.markup import escape
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn

from .config import cfg
from .detector import detect_structure, DocumentStructure
from .chunker import chunk_section
from .summarizer import summarize_book, summarize_chapter, summarize_flat_document
from .mem0_writer import (
    write_book_summary,
    write_chapter_summary,
    write_content_chunks_batch,
)
from .manifest import compute_file_hash, already_ingested, build_manifest, save_manifest
# Logger named after this module.
log = logging.getLogger(__name__)
# Rich console shared by every status message and progress display in this module.
console = Console()
  23. def ingest(pdf_path: str | Path) -> bool:
  24. """
  25. Full ingestion pipeline for a single PDF.
  26. Moves file through inbox → processing → done.
  27. Returns True on success, False on failure.
  28. """
  29. pdf_path = Path(pdf_path)
  30. processing_path = Path(cfg.books_processing) / pdf_path.name
  31. done_path = Path(cfg.books_done) / pdf_path.name
  32. # ── Deduplication check ──────────────────────────────────────────────────
  33. console.print(f"\n[bold cyan]📚 Ingesting:[/] {pdf_path.name}")
  34. file_hash = compute_file_hash(pdf_path)
  35. if already_ingested(file_hash):
  36. console.print("[yellow]⚠ Already ingested (hash match). Skipping.[/]")
  37. return True
  38. # ── Move to processing ───────────────────────────────────────────────────
  39. Path(cfg.books_processing).mkdir(parents=True, exist_ok=True)
  40. shutil.move(str(pdf_path), processing_path)
  41. log.info("Moved to processing: %s", processing_path)
  42. try:
  43. result = _run_pipeline(processing_path, file_hash)
  44. except Exception as e:
  45. log.exception("Pipeline failed for %s: %s", pdf_path.name, e)
  46. console.print(f"[red]✗ Pipeline failed: {e}[/]")
  47. # Move back to inbox so user can retry
  48. shutil.move(str(processing_path), pdf_path)
  49. return False
  50. # ── Archive ──────────────────────────────────────────────────────────────
  51. Path(cfg.books_done).mkdir(parents=True, exist_ok=True)
  52. shutil.move(str(processing_path), done_path)
  53. log.info("Archived to done: %s", done_path)
  54. return result
  55. def _run_pipeline(pdf_path: Path, file_hash: str) -> bool:
  56. """Inner pipeline — assumes file is already in processing/."""
  57. with Progress(
  58. SpinnerColumn(),
  59. TextColumn("[progress.description]{task.description}"),
  60. BarColumn(),
  61. TaskProgressColumn(),
  62. console=console,
  63. transient=True,
  64. ) as progress:
  65. # ── Step 1: Detect structure ─────────────────────────────────────────
  66. task = progress.add_task("Detecting structure...", total=None)
  67. structure = detect_structure(pdf_path)
  68. progress.update(task, completed=True)
  69. console.print(
  70. f" [green]✓[/] [bold]{structure.doc_type.upper()}[/] document — "
  71. f"[dim]{structure.title}[/]"
  72. )
  73. if structure.doc_type == "structured":
  74. console.print(f" [green]✓[/] {len(structure.sections)} sections detected")
  75. # ── Route to appropriate sub-pipeline ───────────────────────────────
  76. if structure.doc_type == "structured":
  77. return _structured_pipeline(pdf_path, structure, file_hash, progress)
  78. else:
  79. return _flat_pipeline(pdf_path, structure, file_hash, progress)
  80. def _structured_pipeline(
  81. pdf_path: Path,
  82. structure: DocumentStructure,
  83. file_hash: str,
  84. progress: Progress,
  85. ) -> bool:
  86. chapter_summary_ids: list[str] = []
  87. chapter_summaries: list[str] = []
  88. all_content_ids: list[str] = []
  89. # ── Step 2: Summarize chapters ───────────────────────────────────────────
  90. task = progress.add_task("Summarizing chapters...", total=len(structure.sections))
  91. for section in structure.sections:
  92. summary = summarize_chapter(
  93. book_title=structure.title,
  94. chapter_title=section.title,
  95. chapter_text=section.raw_text,
  96. )
  97. chapter_summaries.append(summary)
  98. mem_id = write_chapter_summary(
  99. title=structure.title,
  100. chapter_title=section.title,
  101. chapter_number=section.chapter_number,
  102. summary=summary,
  103. source_file=structure.source_file,
  104. page_start=section.page_start,
  105. page_end=section.page_end,
  106. )
  107. if mem_id:
  108. chapter_summary_ids.append(mem_id)
  109. progress.advance(task)
  110. # ── Step 3: Book-level summary ───────────────────────────────────────────
  111. task2 = progress.add_task("Generating book summary...", total=None)
  112. book_summary = summarize_book(structure.title, chapter_summaries)
  113. book_summary_id = write_book_summary(
  114. title=structure.title,
  115. summary=book_summary,
  116. source_file=structure.source_file,
  117. )
  118. progress.update(task2, completed=True)
  119. # ── Step 4: Chunk and store content ─────────────────────────────────────
  120. task3 = progress.add_task("Chunking content...", total=len(structure.sections))
  121. for section in structure.sections:
  122. chunks = chunk_section(
  123. text=section.raw_text,
  124. source_file=structure.source_file,
  125. section_title=section.title,
  126. chapter_number=section.chapter_number,
  127. page_start=section.page_start,
  128. page_end=section.page_end,
  129. )
  130. ids = write_content_chunks_batch(chunks, doc_title=structure.title)
  131. all_content_ids.extend(ids)
  132. progress.advance(task3)
  133. # ── Step 5: Save manifest ────────────────────────────────────────────────
  134. manifest = build_manifest(
  135. source_file=structure.source_file,
  136. file_hash=file_hash,
  137. doc_type="structured",
  138. doc_title=structure.title,
  139. chapters_detected=len(structure.sections),
  140. book_summary_id=book_summary_id,
  141. chapter_summary_ids=chapter_summary_ids,
  142. content_ids=all_content_ids,
  143. )
  144. save_manifest(manifest)
  145. _print_summary(manifest.memories, structure.title)
  146. return True
  147. def _flat_pipeline(
  148. pdf_path: Path,
  149. structure: DocumentStructure,
  150. file_hash: str,
  151. progress: Progress,
  152. ) -> bool:
  153. # ── Step 2: Summarize whole doc ──────────────────────────────────────────
  154. task = progress.add_task("Summarizing document...", total=None)
  155. doc_summary = summarize_flat_document(structure.title, structure.full_text)
  156. book_summary_id = write_book_summary(
  157. title=structure.title,
  158. summary=doc_summary,
  159. source_file=structure.source_file,
  160. )
  161. progress.update(task, completed=True)
  162. # ── Step 3: Chunk and store content ─────────────────────────────────────
  163. task2 = progress.add_task("Chunking content...", total=None)
  164. chunks = chunk_section(
  165. text=structure.full_text,
  166. source_file=structure.source_file,
  167. )
  168. content_ids = write_content_chunks_batch(chunks, doc_title=structure.title)
  169. progress.update(task2, completed=True)
  170. # ── Step 4: Save manifest ────────────────────────────────────────────────
  171. manifest = build_manifest(
  172. source_file=structure.source_file,
  173. file_hash=file_hash,
  174. doc_type="flat",
  175. doc_title=structure.title,
  176. chapters_detected=0,
  177. book_summary_id=book_summary_id,
  178. chapter_summary_ids=[],
  179. content_ids=content_ids,
  180. )
  181. save_manifest(manifest)
  182. _print_summary(manifest.memories, structure.title)
  183. return True
  184. def _print_summary(memories: dict, title: str) -> None:
  185. total = sum(memories.values())
  186. console.print(
  187. f"\n[bold green]✓ Done![/] [dim]{title}[/]\n"
  188. f" Memories stored: "
  189. f"[cyan]{memories.get('book_summary', 0)}[/] book · "
  190. f"[cyan]{memories.get('chapter_summary', 0)}[/] chapters · "
  191. f"[cyan]{memories.get('content', 0)}[/] chunks · "
  192. f"[bold]{total}[/] total\n"
  193. )