# mem0server.py
import json
import math
import os
import sqlite3
from collections import Counter

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, FileResponse, HTMLResponse

from mem0 import Memory
# =============================================================================
# ENVIRONMENT
# =============================================================================

# The LLM provider below is "groq", so a missing key is fatal: fail fast at
# startup instead of on the first request.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise RuntimeError("GROQ_API_KEY environment variable is not set.")

# Local reranker endpoint; rerank_results() degrades gracefully when it is down.
RERANKER_URL = os.environ.get("RERANKER_URL", "http://192.168.0.200:5200/rerank")

# mem0's SQLite history database (cleaned up after direct Chroma deletes).
SQLITE_PATH = os.path.expanduser("~/.mem0/history.db")
  17. # =============================================================================
  18. # SAFE JSON RESPONSE (handles Infinity / NaN from Chroma / reranker scores)
  19. # =============================================================================
  20. def _sanitize(obj):
  21. if isinstance(obj, float):
  22. if math.isnan(obj) or math.isinf(obj):
  23. return None
  24. if isinstance(obj, dict):
  25. return {k: _sanitize(v) for k, v in obj.items()}
  26. if isinstance(obj, list):
  27. return [_sanitize(i) for i in obj]
  28. return obj
  29. class SafeJSONResponse(JSONResponse):
  30. def render(self, content) -> bytes:
  31. return json.dumps(_sanitize(content), ensure_ascii=False).encode("utf-8")
# =============================================================================
# PROMPTS
# Edit these to change how each collection extracts and stores facts.
# =============================================================================
PROMPTS = {
    # Used by /memories — conversational, user-centric recall for OpenClaw.
    "conversational": {
        "fact_extraction": """
You are a personal memory assistant. Extract concise, standalone facts about
the user from the conversation below. Write each fact as a single sentence
starting with "User" — for example:
- "User is interested in generative music."
- "User is familiar with Python async patterns."
- "User prefers dark mode interfaces."
Only extract facts that are clearly stated or strongly implied. Ignore filler,
greetings, and opinions the user is uncertain about.
""".strip(),
        "update_memory": """
You manage a long-term memory database for a personal AI assistant.
You receive existing memories and new information. Update, merge, or add
memories as needed. Keep each memory as a single concise sentence starting
with "User". Remove duplicates and outdated facts.
""".strip(),
    },
    # Used by /knowledge — objective, source-neutral facts for book/doc ingest.
    "knowledge": {
        "fact_extraction": """
You are a knowledge extraction system that reads source material and produces
a list of objective, encyclopedic facts. Write each fact as a precise,
self-contained sentence. Do NOT reframe facts as user preferences or interests.
Preserve names, terminology, and relationships exactly as they appear.
Examples:
- "Silvio Gesell proposed demurrage as a mechanism to discourage hoarding of currency."
- "The MIDI standard uses a 7-bit checksum for SysEx message validation."
Only extract verifiable facts. Ignore meta-commentary and transitional prose.
""".strip(),
        "update_memory": """
You manage a knowledge base that stores objective facts extracted from books,
documents, and reference material. You receive existing facts and new
information. Update, merge, or add facts as needed. Keep each fact as a
precise, self-contained sentence. Remove duplicates and outdated entries.
""".strip(),
    },
}
  76. # =============================================================================
  77. # MEM0 CONFIG FACTORY
  78. # =============================================================================
  79. def make_config(collection_name: str, prompt_key: str) -> dict:
  80. return {
  81. "llm": {
  82. "provider": "groq",
  83. "config": {
  84. "model": "meta-llama/llama-4-scout-17b-16e-instruct",
  85. "temperature": 0.025,
  86. "max_tokens": 1500,
  87. },
  88. },
  89. "vector_store": {
  90. "provider": "chroma",
  91. "config": {
  92. "host": "192.168.0.200",
  93. "port": 8001,
  94. "collection_name": collection_name,
  95. },
  96. },
  97. "embedder": {
  98. "provider": "ollama",
  99. "config": {
  100. "model": "nomic-embed-text",
  101. "ollama_base_url": "http://192.168.0.200:11434",
  102. },
  103. },
  104. "custom_prompts": PROMPTS[prompt_key],
  105. }
