Lukas Goldschmidt 2 giorni fa
parent
commit
589d19ae78

+ 19 - 0
.env.example

@@ -0,0 +1,19 @@
+# mem0 server (your LAN address)
+MEM0_BASE_URL=http://192.168.0.200:8420
+MEM0_AGENT_ID=knowledge_base
+
+# Groq
+GROQ_API_KEY=your_groq_key_here
+GROQ_MODEL=meta-llama/llama-4-scout-17b-16e-instruct
+
+# Folders (defaults work out of the box)
+BOOKS_INBOX=./books/inbox
+BOOKS_PROCESSING=./books/processing
+BOOKS_DONE=./books/done
+BOOKS_MANIFESTS=./books/manifests
+
+# Chunking
+CHUNK_SIZE_TOKENS=350
+
+# Logging: DEBUG | INFO | WARNING
+LOG_LEVEL=INFO

+ 28 - 0
Dockerfile

@@ -0,0 +1,28 @@
+# ── Base image ─────────────────────────────────────────────────────────────────
+FROM python:3.11-slim AS base
+
+# System deps for PyMuPDF
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    libmupdf-dev \
+    && rm -rf /var/lib/apt/lists/*
+
+WORKDIR /app
+
+# Install Python deps first (layer cache)
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy package
+COPY book_ingestor/ ./book_ingestor/
+
+# ── Runtime ─────────────────────────────────────────────────────────────────────
+# books/ is mounted at runtime — not baked in
+RUN mkdir -p books/inbox books/processing books/done books/manifests
+
+# Non-root user for safety
+RUN useradd -m -u 1000 ingestor && chown -R ingestor:ingestor /app
+USER ingestor
+
+ENV PYTHONUNBUFFERED=1
+
+CMD ["python", "-m", "book_ingestor.watchdog_runner"]

+ 33 - 1
README.md

@@ -41,10 +41,39 @@ Memories are stored in layers:
 
 ## Quick Start
 
+### With Docker (recommended)
+
+```bash
+git clone https://github.com/yourname/book-ingestor.git
+cd book-ingestor
+cp .env.example .env        # fill in your values
+docker compose up -d --build
+```
+
+Watch logs:
+```bash
+docker compose logs -f
+```
+
+Stop / restart:
+```bash
+docker compose down
+docker compose up -d
+```
+
+If a PDF gets stuck in `books/processing/` after an interrupted run:
+```bash
+mv books/processing/*.pdf books/inbox/
+docker compose restart
+```
+
+### Without Docker
+
 ```bash
 git clone https://github.com/yourname/book-ingestor.git
 cd book-ingestor
 cp .env.example .env        # fill in your values
+python -m venv venv && source venv/bin/activate
 pip install -r requirements.txt
 python -m book_ingestor.watchdog_runner
 ```
@@ -90,6 +119,8 @@ book-ingestor/
 │   ├── mem0_writer.py
 │   ├── manifest.py
 │   └── config.py
+├── Dockerfile
+├── docker-compose.yml
 ├── .env.example
 ├── requirements.txt
 ├── PROJECT.md
@@ -114,7 +145,8 @@ book-ingestor/
 
 - This project is **completely independent** of OpenClaw or any specific AI agent — it only talks to mem0.
 - Any machine on the LAN with network access to your mem0 server can run this.
-- Docker support is planned for a future release.
+- The `books/` folder is mounted into the container — PDFs, manifests and archives survive restarts and rebuilds.
+- `network_mode: host` is used so the container can reach your LAN mem0 server without extra networking config.
 
 ---
 

+ 181 - 0
book_ingestor/chunker.py

@@ -0,0 +1,181 @@
+"""
+chunker.py — splits text into token-sized chunks, purely in Python.
+No LLM calls. Uses tiktoken for accurate token counting.
+
+Strategy:
+  - Split on paragraph boundaries first (double newline)
+  - If a paragraph exceeds chunk_size, split on sentence boundaries
+  - If a sentence exceeds chunk_size, hard-split on token count
+  - Chunks carry their source metadata (page range, section title)
+"""
+
+from __future__ import annotations
+
+import re
+from dataclasses import dataclass
+
+import tiktoken
+
+from .config import cfg
+
+# Use cl100k_base — matches most modern LLMs well enough for counting
+_ENCODER = tiktoken.get_encoding("cl100k_base")
+
+
+# ── Data model ─────────────────────────────────────────────────────────────────
+
@dataclass
class Chunk:
    """One token-bounded piece of text plus the provenance needed to trace
    it back to its place in the source PDF."""
    text: str
    token_count: int           # tiktoken count of `text`
    source_file: str           # original PDF filename
    section_title: str | None  # None for flat docs
    chapter_number: int | None
    page_start: int
    page_end: int
    chunk_index: int           # position within parent section
+
+
+# ── Public API ─────────────────────────────────────────────────────────────────
+
def chunk_section(
    text: str,
    source_file: str,
    section_title: str | None = None,
    chapter_number: int | None = None,
    page_start: int = 0,
    page_end: int = 0,
    chunk_size: int | None = None,
) -> list[Chunk]:
    """
    Split a block of text into token-bounded pieces and wrap each in a Chunk.

    The chunk size falls back to the configured default when not given.
    Whitespace-only pieces are dropped; surviving chunks keep their index
    within the full (unfiltered) sequence.
    """
    limit = chunk_size or cfg.chunk_size_tokens
    pieces = _build_chunks(_split_paragraphs(text), limit)

    result: list[Chunk] = []
    for position, piece in enumerate(pieces):
        if not piece.strip():
            continue
        result.append(
            Chunk(
                text=piece,
                token_count=count_tokens(piece),
                source_file=source_file,
                section_title=section_title,
                chapter_number=chapter_number,
                page_start=page_start,
                page_end=page_end,
                chunk_index=position,
            )
        )
    return result
+
+
def count_tokens(text: str) -> int:
    """Count tokens in a string using the module-level cl100k_base encoder."""
    return len(_ENCODER.encode(text))
+
+
+# ── Internal helpers ───────────────────────────────────────────────────────────
+
+def _split_paragraphs(text: str) -> list[str]:
+    """Split on blank lines, clean up whitespace."""
+    paragraphs = re.split(r"\n\s*\n", text)
+    return [p.strip() for p in paragraphs if p.strip()]
+
+
+def _split_sentences(text: str) -> list[str]:
+    """
+    Rough sentence splitter — handles common abbreviations.
+    Good enough for chunking purposes without an NLP library.
+    """
+    # Protect common abbreviations from splitting
+    protected = re.sub(
+        r"\b(Mr|Mrs|Ms|Dr|Prof|Sr|Jr|vs|etc|approx|dept|est|fig|govt|inc|ltd|no|vol)\.",
+        r"\1<DOT>",
+        text,
+        flags=re.IGNORECASE,
+    )
+    # Split on sentence-ending punctuation followed by whitespace + capital
+    sentences = re.split(r"(?<=[.!?])\s+(?=[A-Z\"\'])", protected)
+    # Restore protected dots
+    return [s.replace("<DOT>", ".").strip() for s in sentences if s.strip()]
+
+
def _build_chunks(paragraphs: list[str], max_tokens: int) -> list[str]:
    """
    Greedily pack paragraphs into chunks of at most max_tokens.
    A paragraph that alone exceeds the budget is handed to the
    sentence-level splitter instead.
    """
    out: list[str] = []
    buffer: list[str] = []
    buffered = 0

    def flush() -> None:
        # Emit whatever is buffered as one chunk and reset.
        nonlocal buffered
        if buffer:
            out.append(" ".join(buffer))
            buffer.clear()
            buffered = 0

    for para in paragraphs:
        size = count_tokens(para)

        if size > max_tokens:
            # Oversized on its own — flush, then break the paragraph apart.
            flush()
            out.extend(_split_large_paragraph(para, max_tokens))
        elif buffered + size > max_tokens:
            # Budget exceeded — start a fresh buffer with this paragraph.
            flush()
            buffer.append(para)
            buffered = size
        else:
            buffer.append(para)
            buffered += size

    flush()
    return out
+
+
def _split_large_paragraph(para: str, max_tokens: int) -> list[str]:
    """
    Break an oversized paragraph at sentence boundaries, packing sentences
    greedily; any single sentence that still exceeds the budget is
    hard-split by raw token index.
    """
    out: list[str] = []
    pending: list[str] = []
    pending_tokens = 0

    for sentence in _split_sentences(para):
        size = count_tokens(sentence)

        if size > max_tokens:
            # Even one sentence overflows — emit the buffer, token-split it.
            if pending:
                out.append(" ".join(pending))
                pending, pending_tokens = [], 0
            out.extend(_hard_split(sentence, max_tokens))
        elif pending_tokens + size > max_tokens:
            if pending:
                out.append(" ".join(pending))
            pending = [sentence]
            pending_tokens = size
        else:
            pending.append(sentence)
            pending_tokens += size

    if pending:
        out.append(" ".join(pending))
    return out
+
+
def _hard_split(text: str, max_tokens: int) -> list[str]:
    """Last resort: slice the token stream into pieces of ≤ max_tokens each."""
    encoded = _ENCODER.encode(text)
    return [
        _ENCODER.decode(encoded[start:start + max_tokens])
        for start in range(0, len(encoded), max_tokens)
    ]

+ 58 - 0
book_ingestor/config.py

@@ -0,0 +1,58 @@
+"""
+config.py — loads .env and exposes typed settings for the entire project.
+No LLM calls. No side effects. Just config.
+"""
+
+import os
+from dataclasses import dataclass
+from dotenv import load_dotenv
+
+load_dotenv()
+
+
@dataclass(frozen=True)
class Config:
    """Immutable, typed view of all environment-driven settings."""

    # mem0 server
    mem0_base_url: str        # normalized without trailing slash by load_config
    mem0_agent_id: str

    # Groq
    groq_api_key: str
    groq_model: str

    # Folder paths
    books_inbox: str
    books_processing: str
    books_done: str
    books_manifests: str

    # Chunking
    chunk_size_tokens: int

    # Logging
    log_level: str
+
+
def load_config() -> Config:
    """
    Read environment variables (populated by load_dotenv above) into a Config.

    Raises EnvironmentError when a required variable is absent or empty;
    all other settings fall back to sensible defaults.
    """
    def require(key: str) -> str:
        value = os.getenv(key)
        if value:
            return value
        raise EnvironmentError(f"Missing required env var: {key}")

    def optional(key: str, default: str) -> str:
        return os.getenv(key, default)

    return Config(
        mem0_base_url=require("MEM0_BASE_URL").rstrip("/"),
        mem0_agent_id=optional("MEM0_AGENT_ID", "knowledge_base"),
        groq_api_key=require("GROQ_API_KEY"),
        groq_model=optional("GROQ_MODEL", "meta-llama/llama-4-scout-17b-16e-instruct"),
        books_inbox=optional("BOOKS_INBOX", "./books/inbox"),
        books_processing=optional("BOOKS_PROCESSING", "./books/processing"),
        books_done=optional("BOOKS_DONE", "./books/done"),
        books_manifests=optional("BOOKS_MANIFESTS", "./books/manifests"),
        chunk_size_tokens=int(optional("CHUNK_SIZE_TOKENS", "350")),
        log_level=optional("LOG_LEVEL", "INFO"),
    )
+
+
+# Singleton — import this everywhere
+cfg = load_config()

+ 272 - 0
book_ingestor/detector.py

@@ -0,0 +1,272 @@
+"""
+detector.py — analyses a PDF and determines its structure purely via PyMuPDF.
+No LLM calls. Font sizes, bold flags, and text positioning do all the work.
+
+Returns a DocumentStructure describing:
+  - whether the doc is structured (has chapters/headings) or flat
+  - extracted chapters with their page ranges and raw text
+  - document title (best guess)
+"""
+
+from __future__ import annotations
+
+import re
+from dataclasses import dataclass, field
+from pathlib import Path
+
+import fitz  # PyMuPDF
+
+
+# ── Data models ────────────────────────────────────────────────────────────────
+
@dataclass
class Section:
    """One detected chapter/section with its page span and extracted text."""
    title: str
    chapter_number: int | None   # None for flat docs
    page_start: int               # 1-indexed, inclusive (see _build_sections)
    page_end: int                 # 1-indexed, inclusive
    raw_text: str                 # full extracted text for this section
+
+
@dataclass
class DocumentStructure:
    """Result of structure detection for one PDF."""
    source_file: str
    doc_type: str                 # "structured" | "flat"
    title: str
    sections: list[Section] = field(default_factory=list)  # populated for structured docs
    full_text: str = ""           # populated for flat docs
+
+
+# ── Constants ──────────────────────────────────────────────────────────────────
+
+# Heading candidates: bold text significantly larger than body font
+HEADING_FONT_RATIO = 1.15        # heading must be ≥ 15% larger than median body size
+MIN_HEADING_LENGTH = 3
+MAX_HEADING_LENGTH = 120
+MIN_SECTIONS_FOR_STRUCTURED = 2  # need at least 2 headings to call it structured
+
+# Patterns that strongly suggest a chapter heading
+CHAPTER_PATTERNS = [
+    re.compile(r"^\s*(chapter|part|section|unit|lesson)\s+[\dIVXivx]+", re.IGNORECASE),
+    re.compile(r"^\s*[\dIVXivx]+[\.\)]\s+\w"),   # "1. Introduction" or "IV) Conclusion"
+]
+
+
+# ── Main entry point ───────────────────────────────────────────────────────────
+
def detect_structure(pdf_path: str | Path) -> DocumentStructure:
    """
    Inspect a PDF and describe its structure.

    Documents with at least MIN_SECTIONS_FOR_STRUCTURED heading candidates
    are classified "structured" and split into sections; everything else is
    "flat" with the whole text in full_text. The document handle is always
    closed, even on error.
    """
    path = Path(pdf_path)
    doc = fitz.open(str(path))
    try:
        title = _extract_title(doc)
        headings = _extract_headings(doc, _median_body_font_size(doc))

        if len(headings) < MIN_SECTIONS_FOR_STRUCTURED:
            return DocumentStructure(
                source_file=path.name,
                doc_type="flat",
                title=title,
                full_text=_extract_full_text(doc),
            )

        return DocumentStructure(
            source_file=path.name,
            doc_type="structured",
            title=title,
            sections=_build_sections(doc, headings),
        )
    finally:
        doc.close()
+
+
+# ── Internal helpers ───────────────────────────────────────────────────────────
+
def _extract_title(doc: fitz.Document) -> str:
    """Best-effort title: PDF metadata, else the largest plausible text on
    page 1, else a cleaned-up version of the filename."""
    from_meta = (doc.metadata or {}).get("title", "").strip()
    if from_meta and len(from_meta) > 3:
        return from_meta

    if doc.page_count == 0:
        return "Unknown Document"

    # Track the (size, text) maximum over first-page spans of title-ish length.
    best: tuple[float, str] | None = None
    for block in doc[0].get_text("dict")["blocks"]:
        if block.get("type") != 0:  # type 0 = text block
            continue
        for line in block.get("lines", []):
            for span in line.get("spans", []):
                candidate = span["text"].strip()
                if MIN_HEADING_LENGTH < len(candidate) < MAX_HEADING_LENGTH:
                    entry = (span["size"], candidate)
                    if best is None or entry > best:
                        best = entry

    if best is not None:
        return best[1]

    return Path(doc.name).stem.replace("_", " ").replace("-", " ").title()
+
+
def _median_body_font_size(doc: fitz.Document) -> float:
    """
    Median span font size over (up to) the first 10 pages.

    Body text dominates by volume, so the median approximates the body
    size. Returns 12.0 when no text is found at all.
    """
    observed: list[float] = []

    for page_num in range(min(doc.page_count, 10)):
        for block in doc[page_num].get_text("dict")["blocks"]:
            if block.get("type") != 0:
                continue
            observed.extend(
                span["size"]
                for line in block.get("lines", [])
                for span in line.get("spans", [])
                if span["text"].strip()
            )

    if not observed:
        return 12.0

    observed.sort()
    # Upper median (matches the original index arithmetic).
    return observed[len(observed) // 2]
+
+
def _is_heading_span(span: dict, body_size: float) -> bool:
    """Heuristic: does this text span look like a section heading?"""
    text = span["text"].strip()

    if len(text) < MIN_HEADING_LENGTH or len(text) > MAX_HEADING_LENGTH:
        return False

    size = span["size"]
    bold = bool(span.get("flags", 0) & 0x10)  # PyMuPDF bold bit (2**4)
    prominent = size >= body_size * HEADING_FONT_RATIO or (bold and size >= body_size)
    if not prominent:
        return False

    # Reject obvious non-headings: clause fragments ending mid-sentence...
    if text[-1] in ",;":
        return False
    # ...and long ALL-CAPS runs, which are usually decorative.
    upper_ratio = sum(1 for c in text if c.isupper()) / max(len(text), 1)
    if upper_ratio > 0.85 and len(text) > 40:
        return False

    return True
+
+
def _extract_headings(doc: fitz.Document, body_size: float) -> list[dict]:
    """
    Collect unique heading candidates across the whole document.

    Each entry is {"page", "text", "size", "y"}; at most one heading is
    taken per line, and duplicate heading texts (e.g. running headers)
    are only recorded once.
    """
    found: list[dict] = []
    seen: set[str] = set()

    for page_num in range(doc.page_count):
        for block in doc[page_num].get_text("dict")["blocks"]:
            if block.get("type") != 0:
                continue
            for line in block.get("lines", []):
                for span in line.get("spans", []):
                    text = span["text"].strip()
                    if text in seen or not _is_heading_span(span, body_size):
                        continue
                    found.append({
                        "page": page_num,
                        "text": text,
                        "size": span["size"],
                        "y": span["origin"][1],
                    })
                    seen.add(text)
                    break  # one heading per line is enough

    return found
+
+
def _extract_page_text(doc: fitz.Document, page_num: int) -> str:
    """Extract clean plain text from a single 0-indexed page, stripped."""
    return doc[page_num].get_text("text").strip()
+
+
def _build_sections(doc: fitz.Document, headings: list[dict]) -> list[Section]:
    """
    Turn heading positions into Sections.

    Each section runs from its heading's page to the page before the next
    heading (or the last page). Page numbers in the result are converted
    to 1-indexed for human consumption.
    """
    final_page = doc.page_count - 1
    sections: list[Section] = []

    for idx, heading in enumerate(headings):
        first = heading["page"]
        nxt = headings[idx + 1]["page"] - 1 if idx + 1 < len(headings) else final_page
        last = max(first, nxt)  # a heading's section never ends before it starts

        body = "\n".join(
            _extract_page_text(doc, p) for p in range(first, last + 1)
        ).strip()

        sections.append(Section(
            title=heading["text"],
            chapter_number=_parse_chapter_number(heading["text"], idx),
            page_start=first + 1,
            page_end=last + 1,
            raw_text=body,
        ))

    return sections
+
+
def _parse_chapter_number(text: str, fallback: int) -> int:
    """
    Pull a chapter number out of a heading ("Chapter 3", "3.", "III.").
    Arabic digits win over roman numerals; otherwise fall back to the
    heading's ordinal position (fallback + 1).
    """
    if (arabic := re.search(r"\b(\d+)\b", text)) is not None:
        return int(arabic.group(1))

    if (roman := re.search(r"\b(I{1,3}|IV|V|VI{0,3}|IX|X{1,3})\b", text)) is not None:
        return _roman_to_int(roman.group(1))

    return fallback + 1
+
+
+def _roman_to_int(s: str) -> int:
+    values = {"I": 1, "V": 5, "X": 10, "L": 50, "C": 100, "D": 500, "M": 1000}
+    s = s.upper()
+    total = 0
+    for i, c in enumerate(s):
+        if i + 1 < len(s) and values[c] < values[s[i + 1]]:
+            total -= values[c]
+        else:
+            total += values[c]
+    return total
+
+
def _extract_full_text(doc: fitz.Document) -> str:
    """Concatenate all non-empty page texts, separated by blank lines."""
    pages = (_extract_page_text(doc, n) for n in range(doc.page_count))
    return "\n\n".join(text for text in pages if text)

+ 97 - 0
book_ingestor/manifest.py

@@ -0,0 +1,97 @@
+"""
+manifest.py — records every ingested file as a JSON manifest.
+Enables safe re-ingestion, deletion, and status checks.
+"""
+
+from __future__ import annotations
+
+import json
+import hashlib
+import logging
+from dataclasses import dataclass, field, asdict
+from datetime import datetime, timezone
+from pathlib import Path
+
+from .config import cfg
+
+log = logging.getLogger(__name__)
+
+
@dataclass
class Manifest:
    """Record of one ingestion run — enough to dedupe, audit, or delete later."""
    source_file: str
    file_hash: str             # SHA-256 of original PDF
    ingested_at: str           # UTC ISO-8601 timestamp
    doc_type: str              # "structured" | "flat"
    doc_title: str
    chapters_detected: int
    memories: dict             # { "book_summary": 1, "chapter_summary": N, "content": M }
    memory_ids: list[str]      # all mem0 IDs — for future deletion
    status: str                # "complete" | "partial" | "failed"
+
+
+def compute_file_hash(path: str | Path) -> str:
+    sha = hashlib.sha256()
+    with open(path, "rb") as f:
+        for chunk in iter(lambda: f.read(65536), b""):
+            sha.update(chunk)
+    return sha.hexdigest()
+
+
def already_ingested(file_hash: str) -> bool:
    """
    Return True if a manifest records a *complete* ingestion of a file
    with this SHA-256 hash.

    Fix: an unreadable manifest file (OSError from read_text) previously
    crashed the whole dedup scan; now corrupt or unreadable manifests are
    skipped so one bad JSON file cannot block ingestion. (The old KeyError
    clause was dead — dict.get never raises it.)
    """
    manifests_dir = Path(cfg.books_manifests)
    for manifest_file in manifests_dir.glob("*.json"):
        try:
            data = json.loads(manifest_file.read_text())
        except (OSError, json.JSONDecodeError):
            # Corrupt/unreadable manifest — ignore it and keep scanning.
            continue
        if data.get("file_hash") == file_hash and data.get("status") == "complete":
            log.info("Skipping already-ingested file (hash match): %s", manifest_file.name)
            return True
    return False
+
+
def save_manifest(manifest: Manifest) -> Path:
    """Serialize a Manifest to JSON in the manifests directory; returns the path."""
    out_dir = Path(cfg.books_manifests)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Truncated stem + UTC timestamp so repeat ingestions never collide.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    target = out_dir / f"{Path(manifest.source_file).stem[:40]}_{stamp}.json"

    target.write_text(json.dumps(asdict(manifest), indent=2))
    log.info("Manifest saved: %s", target)
    return target
+
+
def build_manifest(
    source_file: str,
    file_hash: str,
    doc_type: str,
    doc_title: str,
    chapters_detected: int,
    book_summary_id: str | None,
    chapter_summary_ids: list[str],
    content_ids: list[str],
    status: str = "complete",
) -> Manifest:
    """Assemble a Manifest from the memory IDs returned by the mem0 writes.

    Falsy IDs (None, "") are dropped from memory_ids so the manifest only
    records memories that were actually stored.
    """
    collected = [book_summary_id, *chapter_summary_ids, *content_ids]
    return Manifest(
        source_file=source_file,
        file_hash=file_hash,
        ingested_at=datetime.now(timezone.utc).isoformat(),
        doc_type=doc_type,
        doc_title=doc_title,
        chapters_detected=chapters_detected,
        memories={
            "book_summary": 1 if book_summary_id else 0,
            "chapter_summary": len(chapter_summary_ids),
            "content": len(content_ids),
        },
        memory_ids=[mem_id for mem_id in collected if mem_id],
        status=status,
    )

+ 146 - 0
book_ingestor/mem0_writer.py

@@ -0,0 +1,146 @@
+"""
+mem0_writer.py — the ONLY module that talks to the mem0 server.
+Targets the /knowledge endpoint for objective fact storage.
+
+Server expects: { text, user_id, metadata, infer }
+- summaries  → infer: false  (already distilled by Groq, store verbatim)
+- raw chunks → infer: true   (let server extract facts from raw text)
+"""
+
+from __future__ import annotations
+
+import logging
+from datetime import datetime, timezone
+
+import requests
+
+from .config import cfg
+from .chunker import Chunk
+
+log = logging.getLogger(__name__)
+
+_SESSION = requests.Session()
+_SESSION.headers.update({"Content-Type": "application/json"})
+
+
+# ── Public API ─────────────────────────────────────────────────────────────────
+
def write_book_summary(title: str, summary: str, source_file: str) -> str | None:
    """Store a book-level summary verbatim (infer=False — already distilled by Groq)."""
    meta = _base_meta(source_file, "book_summary")
    meta["doc_title"] = title
    return _post(
        text=f"[Book Overview] {title}: {summary}",
        metadata=meta,
        infer=False,
    )
+
+
def write_chapter_summary(
    title: str,
    chapter_title: str,
    chapter_number: int | None,
    summary: str,
    source_file: str,
    page_start: int,
    page_end: int,
) -> str | None:
    """Store a chapter summary verbatim (infer=False — already distilled by Groq)."""
    meta = _base_meta(source_file, "chapter_summary")
    meta.update(
        doc_title=title,
        chapter_title=chapter_title,
        chapter=chapter_number,
        page_start=page_start,
        page_end=page_end,
    )
    return _post(
        text=f"[Chapter Summary] {title} — {chapter_title}: {summary}",
        metadata=meta,
        infer=False,
    )
+
+
def write_content_chunk(chunk: Chunk, doc_title: str) -> str | None:
    """Store one raw content chunk with infer=True so the server extracts facts."""
    meta = _base_meta(chunk.source_file, "content")
    meta.update(
        doc_title=doc_title,
        chapter_title=chunk.section_title,
        chapter=chunk.chapter_number,
        page_start=chunk.page_start,
        page_end=chunk.page_end,
        chunk_index=chunk.chunk_index,
        token_count=chunk.token_count,
    )
    return _post(text=chunk.text, metadata=meta, infer=True)
+
+
def write_content_chunks_batch(chunks: list[Chunk], doc_title: str) -> list[str]:
    """Store chunks one by one; return only the IDs of successful writes."""
    return [
        mem_id
        for chunk in chunks
        if (mem_id := write_content_chunk(chunk, doc_title))
    ]
+
+
+# ── Internal ───────────────────────────────────────────────────────────────────
+
+def _base_meta(source_file: str, memory_type: str) -> dict:
+    return {
+        "source_file": source_file,
+        "memory_type": memory_type,
+        "ingested_at": datetime.now(timezone.utc).isoformat(),
+    }
+
+
def _post(text: str, metadata: dict, infer: bool) -> str | None:
    """
    POST one entry to the /knowledge endpoint.

    Returns the new memory ID, or None on any failure: HTTP error,
    connection problem, or a 2xx response whose body is not valid JSON.

    Fix: the original called resp.json() inside the same try as the
    request, so a non-JSON success body either raised an uncaught
    ValueError or (with newer requests, where JSONDecodeError subclasses
    RequestException) was mislogged as a "connection error". Each failure
    mode now has its own narrow handler, and `resp` is only referenced
    where it is guaranteed to be bound.
    """
    payload = {
        "text": text,
        "user_id": cfg.mem0_agent_id,
        "metadata": metadata,
        "infer": infer,
    }
    url = f"{cfg.mem0_base_url}/knowledge"

    try:
        resp = _SESSION.post(url, json=payload, timeout=30)
        resp.raise_for_status()
    except requests.HTTPError as e:
        # raise_for_status fired, so a response exists on the exception.
        body = e.response.text[:300] if e.response is not None else ""
        log.error("knowledge HTTP error: %s — %s", e, body)
        return None
    except requests.RequestException as e:
        log.error("knowledge connection error: %s", e)
        return None

    try:
        data = resp.json()
    except ValueError as e:  # json.JSONDecodeError and requests' subclass
        log.error("knowledge returned non-JSON body: %s — %s", e, resp.text[:300])
        return None

    mem_id = _extract_id(data)
    log.debug(
        "Knowledge stored: %s (type=%s, infer=%s)",
        mem_id, metadata.get("memory_type"), infer,
    )
    return mem_id
+
+
+def _extract_id(data: dict | list) -> str | None:
+    """Robustly extract a memory ID from various response shapes."""
+    if isinstance(data, list):
+        return data[0].get("id") if data else None
+    if isinstance(data, dict):
+        if "id" in data:
+            return data["id"]
+        results = data.get("results", [])
+        if results and isinstance(results, list):
+            return results[0].get("id")
+    return None

+ 232 - 0
book_ingestor/pipeline.py

@@ -0,0 +1,232 @@
+"""
+pipeline.py — orchestrates the full ingestion flow for a single PDF.
+Calls detector → chunker → summarizer → mem0_writer → manifest in order.
+"""
+
+from __future__ import annotations
+
+import logging
+import shutil
+from pathlib import Path
+
+from rich.console import Console
+from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
+
+from .config import cfg
+from .detector import detect_structure, DocumentStructure
+from .chunker import chunk_section
+from .summarizer import summarize_book, summarize_chapter, summarize_flat_document
+from .mem0_writer import (
+    write_book_summary,
+    write_chapter_summary,
+    write_content_chunks_batch,
+)
+from .manifest import compute_file_hash, already_ingested, build_manifest, save_manifest
+
+log = logging.getLogger(__name__)
+console = Console()
+
+
def ingest(pdf_path: str | Path) -> bool:
    """
    Full ingestion pipeline for a single PDF.

    Lifecycle: hash the file in the inbox, move it to processing/, run the
    pipeline, then archive it to done/. On pipeline failure the file is
    moved back to the inbox so the user can retry.
    Returns True on success (or on a dedup skip), False on failure.
    """
    pdf_path = Path(pdf_path)
    processing_path = Path(cfg.books_processing) / pdf_path.name
    done_path = Path(cfg.books_done) / pdf_path.name

    # ── Deduplication check ──────────────────────────────────────────────────
    console.print(f"\n[bold cyan]📚 Ingesting:[/] {pdf_path.name}")
    file_hash = compute_file_hash(pdf_path)

    if already_ingested(file_hash):
        # NOTE(review): the duplicate is left in the inbox — if the watcher
        # re-fires on it, this check runs again every time. Consider moving
        # it to done/ here; confirm intended behavior with watchdog_runner.
        console.print("[yellow]⚠ Already ingested (hash match). Skipping.[/]")
        return True

    # ── Move to processing ───────────────────────────────────────────────────
    Path(cfg.books_processing).mkdir(parents=True, exist_ok=True)
    shutil.move(str(pdf_path), processing_path)
    log.info("Moved to processing: %s", processing_path)

    try:
        result = _run_pipeline(processing_path, file_hash)
    except Exception as e:
        log.exception("Pipeline failed for %s: %s", pdf_path.name, e)
        console.print(f"[red]✗ Pipeline failed: {e}[/]")
        # Move back to inbox so user can retry
        shutil.move(str(processing_path), pdf_path)
        return False

    # ── Archive ──────────────────────────────────────────────────────────────
    Path(cfg.books_done).mkdir(parents=True, exist_ok=True)
    shutil.move(str(processing_path), done_path)
    log.info("Archived to done: %s", done_path)

    return result
+
+
def _run_pipeline(pdf_path: Path, file_hash: str) -> bool:
    """Detect the document's structure, report it, and dispatch to the
    structured or flat sub-pipeline (file must already be in processing/)."""
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
        transient=True,
    ) as progress:

        # Step 1 — structure detection (pure PyMuPDF, no LLM calls).
        detect_task = progress.add_task("Detecting structure...", total=None)
        structure = detect_structure(pdf_path)
        progress.update(detect_task, completed=True)

        console.print(
            f"  [green]✓[/] [bold]{structure.doc_type.upper()}[/] document — "
            f"[dim]{structure.title}[/]"
        )

        if structure.doc_type != "structured":
            return _flat_pipeline(pdf_path, structure, file_hash, progress)

        console.print(f"  [green]✓[/] {len(structure.sections)} sections detected")
        return _structured_pipeline(pdf_path, structure, file_hash, progress)
+
+
def _structured_pipeline(
    pdf_path: Path,
    structure: DocumentStructure,
    file_hash: str,
    progress: Progress,
) -> bool:
    """
    Ingest a structured document: summarize each chapter, build the book
    summary from those chapter summaries, store the raw content chunks,
    then write the manifest. Individual failed mem0 writes are tolerated —
    they simply produce fewer recorded IDs. Always returns True.
    """
    chapter_summary_ids: list[str] = []
    chapter_summaries: list[str] = []
    all_content_ids: list[str] = []

    # ── Step 2: Summarize chapters ───────────────────────────────────────────
    task = progress.add_task("Summarizing chapters...", total=len(structure.sections))

    for section in structure.sections:
        summary = summarize_chapter(
            book_title=structure.title,
            chapter_title=section.title,
            chapter_text=section.raw_text,
        )
        # Kept locally as well — Step 3 builds the book summary from these.
        chapter_summaries.append(summary)

        mem_id = write_chapter_summary(
            title=structure.title,
            chapter_title=section.title,
            chapter_number=section.chapter_number,
            summary=summary,
            source_file=structure.source_file,
            page_start=section.page_start,
            page_end=section.page_end,
        )
        if mem_id:
            chapter_summary_ids.append(mem_id)

        progress.advance(task)

    # ── Step 3: Book-level summary ───────────────────────────────────────────
    task2 = progress.add_task("Generating book summary...", total=None)
    book_summary = summarize_book(structure.title, chapter_summaries)
    book_summary_id = write_book_summary(
        title=structure.title,
        summary=book_summary,
        source_file=structure.source_file,
    )
    progress.update(task2, completed=True)

    # ── Step 4: Chunk and store content ─────────────────────────────────────
    task3 = progress.add_task("Chunking content...", total=len(structure.sections))

    for section in structure.sections:
        chunks = chunk_section(
            text=section.raw_text,
            source_file=structure.source_file,
            section_title=section.title,
            chapter_number=section.chapter_number,
            page_start=section.page_start,
            page_end=section.page_end,
        )
        ids = write_content_chunks_batch(chunks, doc_title=structure.title)
        all_content_ids.extend(ids)
        progress.advance(task3)

    # ── Step 5: Save manifest ────────────────────────────────────────────────
    manifest = build_manifest(
        source_file=structure.source_file,
        file_hash=file_hash,
        doc_type="structured",
        doc_title=structure.title,
        chapters_detected=len(structure.sections),
        book_summary_id=book_summary_id,
        chapter_summary_ids=chapter_summary_ids,
        content_ids=all_content_ids,
    )
    save_manifest(manifest)

    _print_summary(manifest.memories, structure.title)
    return True
+
+
def _flat_pipeline(
    pdf_path: Path,
    structure: DocumentStructure,
    file_hash: str,
    progress: Progress,
) -> bool:
    """
    Ingest a flat (no detected chapters) document: one whole-document
    summary, then the content chunks, then the manifest. Failed mem0
    writes are tolerated. Always returns True.
    """
    # ── Step 2: Summarize whole doc ──────────────────────────────────────────
    task = progress.add_task("Summarizing document...", total=None)
    doc_summary = summarize_flat_document(structure.title, structure.full_text)
    book_summary_id = write_book_summary(
        title=structure.title,
        summary=doc_summary,
        source_file=structure.source_file,
    )
    progress.update(task, completed=True)

    # ── Step 3: Chunk and store content ─────────────────────────────────────
    task2 = progress.add_task("Chunking content...", total=None)
    chunks = chunk_section(
        text=structure.full_text,
        source_file=structure.source_file,
    )
    content_ids = write_content_chunks_batch(chunks, doc_title=structure.title)
    progress.update(task2, completed=True)

    # ── Step 4: Save manifest ────────────────────────────────────────────────
    manifest = build_manifest(
        source_file=structure.source_file,
        file_hash=file_hash,
        doc_type="flat",
        doc_title=structure.title,
        chapters_detected=0,
        book_summary_id=book_summary_id,
        chapter_summary_ids=[],
        content_ids=content_ids,
    )
    save_manifest(manifest)

    _print_summary(manifest.memories, structure.title)
    return True
+
+
def _print_summary(memories: dict, title: str) -> None:
    """Print per-category and total memory counts for one ingested document."""
    books = memories.get("book_summary", 0)
    chapters = memories.get("chapter_summary", 0)
    content = memories.get("content", 0)
    grand_total = sum(memories.values())
    console.print(
        f"\n[bold green]✓ Done![/] [dim]{title}[/]\n"
        f"  Memories stored: "
        f"[cyan]{books}[/] book · "
        f"[cyan]{chapters}[/] chapters · "
        f"[cyan]{content}[/] chunks · "
        f"[bold]{grand_total}[/] total\n"
    )

+ 100 - 0
book_ingestor/summarizer.py

@@ -0,0 +1,100 @@
+"""
+summarizer.py — the ONLY module that calls an LLM (Groq/Llama 4).
+Generates book-level and chapter-level summaries.
+Keeps prompts tight to minimise token spend.
+"""
+
+from __future__ import annotations
+
+import logging
+
+from groq import Groq
+
+from .config import cfg
+
+log = logging.getLogger(__name__)
+
+_client: Groq | None = None
+
+
def _get_client() -> Groq:
    """Lazily construct and cache the single Groq client for this process."""
    global _client
    client = _client
    if client is None:
        # First call: build the client from the configured API key and memoize it.
        client = _client = Groq(api_key=cfg.groq_api_key)
    return client
+
+
def _call(prompt: str, max_tokens: int = 512) -> str:
    """Send a single user prompt to Groq and return the stripped text reply.

    The SDK types ``message.content`` as ``Optional[str]``; guard against a
    null completion so callers always receive a string instead of hitting
    ``AttributeError: 'NoneType' object has no attribute 'strip'``.
    """
    response = _get_client().chat.completions.create(
        model=cfg.groq_model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_tokens,
        temperature=0.3,   # low temp = factual, consistent summaries
    )
    content = response.choices[0].message.content
    return content.strip() if content else ""
+
+
+# ── Public API ─────────────────────────────────────────────────────────────────
+
def summarize_book(title: str, chapter_summaries: list[str]) -> str:
    """
    Generate a high-level book summary from the chapter summaries.
    Input is cheap: we only send summaries, not raw text.
    """
    labelled = []
    for idx, summary in enumerate(chapter_summaries, start=1):
        labelled.append(f"[Section {idx}]: {summary}")
    prompt = (
        f'You are summarizing the book "{title}".\n'
        f"Below are summaries of each chapter/section.\n"
        f"Write a concise overall summary (4-6 sentences) covering the main thesis, "
        f"key ideas, and conclusions. Be factual and dense — no filler.\n\n"
        + "\n\n".join(labelled)
    )
    log.info("Generating book summary for: %s", title)
    return _call(prompt, max_tokens=400)
+
+
def summarize_chapter(
    book_title: str,
    chapter_title: str,
    chapter_text: str,
    max_input_chars: int = 6000,
) -> str:
    """
    Summarize a single chapter, truncating the raw text to keep token cost
    low (6000 chars ≈ ~1500 tokens — well within Llama 4 context).
    """
    truncated = chapter_text[:max_input_chars]
    # Slicing only changes the text when it was actually longer than the cap.
    if truncated != chapter_text:
        truncated += "\n[... text truncated for summary ...]"

    prompt = (
        f'From the book "{book_title}", summarize the chapter "{chapter_title}".\n'
        f"Write 3-5 sentences covering the key points, arguments, and conclusions. "
        f"Be specific and factual.\n\n"
        f"{truncated}"
    )
    log.debug("Summarizing chapter: %s", chapter_title)
    return _call(prompt, max_tokens=300)
+
+
def summarize_flat_document(title: str, full_text: str, max_input_chars: int = 8000) -> str:
    """
    Summarize a flat (unstructured) document. Only the first
    ``max_input_chars`` characters are sent — sufficient for most
    reference material.
    """
    snippet = full_text[:max_input_chars]
    if len(full_text) > max_input_chars:
        snippet = snippet + "\n[... text truncated for summary ...]"

    prompt = (
        f'Summarize the following document titled "{title}".\n'
        f"Write 4-6 sentences covering the main topic, key points, and conclusions. "
        f"Be specific and factual.\n\n"
        f"{snippet}"
    )
    log.info("Summarizing flat document: %s", title)
    return _call(prompt, max_tokens=400)

+ 227 - 0
book_ingestor/watchdog_runner.py

@@ -0,0 +1,227 @@
+"""
+watchdog_runner.py — watches books/inbox/ and triggers the pipeline.
+Entry point: python -m book_ingestor.watchdog_runner
+
+Uses the watchdog library for filesystem events.
+Rich provides a live terminal dashboard when running interactively.
+In headless/container environments (no TTY), falls back to plain log output.
+"""
+
+from __future__ import annotations
+
+import logging
+import signal
+import sys
+import time
+from pathlib import Path
+
+from rich.console import Console
+from rich.live import Live
+from rich.panel import Panel
+from rich.table import Table
+from rich.text import Text
+from watchdog.events import FileSystemEvent, FileSystemEventHandler
+from watchdog.observers import Observer
+
+from .config import cfg
+from .pipeline import ingest
+
# ── Logging setup ──────────────────────────────────────────────────────────────

# Root logger for the whole process. The level comes from LOG_LEVEL in .env via
# cfg; getattr() falls back to INFO when the configured name is not a real level.
logging.basicConfig(
    level=getattr(logging, cfg.log_level.upper(), logging.INFO),
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)
console = Console()  # Rich console backing the interactive dashboard

# True when running in a real interactive terminal, False in Docker/headless
IS_TTY = sys.stdout.isatty()

# Only these extensions are ingested; everything else in inbox/ is ignored.
SUPPORTED_EXTENSIONS = {".pdf"}  # .md, .txt, .epub coming in phase 2
+
+
+# ── Stats tracker ──────────────────────────────────────────────────────────────
+
class Stats:
    """Mutable, process-wide counters displayed on the dashboard and in logs."""

    def __init__(self):
        # Resolved inbox path shown in the "Watching" row.
        self.watched = str(Path(cfg.books_inbox).resolve())
        self.processed = 0
        self.failed = 0
        self.skipped = 0
        self.last_file: str = "—"
        self.last_status: str = "—"
        self.start_time = time.time()

    def uptime(self) -> str:
        """Return elapsed wall-clock time since startup as HH:MM:SS."""
        minutes, seconds = divmod(int(time.time() - self.start_time), 60)
        hours, minutes = divmod(minutes, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
+
+
# Module-wide singleton shared by the dashboard, the inbox handler, and logs.
_stats = Stats()
+
+
+# ── Dashboard (TTY only) ───────────────────────────────────────────────────────
+
def _build_dashboard() -> Panel:
    """Render the current _stats snapshot as a two-column Rich panel."""
    grid = Table.grid(padding=(0, 2))
    grid.add_column(style="bold cyan", min_width=18)
    grid.add_column()

    # Plain label/value rows first; the status row below needs styling.
    for label, value in (
        ("📂 Watching", _stats.watched),
        ("⏱  Uptime", _stats.uptime()),
        ("✅  Processed", str(_stats.processed)),
        ("⚠️  Failed", str(_stats.failed)),
        ("⏭  Skipped", str(_stats.skipped)),
        ("📄  Last file", _stats.last_file),
    ):
        grid.add_row(label, value)

    status_style = "green" if "✓" in _stats.last_status else "yellow"
    grid.add_row("   Status", Text(_stats.last_status, style=status_style))

    return Panel(
        grid,
        title="[bold white]📚 book-ingestor[/]",
        subtitle="[dim]Drop a PDF into inbox/ to ingest · Ctrl+C to stop[/]",
        border_style="cyan",
    )
+
+
+# ── Shared ingest logic ────────────────────────────────────────────────────────
+
def _run_ingest(pdf: Path) -> None:
    """Run ingestion for one file and update stats; works in TTY and headless mode."""
    _stats.last_file = pdf.name
    _stats.last_status = "⏳ Processing..."

    # In headless mode the dashboard is absent, so mirror progress to the log.
    if not IS_TTY:
        log.info("Processing: %s", pdf.name)

    if ingest(pdf):
        _stats.processed += 1
        _stats.last_status = "✓ Complete"
        if not IS_TTY:
            log.info("Complete: %s (processed=%d)", pdf.name, _stats.processed)
    else:
        _stats.failed += 1
        _stats.last_status = "✗ Failed"
        if not IS_TTY:
            log.error("Failed: %s (failed=%d)", pdf.name, _stats.failed)
+
+
+# ── Watchdog handler ───────────────────────────────────────────────────────────
+
class InboxHandler(FileSystemEventHandler):
    """Reacts to PDFs arriving in the inbox.

    Handles both *created* events (files copied/written into the folder) and
    *moved* events: a rename or ``mv`` into the watched directory — e.g. the
    README's "move stuck PDFs back to inbox/" recovery flow, or a downloader
    renaming ``x.pdf.part`` → ``x.pdf`` — is delivered as a move, not a
    creation, and was previously ignored.
    """

    def __init__(self, live: Live | None = None):
        # Optional Rich Live handle so the dashboard refreshes after each file.
        self._live = live

    def on_created(self, event: FileSystemEvent) -> None:
        if not event.is_directory:
            self._maybe_ingest(Path(event.src_path))

    def on_moved(self, event: FileSystemEvent) -> None:
        # The file now lives at dest_path; src_path is the old name/location.
        if not event.is_directory:
            self._maybe_ingest(Path(event.dest_path))

    def _maybe_ingest(self, path: Path) -> None:
        """Filter by extension, wait for the write to settle, then ingest."""
        if path.suffix.lower() not in SUPPORTED_EXTENSIONS:
            log.debug("Ignoring unsupported file type: %s", path.name)
            return

        # Brief wait to ensure file write is complete
        time.sleep(1.0)
        if not path.exists():
            return

        _run_ingest(path)

        if self._live:
            self._live.refresh()
+
+
+# ── Setup ──────────────────────────────────────────────────────────────────────
+
def _ensure_folders() -> None:
    """Create the inbox/processing/done/manifests directories if missing."""
    required = (
        cfg.books_inbox,
        cfg.books_processing,
        cfg.books_done,
        cfg.books_manifests,
    )
    for folder in required:
        Path(folder).mkdir(parents=True, exist_ok=True)
+
+
def _process_existing(live: Live | None = None) -> None:
    """Process any PDFs already sitting in inbox/ at startup."""
    pending = list(Path(cfg.books_inbox).glob("*.pdf"))
    if not pending:
        return

    log.info("Found %d existing file(s) in inbox — processing...", len(pending))
    for pdf in pending:
        _run_ingest(pdf)
        # Keep the dashboard current between files when running interactively.
        if live:
            live.refresh()
+
+
+# ── Entry point ────────────────────────────────────────────────────────────────
+
def _shutdown(observer: Observer) -> None:
    """Stop the watchdog observer and exit the process with status 0.

    Installed as the SIGINT/SIGTERM handler, so it runs in the main thread;
    sys.exit() raises SystemExit, which also unwinds any active Live context.
    """
    log.info("Shutting down...")
    observer.stop()
    observer.join()  # wait for the observer thread to drain before exiting
    sys.exit(0)
+
+
def _serve(observer: Observer, live: Live | None = None) -> None:
    """Start watching the inbox and block until shutdown.

    Shared tail of both modes: schedules the handler, registers
    SIGINT/SIGTERM handlers, then polls the observer once per second
    (refreshing the dashboard each tick when `live` is provided).
    """
    observer.schedule(InboxHandler(live), cfg.books_inbox, recursive=False)
    observer.start()
    log.info("Watching: %s", Path(cfg.books_inbox).resolve())

    signal.signal(signal.SIGINT, lambda s, f: _shutdown(observer))
    signal.signal(signal.SIGTERM, lambda s, f: _shutdown(observer))

    try:
        while observer.is_alive():
            if live is not None:
                live.update(_build_dashboard())
            time.sleep(1)
    except Exception:
        # Don't leave the observer thread dangling on unexpected errors.
        observer.stop()
        observer.join()
        raise


def run() -> None:
    """Entry point: prepare folders, drain the inbox, then watch it forever.

    The interactive and headless paths previously duplicated the observer
    setup, signal wiring, and poll loop; both now delegate to _serve().
    """
    _ensure_folders()
    observer = Observer()

    if IS_TTY:
        # ── Interactive mode: Rich live dashboard ──────────────────────────────
        with Live(_build_dashboard(), console=console, refresh_per_second=2) as live:
            _process_existing(live)
            _serve(observer, live)
    else:
        # ── Headless mode: plain structured logging (Docker / no TTY) ─────────
        log.info("Headless mode — Rich dashboard disabled")
        _process_existing()
        _serve(observer)


if __name__ == "__main__":
    run()

+ 0 - 0
books/done/.gitkeep


+ 0 - 0
books/inbox/.gitkeep


+ 0 - 0
books/manifests/.gitkeep


+ 0 - 0
books/processing/.gitkeep


+ 17 - 0
docker-compose.yml

@@ -0,0 +1,17 @@
+services:
+  book-ingestor:
+    build: .
+    container_name: book-ingestor
+    restart: unless-stopped
+    env_file:
+      - .env
+    volumes:
+      # books/ folder lives on the host — survives container restarts
+      - ./books:/app/books
+    # Access to host network so it can reach mem0 at 192.168.0.200
+    network_mode: host
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "10m"
+        max-file: "3"

+ 7 - 0
requirements.txt

@@ -0,0 +1,7 @@
+pymupdf>=1.24.0
+watchdog>=4.0.0
+groq>=0.9.0
+tiktoken>=0.7.0
+requests>=2.31.0
+python-dotenv>=1.0.0
+rich>=13.7.0