""" mem0_writer.py — the ONLY module that talks to the mem0 server. Targets the /knowledge endpoint for objective fact storage. Server expects: { text, user_id, metadata, infer } - summaries → infer: false (already distilled by Groq, store verbatim) - raw chunks → infer: true (let server extract facts from raw text) """ from __future__ import annotations import logging import time from datetime import datetime, timezone import requests from .config import cfg from .chunker import Chunk log = logging.getLogger(__name__) _SESSION = requests.Session() _SESSION.headers.update({"Content-Type": "application/json"}) # ── Public API ───────────────────────────────────────────────────────────────── def write_book_summary(title: str, summary: str, source_file: str) -> str | None: """POST a book-level summary. Stored verbatim — already distilled by Groq.""" return _post( text=f"[Book Overview] {title}: {summary}", metadata={ **_base_meta(source_file, "book_summary"), "doc_title": title, }, infer=False, ) def write_chapter_summary( title: str, chapter_title: str, chapter_number: int | None, summary: str, source_file: str, page_start: int, page_end: int, ) -> str | None: """POST a chapter summary. Stored verbatim — already distilled by Groq.""" return _post( text=f"[Chapter Summary] {title} — {chapter_title}: {summary}", metadata={ **_base_meta(source_file, "chapter_summary"), "doc_title": title, "chapter_title": chapter_title, "chapter": chapter_number, "page_start": page_start, "page_end": page_end, }, infer=False, ) def write_content_chunk(chunk: Chunk, doc_title: str) -> str | None: """POST a raw content chunk. Let server extract facts from it.""" return _post( text=chunk.text, metadata={ **_base_meta(chunk.source_file, "content"), "doc_title": doc_title, "chapter_title": chunk.section_title, "chapter": chunk.chapter_number, "page_start": chunk.page_start, "page_end": chunk.page_end, "chunk_index": chunk.chunk_index, "token_count": chunk.token_count, }, infer=False, ) def write_content_chunks_batch(chunks: list[Chunk], doc_title: str) -> list[str]: """ POST multiple content chunks. Returns list of successful memory IDs. Throttled by INGEST_DELAY to avoid hammering the Ollama embedder. """ ids = [] for i, chunk in enumerate(chunks): mem_id = write_content_chunk(chunk, doc_title) if mem_id: ids.append(mem_id) # Throttle after every chunk except the last — give the GPU breathing room if cfg.ingest_delay > 0 and i < len(chunks) - 1: time.sleep(cfg.ingest_delay) return ids # ── Internal ─────────────────────────────────────────────────────────────────── def _base_meta(source_file: str, memory_type: str) -> dict: return { "source_file": source_file, "memory_type": memory_type, "ingested_at": datetime.now(timezone.utc).isoformat(), } def _sanitize_meta(metadata: dict) -> dict: """ ChromaDB only accepts str, int, float, bool as metadata values. Remove None values and convert anything else to str. """ clean = {} for k, v in metadata.items(): if v is None: continue # drop None entirely if isinstance(v, (str, int, float, bool)): clean[k] = v else: clean[k] = str(v) # last resort conversion return clean def _post(text: str, metadata: dict, infer: bool) -> str | None: """ POST a single entry to the /knowledge endpoint. Returns the memory ID on success, None on failure. """ payload = { "text": text, "user_id": cfg.mem0_agent_id, "metadata": metadata, "infer": infer, } metadata = _sanitize_meta(metadata) url = f"{cfg.mem0_base_url}/knowledge" try: resp = _SESSION.post(url, json=payload, timeout=30) resp.raise_for_status() data = resp.json() mem_id = _extract_id(data) log.debug( "Knowledge stored: %s (type=%s, infer=%s)", mem_id, metadata.get("memory_type"), infer, ) return mem_id except requests.HTTPError as e: log.error("knowledge HTTP error: %s — %s", e, resp.text[:300]) except requests.RequestException as e: log.error("knowledge connection error: %s", e) return None def _extract_id(data: dict | list) -> str | None: """Robustly extract a memory ID from various response shapes.""" if isinstance(data, list): return data[0].get("id") if data else None if isinstance(data, dict): if "id" in data: return data["id"] results = data.get("results", []) if results and isinstance(results, list): return results[0].get("id") return None