""" chunker.py — splits text into token-sized chunks, purely in Python. No LLM calls. Uses tiktoken for accurate token counting. Strategy: - Split on paragraph boundaries first (double newline) - If a paragraph exceeds chunk_size, split on sentence boundaries - If a sentence exceeds chunk_size, hard-split on token count - Chunks carry their source metadata (page range, section title) """ from __future__ import annotations import re from dataclasses import dataclass import tiktoken from .config import cfg # Use cl100k_base — matches most modern LLMs well enough for counting _ENCODER = tiktoken.get_encoding("cl100k_base") # ── Data model ───────────────────────────────────────────────────────────────── @dataclass class Chunk: text: str token_count: int source_file: str section_title: str | None # None for flat docs chapter_number: int | None page_start: int page_end: int chunk_index: int # position within parent section # ── Public API ───────────────────────────────────────────────────────────────── def chunk_section( text: str, source_file: str, section_title: str | None = None, chapter_number: int | None = None, page_start: int = 0, page_end: int = 0, chunk_size: int | None = None, ) -> list[Chunk]: """ Chunk a block of text into token-sized pieces. Returns a list of Chunk objects with metadata attached. """ size = chunk_size or cfg.chunk_size_tokens paragraphs = _split_paragraphs(text) raw_chunks = _build_chunks(paragraphs, size) return [ Chunk( text=raw, token_count=count_tokens(raw), source_file=source_file, section_title=section_title, chapter_number=chapter_number, page_start=page_start, page_end=page_end, chunk_index=idx, ) for idx, raw in enumerate(raw_chunks) if raw.strip() ] def count_tokens(text: str) -> int: """Count tokens in a string using tiktoken.""" return len(_ENCODER.encode(text)) # ── Internal helpers ─────────────────────────────────────────────────────────── def _split_paragraphs(text: str) -> list[str]: """Split on blank lines, clean up whitespace.""" paragraphs = re.split(r"\n\s*\n", text) return [p.strip() for p in paragraphs if p.strip()] def _split_sentences(text: str) -> list[str]: """ Rough sentence splitter — handles common abbreviations. Good enough for chunking purposes without an NLP library. """ # Protect common abbreviations from splitting protected = re.sub( r"\b(Mr|Mrs|Ms|Dr|Prof|Sr|Jr|vs|etc|approx|dept|est|fig|govt|inc|ltd|no|vol)\.", r"\1", text, flags=re.IGNORECASE, ) # Split on sentence-ending punctuation followed by whitespace + capital sentences = re.split(r"(?<=[.!?])\s+(?=[A-Z\"\'])", protected) # Restore protected dots return [s.replace("", ".").strip() for s in sentences if s.strip()] def _build_chunks(paragraphs: list[str], max_tokens: int) -> list[str]: """ Greedily build chunks by accumulating paragraphs. Respects max_tokens boundary, spilling over to sentences then hard splits. """ chunks: list[str] = [] current_parts: list[str] = [] current_tokens = 0 for para in paragraphs: para_tokens = count_tokens(para) if para_tokens > max_tokens: # Paragraph too big — flush current buffer, then split paragraph if current_parts: chunks.append(" ".join(current_parts)) current_parts, current_tokens = [], 0 chunks.extend(_split_large_paragraph(para, max_tokens)) continue if current_tokens + para_tokens > max_tokens: # Would exceed limit — flush and start fresh if current_parts: chunks.append(" ".join(current_parts)) current_parts = [para] current_tokens = para_tokens else: current_parts.append(para) current_tokens += para_tokens if current_parts: chunks.append(" ".join(current_parts)) return chunks def _split_large_paragraph(para: str, max_tokens: int) -> list[str]: """Split an oversized paragraph at sentence boundaries.""" sentences = _split_sentences(para) chunks: list[str] = [] current_parts: list[str] = [] current_tokens = 0 for sent in sentences: sent_tokens = count_tokens(sent) if sent_tokens > max_tokens: # Single sentence too long — hard split by tokens if current_parts: chunks.append(" ".join(current_parts)) current_parts, current_tokens = [], 0 chunks.extend(_hard_split(sent, max_tokens)) continue if current_tokens + sent_tokens > max_tokens: if current_parts: chunks.append(" ".join(current_parts)) current_parts = [sent] current_tokens = sent_tokens else: current_parts.append(sent) current_tokens += sent_tokens if current_parts: chunks.append(" ".join(current_parts)) return chunks def _hard_split(text: str, max_tokens: int) -> list[str]: """Last resort: split a string into max_tokens-sized pieces by token index.""" tokens = _ENCODER.encode(text) chunks = [] for i in range(0, len(tokens), max_tokens): chunk_tokens = tokens[i: i + max_tokens] chunks.append(_ENCODER.decode(chunk_tokens)) return chunks