# =============================================================================
# MEMORY INSTANCES
# =============================================================================
# Two independent mem0 instances, one per Chroma collection / prompt set.
memory_conv = Memory.from_config(make_config("openclaw_mem", "conversational"))
memory_know = Memory.from_config(make_config("knowledge_mem", "knowledge"))
# =============================================================================
# CHROMA EMPTY-FILTER PATCH (applied to both instances)
# =============================================================================
# A "match everything" where-clause: user_id is never stored as the empty
# string, so each clause is always true.  NOTE(review): the clause appears
# twice, presumably because Chroma's $and operator requires at least two
# operands — confirm against the deployed Chroma version before simplifying.
NOOP_WHERE = {"$and": [
    {"user_id": {"$ne": ""}},
    {"user_id": {"$ne": ""}},
]}
  118. def is_effectively_empty(filters) -> bool:
  119. if not filters:
  120. return True
  121. if filters in ({"AND": []}, {"OR": []}):
  122. return True
  123. return False
  124. def make_safe_search(mem_instance: Memory):
  125. orig = mem_instance.vector_store.search
  126. def safe_search(query, vectors, limit=10, filters=None):
  127. if is_effectively_empty(filters):
  128. return mem_instance.vector_store.collection.query(
  129. query_embeddings=vectors,
  130. n_results=limit,
  131. where=NOOP_WHERE,
  132. )
  133. try:
  134. return orig(query=query, vectors=vectors, limit=limit, filters=filters)
  135. except Exception as e:
  136. if "Expected where" in str(e):
  137. return mem_instance.vector_store.collection.query(
  138. query_embeddings=vectors,
  139. n_results=limit,
  140. where=NOOP_WHERE,
  141. )
  142. raise
  143. return safe_search
