mem0_writer.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. """
  2. mem0_writer.py — the ONLY module that talks to the mem0 server.
  3. Targets the /knowledge endpoint for objective fact storage.
  4. Server expects: { text, user_id, metadata, infer }
  5. - summaries → infer: false (already distilled by Groq, store verbatim)
  6. - raw chunks → infer: true (let server extract facts from raw text)
  7. """
  8. from __future__ import annotations
  9. import logging
  10. from datetime import datetime, timezone
  11. import requests
  12. from .config import cfg
  13. from .chunker import Chunk
  14. log = logging.getLogger(__name__)
  15. _SESSION = requests.Session()
  16. _SESSION.headers.update({"Content-Type": "application/json"})
  17. # ── Public API ─────────────────────────────────────────────────────────────────
  18. def write_book_summary(title: str, summary: str, source_file: str) -> str | None:
  19. """POST a book-level summary. Stored verbatim — already distilled by Groq."""
  20. return _post(
  21. text=f"[Book Overview] {title}: {summary}",
  22. metadata={
  23. **_base_meta(source_file, "book_summary"),
  24. "doc_title": title,
  25. },
  26. infer=False,
  27. )
  28. def write_chapter_summary(
  29. title: str,
  30. chapter_title: str,
  31. chapter_number: int | None,
  32. summary: str,
  33. source_file: str,
  34. page_start: int,
  35. page_end: int,
  36. ) -> str | None:
  37. """POST a chapter summary. Stored verbatim — already distilled by Groq."""
  38. return _post(
  39. text=f"[Chapter Summary] {title} — {chapter_title}: {summary}",
  40. metadata={
  41. **_base_meta(source_file, "chapter_summary"),
  42. "doc_title": title,
  43. "chapter_title": chapter_title,
  44. "chapter": chapter_number,
  45. "page_start": page_start,
  46. "page_end": page_end,
  47. },
  48. infer=False,
  49. )
  50. def write_content_chunk(chunk: Chunk, doc_title: str) -> str | None:
  51. """POST a raw content chunk. Let server extract facts from it."""
  52. return _post(
  53. text=chunk.text,
  54. metadata={
  55. **_base_meta(chunk.source_file, "content"),
  56. "doc_title": doc_title,
  57. "chapter_title": chunk.section_title,
  58. "chapter": chunk.chapter_number,
  59. "page_start": chunk.page_start,
  60. "page_end": chunk.page_end,
  61. "chunk_index": chunk.chunk_index,
  62. "token_count": chunk.token_count,
  63. },
  64. infer=True,
  65. )
  66. def write_content_chunks_batch(chunks: list[Chunk], doc_title: str) -> list[str]:
  67. """POST multiple content chunks. Returns list of successful memory IDs."""
  68. ids = []
  69. for chunk in chunks:
  70. mem_id = write_content_chunk(chunk, doc_title)
  71. if mem_id:
  72. ids.append(mem_id)
  73. return ids
  74. # ── Internal ───────────────────────────────────────────────────────────────────
  75. def _base_meta(source_file: str, memory_type: str) -> dict:
  76. return {
  77. "source_file": source_file,
  78. "memory_type": memory_type,
  79. "ingested_at": datetime.now(timezone.utc).isoformat(),
  80. }
  81. def _post(text: str, metadata: dict, infer: bool) -> str | None:
  82. """
  83. POST a single entry to the /knowledge endpoint.
  84. Returns the memory ID on success, None on failure.
  85. """
  86. payload = {
  87. "text": text,
  88. "user_id": cfg.mem0_agent_id,
  89. "metadata": metadata,
  90. "infer": infer,
  91. }
  92. url = f"{cfg.mem0_base_url}/knowledge"
  93. try:
  94. resp = _SESSION.post(url, json=payload, timeout=30)
  95. resp.raise_for_status()
  96. data = resp.json()
  97. mem_id = _extract_id(data)
  98. log.debug(
  99. "Knowledge stored: %s (type=%s, infer=%s)",
  100. mem_id, metadata.get("memory_type"), infer,
  101. )
  102. return mem_id
  103. except requests.HTTPError as e:
  104. log.error("knowledge HTTP error: %s — %s", e, resp.text[:300])
  105. except requests.RequestException as e:
  106. log.error("knowledge connection error: %s", e)
  107. return None
  108. def _extract_id(data: dict | list) -> str | None:
  109. """Robustly extract a memory ID from various response shapes."""
  110. if isinstance(data, list):
  111. return data[0].get("id") if data else None
  112. if isinstance(data, dict):
  113. if "id" in data:
  114. return data["id"]
  115. results = data.get("results", [])
  116. if results and isinstance(results, list):
  117. return results[0].get("id")
  118. return None