mem0_writer.py

  1. """
  2. mem0_writer.py — the ONLY module that talks to the mem0 server.
  3. Targets the /knowledge endpoint for objective fact storage.
  4. Server expects: { text, user_id, metadata, infer }
  5. - summaries → infer: false (already distilled by Groq, store verbatim)
  6. - raw chunks → infer: true (let server extract facts from raw text)
  7. """

from __future__ import annotations

import logging
import time
from datetime import datetime, timezone

import requests

from .config import cfg
from .chunker import Chunk

log = logging.getLogger(__name__)

_SESSION = requests.Session()
_SESSION.headers.update({"Content-Type": "application/json"})


# ── Public API ─────────────────────────────────────────────────────────────────
def write_book_summary(title: str, summary: str, source_file: str) -> str | None:
    """POST a book-level summary. Stored verbatim — already distilled by Groq."""
    return _post(
        text=f"[Book Overview] {title}: {summary}",
        metadata={
            **_base_meta(source_file, "book_summary"),
            "doc_title": title,
        },
        infer=False,
    )
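
# Example call (hypothetical values):
#   write_book_summary("Dune", "A desert planet saga...", "dune.pdf")
# stores the text "[Book Overview] Dune: A desert planet saga..." verbatim
# (infer=False), tagged memory_type="book_summary".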


def write_chapter_summary(
    title: str,
    chapter_title: str,
    chapter_number: int | None,
    summary: str,
    source_file: str,
    page_start: int,
    page_end: int,
) -> str | None:
    """POST a chapter summary. Stored verbatim — already distilled by Groq."""
    return _post(
        text=f"[Chapter Summary] {title} — {chapter_title}: {summary}",
        metadata={
            **_base_meta(source_file, "chapter_summary"),
            "doc_title": title,
            "chapter_title": chapter_title,
            "chapter": chapter_number,
            "page_start": page_start,
            "page_end": page_end,
        },
        infer=False,
    )


def write_content_chunk(chunk: Chunk, doc_title: str) -> str | None:
    """POST a raw content chunk. Let server extract facts from it."""
    return _post(
        text=chunk.text,
        metadata={
            **_base_meta(chunk.source_file, "content"),
            "doc_title": doc_title,
            "chapter_title": chunk.section_title,
            "chapter": chunk.chapter_number,
            "page_start": chunk.page_start,
            "page_end": chunk.page_end,
            "chunk_index": chunk.chunk_index,
            "token_count": chunk.token_count,
        },
        infer=True,  # raw chunks are not pre-distilled; per the module contract, let the server extract facts
    )


def write_content_chunks_batch(chunks: list[Chunk], doc_title: str) -> list[str]:
    """
    POST multiple content chunks. Returns list of successful memory IDs.
    Throttled by INGEST_DELAY to avoid hammering the Ollama embedder.
    """
    ids: list[str] = []
    for i, chunk in enumerate(chunks):
        mem_id = write_content_chunk(chunk, doc_title)
        if mem_id:
            ids.append(mem_id)
        # Throttle after every chunk except the last — give the GPU breathing room
        if cfg.ingest_delay > 0 and i < len(chunks) - 1:
            time.sleep(cfg.ingest_delay)
    return ids
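
# Usage sketch (assumes `chunks` came from the chunker module):
#   ids = write_content_chunks_batch(chunks, doc_title="Example Book")
#   log.info("stored %d/%d chunks", len(ids), len(chunks))
# With cfg.ingest_delay = 0.5, a 100-chunk document adds ~50s of throttling.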


# ── Internal ───────────────────────────────────────────────────────────────────
def _base_meta(source_file: str, memory_type: str) -> dict:
    return {
        "source_file": source_file,
        "memory_type": memory_type,
        "ingested_at": datetime.now(timezone.utc).isoformat(),
    }


def _sanitize_meta(metadata: dict) -> dict:
    """
    ChromaDB only accepts str, int, float, bool as metadata values.
    Remove None values and convert anything else to str.
    """
    clean = {}
    for k, v in metadata.items():
        if v is None:
            continue  # drop None entirely
        if isinstance(v, (str, int, float, bool)):
            clean[k] = v
        else:
            clean[k] = str(v)  # last resort conversion
    return clean
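
# Illustration of the sanitizer's behavior (hypothetical input):
#   {"chapter": None, "page_start": 3, "pages": [3, 7]}
#   → {"page_start": 3, "pages": "[3, 7]"}   (None dropped, list stringified)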


def _post(text: str, metadata: dict, infer: bool) -> str | None:
    """
    POST a single entry to the /knowledge endpoint.
    Returns the memory ID on success, None on failure.
    """
    # Sanitize BEFORE building the payload so the cleaned metadata is what gets sent.
    metadata = _sanitize_meta(metadata)
    payload = {
        "text": text,
        "user_id": cfg.mem0_agent_id,
        "metadata": metadata,
        "infer": infer,
    }
    url = f"{cfg.mem0_base_url}/knowledge"
    try:
        resp = _SESSION.post(url, json=payload, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        mem_id = _extract_id(data)
        log.debug(
            "Knowledge stored: %s (type=%s, infer=%s)",
            mem_id, metadata.get("memory_type"), infer,
        )
        return mem_id
    except requests.HTTPError as e:
        log.error("knowledge HTTP error: %s — %s", e, resp.text[:300])
    except requests.RequestException as e:
        log.error("knowledge connection error: %s", e)
    return None
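

# Response shapes handled below (illustrative; actual server output may vary):
#   {"id": "abc123"}                  → "abc123"
#   {"results": [{"id": "abc123"}]}   → "abc123"
#   [{"id": "abc123"}]                → "abc123"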
def _extract_id(data: dict | list) -> str | None:
    """Robustly extract a memory ID from various response shapes."""
    if isinstance(data, list):
        return data[0].get("id") if data else None
    if isinstance(data, dict):
        if "id" in data:
            return data["id"]
        results = data.get("results", [])
        if results and isinstance(results, list):
            return results[0].get("id")
    return None
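

if __name__ == "__main__":
    # Minimal manual smoke test (a sketch, assuming a mem0 server is actually
    # reachable at cfg.mem0_base_url; all values here are placeholders).
    # Run as `python -m <package>.mem0_writer` since the module uses relative imports.
    logging.basicConfig(level=logging.DEBUG)
    print(write_book_summary(
        title="Example Book",
        summary="Placeholder summary used only for this smoke test.",
        source_file="example.pdf",
    ))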