| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- """
- mem0_writer.py — the ONLY module that talks to the mem0 server.
- Targets the /knowledge endpoint for objective fact storage.
- Server expects: { text, user_id, metadata, infer }
- - summaries → infer: false (already distilled by Groq, store verbatim)
- - raw chunks → infer: true (let server extract facts from raw text)
- """
- from __future__ import annotations
- import logging
- from datetime import datetime, timezone
- import requests
- from .config import cfg
- from .chunker import Chunk
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# A single HTTP session shared by every request this module sends.
# Every request body is JSON, so the content type is set once here.
_SESSION = requests.Session()
_SESSION.headers["Content-Type"] = "application/json"
- # ── Public API ─────────────────────────────────────────────────────────────────
def write_book_summary(title: str, summary: str, source_file: str) -> str | None:
    """Send a book-level summary to the server.

    The text is posted with ``infer=False`` so it is stored verbatim; the
    summary is already distilled by Groq.

    Returns:
        Whatever ``_post`` returns: the memory ID, or ``None`` on failure.
    """
    body = f"[Book Overview] {title}: {summary}"

    # Common fields first, then the book-specific one.
    meta = _base_meta(source_file, "book_summary")
    meta["doc_title"] = title

    return _post(text=body, metadata=meta, infer=False)
def write_chapter_summary(
    title: str,
    chapter_title: str,
    chapter_number: int | None,
    summary: str,
    source_file: str,
    page_start: int,
    page_end: int,
) -> str | None:
    """Send a chapter-level summary to the server.

    The text is posted with ``infer=False`` so it is stored verbatim; the
    summary is already distilled by Groq. The chapter title, chapter number
    and page range are attached as metadata.

    Returns:
        Whatever ``_post`` returns: the memory ID, or ``None`` on failure.
    """
    body = f"[Chapter Summary] {title} — {chapter_title}: {summary}"

    # Common fields first, then the chapter-specific ones.
    meta = _base_meta(source_file, "chapter_summary")
    meta.update(
        doc_title=title,
        chapter_title=chapter_title,
        chapter=chapter_number,
        page_start=page_start,
        page_end=page_end,
    )

    return _post(text=body, metadata=meta, infer=False)
def write_content_chunk(chunk: Chunk, doc_title: str) -> str | None:
    """Send one raw content chunk to the server.

    The chunk text is posted with ``infer=True`` so the server extracts
    facts from it. The chunk's position and size fields are attached as
    metadata.

    Returns:
        Whatever ``_post`` returns: the memory ID, or ``None`` on failure.
    """
    body = chunk.text

    # Common fields first, then the chunk-specific ones.
    meta = _base_meta(chunk.source_file, "content")
    meta.update(
        {
            "doc_title": doc_title,
            "chapter_title": chunk.section_title,
            "chapter": chunk.chapter_number,
            "page_start": chunk.page_start,
            "page_end": chunk.page_end,
            "chunk_index": chunk.chunk_index,
            "token_count": chunk.token_count,
        }
    )

    return _post(text=body, metadata=meta, infer=True)
def write_content_chunks_batch(chunks: list[Chunk], doc_title: str) -> list[str]:
    """Send several content chunks, one request per chunk, in order.

    Returns:
        The memory IDs of the chunks that were stored. Chunks whose write
        returned a falsy ID (for example ``None`` on failure) are left out,
        so the result may be shorter than ``chunks``.
    """
    # The generator is consumed lazily, so requests go out in input order.
    outcomes = (write_content_chunk(item, doc_title) for item in chunks)
    return [mem_id for mem_id in outcomes if mem_id]
- # ── Internal ───────────────────────────────────────────────────────────────────
- def _base_meta(source_file: str, memory_type: str) -> dict:
- return {
- "source_file": source_file,
- "memory_type": memory_type,
- "ingested_at": datetime.now(timezone.utc).isoformat(),
- }
def _post(text: str, metadata: dict, infer: bool) -> str | None:
    """POST a single entry to the /knowledge endpoint.

    Args:
        text: The text to store.
        metadata: Metadata dict sent alongside the text.
        infer: Sent as the ``infer`` flag of the payload.

    Returns:
        The memory ID extracted from the response on success. ``None`` when
        the request fails, the server answers with an error status, the
        response body is not valid JSON, or no ID is found in the response.
        Failures are logged, never raised.
    """
    payload = {
        "text": text,
        "user_id": cfg.mem0_agent_id,
        "metadata": metadata,
        "infer": infer,
    }
    url = f"{cfg.mem0_base_url}/knowledge"

    # Step 1: the network round trip. Only transport and status errors are
    # handled here so they are not confused with response-parsing problems.
    try:
        resp = _SESSION.post(url, json=payload, timeout=30)
        resp.raise_for_status()
    except requests.HTTPError as e:
        # Read the body from the exception's own response rather than from a
        # local that is only bound if the POST itself returned.
        body = e.response.text[:300] if e.response is not None else ""
        log.error("knowledge HTTP error: %s — %s", e, body)
        return None
    except requests.RequestException as e:
        log.error("knowledge connection error: %s", e)
        return None

    # Step 2: decode the body. Depending on the requests version, a non-JSON
    # body raises either a plain ValueError or a JSONDecodeError that also
    # subclasses ValueError, so catching ValueError covers both and keeps
    # the "None on failure" contract.
    try:
        data = resp.json()
    except ValueError as e:
        log.error("knowledge invalid JSON response: %s — %s", e, resp.text[:300])
        return None

    mem_id = _extract_id(data)
    log.debug(
        "Knowledge stored: %s (type=%s, infer=%s)",
        mem_id, metadata.get("memory_type"), infer,
    )
    return mem_id
- def _extract_id(data: dict | list) -> str | None:
- """Robustly extract a memory ID from various response shapes."""
- if isinstance(data, list):
- return data[0].get("id") if data else None
- if isinstance(data, dict):
- if "id" in data:
- return data["id"]
- results = data.get("results", [])
- if results and isinstance(results, list):
- return results[0].get("id")
- return None
|