# Monkey-patch both instances with the filter-safe search wrapper.
memory_conv.vector_store.search = make_safe_search(memory_conv)
memory_know.vector_store.search = make_safe_search(memory_know)
  146. # =============================================================================
  147. # RERANKER
  148. # =============================================================================
  149. def rerank_results(query: str, items: list, top_k: int) -> list:
  150. """Re-order results via local reranker. Falls back gracefully."""
  151. if not items:
  152. return items
  153. documents = [r.get("memory", "") for r in items]
  154. try:
  155. resp = httpx.post(
  156. RERANKER_URL,
  157. json={"query": query, "documents": documents, "top_k": top_k},
  158. timeout=5.0,
  159. )
  160. resp.raise_for_status()
  161. reranked = resp.json()["results"]
  162. except Exception as exc:
  163. print(f"[reranker] unavailable, skipping rerank: {exc}")
  164. return items[:top_k]
  165. text_to_meta = {r.get("memory", ""): r for r in items}
  166. merged = []
  167. for r in reranked:
  168. meta = text_to_meta.get(r["text"])
  169. if meta:
  170. merged.append({**meta, "rerank_score": r["score"]})
  171. return merged
  172. # =============================================================================
  173. # SQLITE HELPER
  174. # =============================================================================
  175. def sqlite_delete_ids(memory_ids: list[str]) -> int:
  176. """Delete rows from mem0 SQLite by memory_id. Returns count deleted."""
  177. if not memory_ids:
  178. return 0
  179. try:
  180. conn = sqlite3.connect(SQLITE_PATH)
  181. cur = conn.cursor()
  182. placeholders = ",".join("?" * len(memory_ids))
  183. cur.execute(
  184. f"DELETE FROM history WHERE memory_id IN ({placeholders})",
  185. memory_ids
  186. )
  187. deleted = cur.rowcount
  188. conn.commit()
  189. conn.close()
  190. return deleted
  191. except Exception as e:
  192. print(f"[sqlite] warning: {e}")
  193. return 0
  194. # =============================================================================
  195. # CHROMA PAGINATION HELPER
  196. # =============================================================================
  197. def chroma_get_all(collection, user_id: str, include: list = None) -> list[dict]:
  198. """
  199. Page through a Chroma collection in batches, filtering by user_id.
  200. Returns list of dicts with 'id' and any included fields.
  201. Bypasses mem0's 100-entry cap entirely.
  202. """
  203. if include is None:
  204. include = ["metadatas"]
  205. results = []
  206. batch = 500
  207. offset = 0
  208. while True:
  209. page = collection.get(
  210. where={"user_id": {"$eq": user_id}},
  211. limit=batch,
  212. offset=offset,
  213. include=include,
  214. )
  215. ids = page.get("ids", [])
  216. if not ids:
  217. break
  218. for i, id_ in enumerate(ids):
  219. row = {"id": id_}
  220. for field in include:
  221. values = page.get(field, [])
  222. row[field[:-1]] = values[i] if i < len(values) else None
  223. results.append(row)
  224. offset += len(ids)
  225. if len(ids) < batch:
  226. break
  227. return results
  228. # =============================================================================
  229. # SHARED HANDLERS
  230. # =============================================================================
  231. def extract_user_id(data: dict) -> str:
  232. return data.get("userId") or data.get("user_id") or "default"
  233. async def handle_add(req: Request, mem: Memory, verbatim_allowed: bool = False):
  234. """
  235. Shared add handler for /memories and /knowledge.
  236. /knowledge (verbatim_allowed=True) — always stores verbatim (infer=False).
  237. The ingestor already summarised; skip
  238. the second LLM pass.
  239. /memories (verbatim_allowed=False) — always uses LLM extraction for
  240. conversational recall.
  241. Supports:
  242. - text — raw string (legacy)
  243. - messages — list of {role, content} dicts (standard mem0)
  244. - metadata — dict, passed through to mem0
  245. - user_id / userId
  246. """
  247. data = await req.json()
  248. user_id = extract_user_id(data)
  249. metadata = data.get("metadata") or {}
  250. messages = data.get("messages")
  251. text = data.get("text")
  252. if not messages and not text:
  253. return SafeJSONResponse(
  254. content={"error": "Provide 'text' or 'messages'"}, status_code=400
  255. )
  256. if verbatim_allowed:
  257. # /knowledge — always verbatim, ingestor already summarised
  258. content = text or " ".join(
  259. m["content"] for m in messages if m.get("role") == "user"
  260. )
  261. result = mem.add(content, user_id=user_id, metadata=metadata, infer=False)
  262. print(f"[add verbatim] user={user_id} chars={len(content)} meta={metadata}")
  263. return SafeJSONResponse(content=result)
  264. # /memories — always LLM extraction
  265. if messages:
  266. result = mem.add(messages, user_id=user_id, metadata=metadata)
  267. else:
  268. result = mem.add(text, user_id=user_id, metadata=metadata)
  269. print(f"[add conversational] user={user_id} meta={metadata}")
  270. return SafeJSONResponse(content=result)
  271. async def handle_search(req: Request, mem: Memory):
  272. data = await req.json()
  273. query = (data.get("query") or "").strip()
  274. user_id = extract_user_id(data)
  275. limit = int(data.get("limit", 5))
  276. if not query:
  277. return SafeJSONResponse(content={"results": []})
  278. fetch_k = max(limit * 3, 15)
  279. try:
  280. result = mem.search(query, user_id=user_id, limit=fetch_k)
  281. except Exception:
  282. all_res = mem.get_all(user_id=user_id)
  283. items = (
  284. all_res.get("results", [])
  285. if isinstance(all_res, dict)
  286. else (all_res if isinstance(all_res, list) else [])
  287. )
  288. q = query.lower()
  289. items = [r for r in items if q in r.get("memory", "").lower()]
  290. result = {"results": items}
  291. items = result.get("results", [])
  292. items = rerank_results(query, items, top_k=limit)
  293. print(f"[search] user={user_id} query={query!r} hits={len(items)}")
  294. return SafeJSONResponse(content={"results": items})
  295. async def handle_recent(req: Request, mem: Memory):
  296. data = await req.json()
  297. user_id = extract_user_id(data)
  298. if not user_id:
  299. return SafeJSONResponse(content={"error": "Missing userId"}, status_code=400)
  300. limit = int(data.get("limit", 5))
  301. try:
  302. results = mem.get_all(user_id=user_id)
  303. except Exception:
  304. results = mem.search(query="recent", user_id=user_id)
  305. items = results.get("results", [])
  306. items = sorted(items, key=lambda r: r.get("created_at", ""), reverse=True)
  307. return SafeJSONResponse(content={"results": items[:limit]})
  308. # =============================================================================
  309. # APP
  310. # =============================================================================
  311. app = FastAPI(title="mem0 server")
  312. # ---------------------------------------------------------------------------
  313. # DASHBOARD
  314. # ---------------------------------------------------------------------------
  315. DASHBOARD_HTML = open("dashboard.html").read()
  316. @app.get("/dashboard")
  317. async def dashboard():
  318. return HTMLResponse(content=DASHBOARD_HTML)
  319. # ---------------------------------------------------------------------------
  320. # HEALTH
  321. # ---------------------------------------------------------------------------
  322. @app.get("/health")
  323. async def health():
  324. return SafeJSONResponse(content={
  325. "status": "ok",
  326. "reranker_url": RERANKER_URL,
  327. "collections": {
  328. "conversational": "openclaw_mem",
  329. "knowledge": "knowledge_mem",
  330. },
  331. "prompts": {
  332. k: {pk: pv[:80] + "…" for pk, pv in pv_dict.items()}
  333. for k, pv_dict in PROMPTS.items()
  334. },
  335. })
