| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- """
- chunker.py — splits text into token-sized chunks, purely in Python.
- No LLM calls. Uses tiktoken for accurate token counting.
- Strategy:
- - Split on paragraph boundaries first (double newline)
- - If a paragraph exceeds chunk_size, split on sentence boundaries
- - If a sentence exceeds chunk_size, hard-split on token count
- - Chunks carry their source metadata (page range, section title)
- """
- from __future__ import annotations
- import re
- from dataclasses import dataclass
- import tiktoken
- from .config import cfg
- # Use cl100k_base — matches most modern LLMs well enough for counting
- _ENCODER = tiktoken.get_encoding("cl100k_base")
- # ── Data model ─────────────────────────────────────────────────────────────────
@dataclass
class Chunk:
    """A token-bounded slice of a source document plus provenance metadata."""
    text: str                    # the chunk's text content
    token_count: int             # tiktoken (cl100k_base) token count of `text`
    source_file: str             # file this chunk was extracted from
    section_title: str | None    # None for flat docs
    chapter_number: int | None   # None when the source has no chapter structure
    page_start: int              # first source page covered by this chunk
    page_end: int                # last source page covered by this chunk
    chunk_index: int             # position within parent section
- # ── Public API ─────────────────────────────────────────────────────────────────
def chunk_section(
    text: str,
    source_file: str,
    section_title: str | None = None,
    chapter_number: int | None = None,
    page_start: int = 0,
    page_end: int = 0,
    chunk_size: int | None = None,
) -> list[Chunk]:
    """
    Chunk a block of text into token-sized pieces.

    Args:
        text: Raw section text to split.
        source_file: Name of the originating file, stored on each Chunk.
        section_title: Optional section heading (None for flat docs).
        chapter_number: Optional chapter index (None when not applicable).
        page_start: First source page of this section.
        page_end: Last source page of this section.
        chunk_size: Max tokens per chunk; defaults to cfg.chunk_size_tokens
            when None.

    Returns:
        A list of Chunk objects with metadata attached. chunk_index is
        contiguous (0..n-1) even if some raw chunks were blank.
    """
    # Explicit None check: `chunk_size or cfg.chunk_size_tokens` would
    # silently discard an explicitly-passed falsy value (e.g. 0).
    size = cfg.chunk_size_tokens if chunk_size is None else chunk_size
    paragraphs = _split_paragraphs(text)
    # Drop blank chunks *before* enumerating so chunk_index has no gaps.
    raw_chunks = [raw for raw in _build_chunks(paragraphs, size) if raw.strip()]
    return [
        Chunk(
            text=raw,
            token_count=count_tokens(raw),
            source_file=source_file,
            section_title=section_title,
            chapter_number=chapter_number,
            page_start=page_start,
            page_end=page_end,
            chunk_index=idx,
        )
        for idx, raw in enumerate(raw_chunks)
    ]
def count_tokens(text: str) -> int:
    """Return the number of cl100k_base tokens that *text* encodes to."""
    encoded = _ENCODER.encode(text)
    return len(encoded)
- # ── Internal helpers ───────────────────────────────────────────────────────────
- def _split_paragraphs(text: str) -> list[str]:
- """Split on blank lines, clean up whitespace."""
- paragraphs = re.split(r"\n\s*\n", text)
- return [p.strip() for p in paragraphs if p.strip()]
- def _split_sentences(text: str) -> list[str]:
- """
- Rough sentence splitter — handles common abbreviations.
- Good enough for chunking purposes without an NLP library.
- """
- # Protect common abbreviations from splitting
- protected = re.sub(
- r"\b(Mr|Mrs|Ms|Dr|Prof|Sr|Jr|vs|etc|approx|dept|est|fig|govt|inc|ltd|no|vol)\.",
- r"\1<DOT>",
- text,
- flags=re.IGNORECASE,
- )
- # Split on sentence-ending punctuation followed by whitespace + capital
- sentences = re.split(r"(?<=[.!?])\s+(?=[A-Z\"\'])", protected)
- # Restore protected dots
- return [s.replace("<DOT>", ".").strip() for s in sentences if s.strip()]
def _build_chunks(paragraphs: list[str], max_tokens: int) -> list[str]:
    """
    Greedy paragraph packer: accumulate paragraphs until adding the next
    one would cross *max_tokens*, then emit the buffer as one space-joined
    chunk. Paragraphs that alone exceed the budget are delegated to
    _split_large_paragraph.
    """
    out: list[str] = []
    buffer: list[str] = []
    used = 0

    def flush() -> None:
        # Emit the accumulated paragraphs as a single chunk, if any.
        nonlocal used
        if buffer:
            out.append(" ".join(buffer))
            buffer.clear()
            used = 0

    for paragraph in paragraphs:
        size = count_tokens(paragraph)
        if size > max_tokens:
            # Paragraph too big on its own — flush, then sub-split it.
            flush()
            out.extend(_split_large_paragraph(paragraph, max_tokens))
            continue
        if used + size > max_tokens:
            # Would overflow the current chunk — start a fresh buffer.
            flush()
        buffer.append(paragraph)
        used += size

    flush()
    return out
def _split_large_paragraph(para: str, max_tokens: int) -> list[str]:
    """
    Sentence-level fallback for a paragraph that alone exceeds the token
    budget. Sentences that are themselves too long get a hard token split.
    """
    result: list[str] = []
    pending: list[str] = []
    pending_tokens = 0

    for sentence in _split_sentences(para):
        size = count_tokens(sentence)
        if size > max_tokens:
            # Even a single sentence overflows — flush, then hard-split it.
            if pending:
                result.append(" ".join(pending))
                pending, pending_tokens = [], 0
            result.extend(_hard_split(sentence, max_tokens))
            continue
        if pending_tokens + size > max_tokens:
            # Would overflow the current chunk — emit and start over.
            if pending:
                result.append(" ".join(pending))
            pending, pending_tokens = [sentence], size
        else:
            pending.append(sentence)
            pending_tokens += size

    if pending:
        result.append(" ".join(pending))
    return result
def _hard_split(text: str, max_tokens: int) -> list[str]:
    """
    Absolute fallback: slice the encoded token sequence into windows of at
    most *max_tokens* tokens and decode each window back to text.

    NOTE(review): an arbitrary token boundary can fall inside a multi-byte
    character; presumably tiktoken's decode substitutes a replacement
    character there — confirm if chunk edges must stay lossless.
    """
    token_ids = _ENCODER.encode(text)
    return [
        _ENCODER.decode(token_ids[start : start + max_tokens])
        for start in range(0, len(token_ids), max_tokens)
    ]
|