# ---------------------------------------------------------------------------
# /memories — conversational, OpenClaw
# ---------------------------------------------------------------------------
@app.post("/memories")
async def add_memory(req: Request):
    # Conversational store: always LLM fact extraction, never verbatim.
    return await handle_add(req, memory_conv, verbatim_allowed=False)


@app.post("/memories/search")
async def search_memories(req: Request):
    # Vector search + rerank over the conversational collection.
    return await handle_search(req, memory_conv)


@app.post("/memories/recent")
async def recent_memories(req: Request):
    # Most recently created conversational memories for a user.
    return await handle_recent(req, memory_conv)


@app.delete("/memories")
async def delete_memory(req: Request):
    # Bulk delete via mem0's filter API; body: {"filter": {...}}.
    data = await req.json()
    return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))
# ---------------------------------------------------------------------------
# /knowledge — objective facts, book-ingestor
# ---------------------------------------------------------------------------
@app.post("/knowledge")
async def add_knowledge(req: Request):
    # Knowledge store: verbatim (infer=False); ingestor already summarised.
    return await handle_add(req, memory_know, verbatim_allowed=True)


@app.post("/knowledge/search")
async def search_knowledge(req: Request):
    # Vector search + rerank over the knowledge collection.
    return await handle_search(req, memory_know)


@app.post("/knowledge/recent")
async def recent_knowledge(req: Request):
    # Most recently created knowledge entries for a user.
    return await handle_recent(req, memory_know)


@app.delete("/knowledge")
async def delete_knowledge(req: Request):
    # Bulk delete via mem0's filter API; body: {"filter": {...}}.
    data = await req.json()
    return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))
  368. @app.post("/knowledge/sources")
  369. async def knowledge_sources(req: Request):
  370. """
  371. Return distinct source_file values with entry counts.
  372. Pages through Chroma directly — no mem0 100-entry cap.
  373. """
  374. data = await req.json()
  375. user_id = extract_user_id(data) or "knowledge_base"
  376. rows = chroma_get_all(memory_know.vector_store.collection, user_id)
  377. counts = {}
  378. for row in rows:
  379. src = (row.get("metadata") or {}).get("source_file", "(no source)")
  380. counts[src] = counts.get(src, 0) + 1
  381. sources = [
  382. {"source_file": k, "count": v}
  383. for k, v in sorted(counts.items(), key=lambda x: -x[1])
  384. ]
  385. print(f"[sources] user={user_id} total={len(rows)} books={len(sources)}")
  386. return SafeJSONResponse(content={"sources": sources, "total": len(rows)})
  387. @app.delete("/knowledge/by-source")
  388. async def delete_knowledge_by_source(req: Request):
  389. """
  390. Delete all knowledge entries for a given source_file.
  391. Pages through Chroma directly, then cleans SQLite.
  392. """
  393. data = await req.json()
  394. source_file = data.get("source_file")
  395. user_id = extract_user_id(data) or "knowledge_base"
  396. if not source_file:
  397. return SafeJSONResponse(
  398. content={"error": "Missing source_file"}, status_code=400
  399. )
  400. rows = chroma_get_all(memory_know.vector_store.collection, user_id)
  401. to_delete = [
  402. row["id"] for row in rows
  403. if (row.get("metadata") or {}).get("source_file") == source_file
  404. ]
  405. if not to_delete:
  406. return SafeJSONResponse(
  407. content={"deleted": 0, "message": "no entries found for that source"}
  408. )
  409. # 1. Chroma bulk delete
  410. try:
  411. memory_know.vector_store.collection.delete(ids=to_delete)
  412. except Exception as e:
  413. return SafeJSONResponse(
  414. content={"error": f"chroma delete failed: {e}"}, status_code=500
  415. )
  416. # 2. SQLite cleanup
  417. sqlite_deleted = sqlite_delete_ids(to_delete)
  418. print(f"[delete by-source] source={source_file} "
  419. f"chroma={len(to_delete)} sqlite={sqlite_deleted}")
  420. return SafeJSONResponse(content={
  421. "deleted": len(to_delete),
  422. "sqlite_deleted": sqlite_deleted,
  423. "source_file": source_file,
  424. })
  425. # ---------------------------------------------------------------------------
  426. # /memory/{id} — single entry delete (knowledge or conversational)
  427. # ---------------------------------------------------------------------------
  428. @app.delete("/memory/{memory_id}")
  429. async def delete_single_memory(memory_id: str, req: Request):
  430. """
  431. Delete a single memory by ID from either collection.
  432. Body: { "collection": "knowledge" | "conversational" }
  433. Cleans both Chroma and SQLite.
  434. """
  435. data = await req.json()
  436. collection = data.get("collection", "knowledge")
  437. mem = memory_know if collection == "knowledge" else memory_conv
  438. # 1. Chroma delete
  439. try:
  440. mem.vector_store.collection.delete(ids=[memory_id])
  441. except Exception as e:
  442. return SafeJSONResponse(
  443. content={"error": f"chroma delete failed: {e}"}, status_code=500
  444. )
  445. # 2. SQLite cleanup
  446. sqlite_delete_ids([memory_id])
  447. print(f"[delete single] id={memory_id} collection={collection}")
  448. return SafeJSONResponse(content={"deleted": memory_id})
  449. # ---------------------------------------------------------------------------
  450. # /search — merged results from both collections (OpenClaw autorecall)
  451. # ---------------------------------------------------------------------------
  452. @app.post("/search")
  453. async def search_all(req: Request):
  454. """
  455. Query both collections and merge results.
  456. Results tagged with _source: conversational | knowledge.
  457. Accepts same payload as /memories/search.
  458. """
  459. data = await req.json()
  460. query = (data.get("query") or "").strip()
  461. user_id = extract_user_id(data)
  462. limit = int(data.get("limit", 5))
  463. if not query:
  464. return SafeJSONResponse(content={"results": []})
  465. fetch_k = max(limit * 3, 15)
  466. def fetch(mem: Memory, tag: str):
  467. try:
  468. r = mem.search(query, user_id=user_id, limit=fetch_k)
  469. items = r.get("results", [])
  470. except Exception:
  471. items = []
  472. for item in items:
  473. item["_source"] = tag
  474. return items
  475. conv_items = fetch(memory_conv, "conversational")
  476. know_items = fetch(memory_know, "knowledge")
  477. merged = rerank_results(query, conv_items + know_items, top_k=limit)
  478. print(
  479. f"[search/all] user={user_id} query={query!r} "
  480. f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
  481. )
  482. return SafeJSONResponse(content={"results": merged})