import os
import math
import json
import sqlite3

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, HTMLResponse
from mem0 import Memory

# =============================================================================
# ENVIRONMENT
# =============================================================================
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise RuntimeError("GROQ_API_KEY environment variable is not set.")

RERANKER_URL = os.environ.get("RERANKER_URL", "http://192.168.0.200:5200/rerank")
SQLITE_PATH = os.path.expanduser("~/.mem0/history.db")

# =============================================================================
# SAFE JSON RESPONSE
# Chroma and the reranker can emit Infinity/NaN which is invalid JSON.
# Sanitize them to None before serializing.
# =============================================================================
def _sanitize(obj):
    """Recursively replace non-finite floats (NaN/±Infinity) with None.

    Dicts and lists are rebuilt with every value sanitized; all other
    objects pass through unchanged.
    """
    if isinstance(obj, float) and not math.isfinite(obj):
        return None
    if isinstance(obj, dict):
        return {key: _sanitize(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [_sanitize(item) for item in obj]
    return obj


class SafeJSONResponse(JSONResponse):
    """JSONResponse that sanitizes non-finite floats before serializing."""

    def render(self, content) -> bytes:
        return json.dumps(_sanitize(content), ensure_ascii=False).encode("utf-8")

# =============================================================================
# METADATA SANITIZER
# Chroma MetadataValue only accepts str, int, float, bool.
# Drop None values; coerce anything else (lists, dicts) to str.
# =============================================================================
def sanitize_metadata(meta: dict) -> dict:
    """Return a copy of *meta* safe to hand to Chroma as metadata.

    None values are dropped entirely; str/int/float/bool values pass
    through untouched; anything else (lists, dicts, ...) is coerced to
    its str() representation.
    """
    clean = {}
    for key, value in meta.items():
        if value is None:
            continue
        clean[key] = value if isinstance(value, (str, int, float, bool)) else str(value)
    return clean


# =============================================================================
# PROMPTS
# Mapped to MemoryConfig.custom_fact_extraction_prompt /
# MemoryConfig.custom_update_memory_prompt (top-level fields).
#
# conversational — active, used by /memories on every add
# knowledge      — defined for future use; currently bypassed because
#                  /knowledge always stores verbatim (infer=False)
# =============================================================================
PROMPTS = {
    "conversational": {
        "fact_extraction": """
You are an intelligent system that extracts useful long-term memory
from a conversation. Your goal is to identify information that could
help future interactions.

Extract facts that describe:
1. User preferences
2. Important decisions
3. Ongoing projects
4. Tools or technologies being used
5. Goals or plans
6. Constraints or requirements
7. Discoveries or conclusions
8. Important context about tasks

Ignore:
- greetings
- casual conversation
- general world knowledge
- temporary statements

Return JSON:
{
  "facts": [
    "fact 1",
    "fact 2"
  ]
}

Only include information that may be useful later.
If nothing important is present return: {"facts": []}
""".strip(),
        "update_memory": """
You manage a long-term memory database.

You receive:
1. existing stored memories
2. new extracted facts

For each fact decide whether to:

ADD
Create a new memory if it contains useful new information.

UPDATE
Modify an existing memory if the new fact refines or corrects it.

DELETE
Remove a memory if it is clearly outdated or incorrect.

NONE
Ignore the fact if it is redundant or trivial.

Guidelines:
- Prefer updating over adding duplicates
- Keep memories concise
- Avoid storing repeated information
- Preserve important context

Return JSON list:
[
  { "event": "ADD", "text": "..." },
  { "event": "UPDATE", "id": "...", "text": "..." }
]
""".strip(),
    },
    "knowledge": {
        # Not active during ingest (infer=False bypasses extraction).
        # Kept here so it can be enabled if infer=True is ever needed.
        "fact_extraction": """
You are a knowledge extraction system that reads source material and
produces a list of objective, encyclopedic facts.

Write each fact as a precise, self-contained sentence.
Do NOT reframe facts as user preferences or interests.
Preserve names, terminology, and relationships exactly as they appear.

Examples:
- "Silvio Gesell proposed demurrage as a mechanism to discourage hoarding of currency."
- "The MIDI standard uses a 7-bit checksum for SysEx message validation."

Only extract verifiable facts.
Ignore meta-commentary and transitional prose.

Return JSON:
{"facts": ["fact 1", "fact 2"]}
""".strip(),
        "update_memory": """
You manage a knowledge base that stores objective facts extracted from
books, documents, and reference material.

You receive existing facts and new information.
Update, merge, or add facts as needed.
Keep each fact as a precise, self-contained sentence.
Remove duplicates and outdated entries.

Return JSON list:
[{ "event": "ADD"|"UPDATE"|"DELETE"|"NONE", "text": "..." }]
""".strip(),
    },
}

# =============================================================================
# MEM0 CONFIG FACTORY
# Prompts are top-level MemoryConfig fields — not nested inside llm.config.
# =============================================================================
def make_config(collection_name: str, prompt_key: str) -> dict:
    """Build a mem0 MemoryConfig dict for one collection.

    *collection_name* selects the Chroma collection to write into;
    *prompt_key* selects which PROMPTS entry supplies the extraction and
    update prompts.
    """
    prompt_set = PROMPTS[prompt_key]
    return {
        "llm": {
            "provider": "groq",
            "config": {
                "model": "meta-llama/llama-4-scout-17b-16e-instruct",
                "temperature": 0.025,
                "max_tokens": 1500,
            },
        },
        "vector_store": {
            "provider": "chroma",
            "config": {
                "host": "192.168.0.200",
                "port": 8001,
                "collection_name": collection_name,
            },
        },
        "embedder": {
            "provider": "ollama",
            "config": {
                "model": "nomic-embed-text",
                "ollama_base_url": "http://192.168.0.200:11434",
            },
        },
        # Top-level MemoryConfig fields — confirmed from MemoryConfig source
        "custom_fact_extraction_prompt": prompt_set["fact_extraction"],
        "custom_update_memory_prompt": prompt_set["update_memory"],
    }


# =============================================================================
# MEMORY INSTANCES
# =============================================================================
memory_conv = Memory.from_config(make_config("openclaw_mem", "conversational"))
memory_know = Memory.from_config(make_config("knowledge_mem", "knowledge"))

# =============================================================================
# CHROMA EMPTY-FILTER PATCH
# mem0 sometimes passes an empty filter dict to Chroma which raises an error.
# Replace with a harmless always-true filter as fallback.
# =============================================================================
# NOTE(review): the same always-true clause appears twice — presumably because
# Chroma's $and operator requires at least two operands; confirm before
# simplifying.
NOOP_WHERE = {
    "$and": [
        {"user_id": {"$ne": ""}},
        {"user_id": {"$ne": ""}},
    ]
}


def is_effectively_empty(filters) -> bool:
    """True when *filters* is falsy or an empty AND/OR wrapper."""
    return not filters or filters in ({"AND": []}, {"OR": []})


def make_safe_search(mem_instance: Memory):
    """Return a wrapper around vector_store.search that never passes an
    empty filter to Chroma — empty or rejected filters fall back to a raw
    collection query with NOOP_WHERE."""
    orig = mem_instance.vector_store.search

    def safe_search(query, vectors, limit=10, filters=None):
        def query_with_noop_filter():
            return mem_instance.vector_store.collection.query(
                query_embeddings=vectors,
                n_results=limit,
                where=NOOP_WHERE,
            )

        if is_effectively_empty(filters):
            return query_with_noop_filter()
        try:
            return orig(query=query, vectors=vectors, limit=limit, filters=filters)
        except Exception as e:
            # Chroma reports malformed filters with an "Expected where" message.
            if "Expected where" in str(e):
                return query_with_noop_filter()
            raise

    return safe_search


memory_conv.vector_store.search = make_safe_search(memory_conv)
memory_know.vector_store.search = make_safe_search(memory_know)

# =============================================================================
# RERANKER
# Calls local reranker to re-order search results by relevance.
# Falls back to raw mem0 order if unreachable.
# =============================================================================
def rerank_results(query: str, items: list, top_k: int) -> list:
    """Re-order *items* by relevance to *query* via the local reranker.

    Sends the memory texts to RERANKER_URL and merges the returned scores
    back onto the original mem0 records (matched by text). Falls back to
    the first *top_k* items in their original order when the reranker is
    unreachable or errors.
    """
    if not items:
        return items
    documents = [r.get("memory", "") for r in items]
    try:
        resp = httpx.post(
            RERANKER_URL,
            json={"query": query, "documents": documents, "top_k": top_k},
            timeout=5.0,
        )
        resp.raise_for_status()
        reranked = resp.json()["results"]
    except Exception as exc:
        print(f"[reranker] unavailable, skipping: {exc}")
        return items[:top_k]

    # Re-attach original mem0 metadata by matching text.
    # NOTE(review): if two items carry identical text, the later one wins.
    text_to_meta = {r.get("memory", ""): r for r in items}
    merged = []
    for r in reranked:
        meta = text_to_meta.get(r["text"])
        if meta:
            merged.append({**meta, "rerank_score": r["score"]})
    return merged


# =============================================================================
# SQLITE HELPER
# mem0 maintains a local SQLite history alongside Chroma.
# Both must be cleaned together or deleted entries reappear after restart.
# =============================================================================
def sqlite_delete_ids(memory_ids: list[str]) -> int:
    """Delete rows from the mem0 history table by memory_id.

    Returns the number of rows deleted, or 0 when *memory_ids* is empty or
    any SQLite error occurs (best-effort cleanup — never raises).
    """
    if not memory_ids:
        return 0
    try:
        conn = sqlite3.connect(SQLITE_PATH)
        try:
            placeholders = ",".join("?" * len(memory_ids))
            cur = conn.execute(
                f"DELETE FROM history WHERE memory_id IN ({placeholders})",
                memory_ids,
            )
            deleted = cur.rowcount
            conn.commit()
            return deleted
        finally:
            # Always release the handle — the original leaked the connection
            # whenever cursor/execute/commit raised.
            conn.close()
    except Exception as e:
        print(f"[sqlite] warning: {e}")
        return 0


# =============================================================================
# CHROMA PAGINATION HELPER
# mem0's get_all() is capped at 100 entries. This pages Chroma directly
# in batches of 500 to retrieve the full collection without limits.
# =============================================================================
def chroma_get_all(collection, user_id: str, include: list = None) -> list[dict]:
    """Fetch every entry for *user_id* from a Chroma collection.

    Pages in batches of 500 using limit/offset, so the caller is not bound
    by mem0's 100-entry get_all cap. *include* lists the Chroma fields to
    return (default ["metadatas"]); each plural field name is singularized
    in the returned rows ("metadatas" -> "metadata").
    """
    if include is None:
        include = ["metadatas"]
    results = []
    batch = 500
    offset = 0
    while True:
        page = collection.get(
            where={"user_id": {"$eq": user_id}},
            limit=batch,
            offset=offset,
            include=include,
        )
        ids = page.get("ids", [])
        if not ids:
            break
        for i, id_ in enumerate(ids):
            row = {"id": id_}
            for field in include:
                values = page.get(field, [])
                # Strip the trailing "s": "metadatas" -> "metadata", etc.
                row[field[:-1]] = values[i] if i < len(values) else None
            results.append(row)
        offset += len(ids)
        if len(ids) < batch:
            break
    return results


# =============================================================================
# SHARED HANDLERS
# =============================================================================
def extract_user_id(data: dict, default: str = "default") -> str:
    """Read the user id from a request body, accepting either key spelling.

    *default* is returned when neither key is present (or both are empty).
    FIX: the previous `extract_user_id(data) or "fallback"` pattern at some
    call sites was dead code because this function never returned a falsy
    value; callers now pass their fallback explicitly.
    """
    return data.get("userId") or data.get("user_id") or default


async def handle_add(req: Request, mem: Memory, verbatim_allowed: bool = False):
    """
    Shared add handler for /memories and /knowledge.

    /knowledge (verbatim_allowed=True) — always infer=False.
    The ingestor already summarised; skip the second LLM extraction pass.

    /memories (verbatim_allowed=False) — always LLM extraction using
    the conversational prompts above.

    Accepts: text | messages, user_id, metadata.
    Metadata is sanitized — Chroma rejects None and complex types.
    """
    data = await req.json()
    user_id = extract_user_id(data)
    raw_meta = data.get("metadata")
    metadata = sanitize_metadata(raw_meta) if raw_meta else None
    messages = data.get("messages")
    text = data.get("text")
    if not messages and not text:
        return SafeJSONResponse(
            content={"error": "Provide 'text' or 'messages'"}, status_code=400
        )

    if verbatim_allowed:
        # /knowledge — store verbatim, ingestor already did the summarisation
        content = text or " ".join(
            m["content"] for m in messages if m.get("role") == "user"
        )
        result = mem.add(content, user_id=user_id, metadata=metadata, infer=False)
        print(f"[add verbatim] user={user_id} chars={len(content)} meta={metadata}")
        return SafeJSONResponse(content=result)

    # /memories — LLM extracts and deduplicates facts from the conversation.
    kwargs = {"user_id": user_id}
    if metadata:
        kwargs["metadata"] = metadata
    result = mem.add(messages or text, **kwargs)
    print(f"[add conversational] user={user_id} meta={metadata}")
    return SafeJSONResponse(content=result)


async def handle_search(req: Request, mem: Memory):
    """Semantic search with reranking. Fetches limit×3 candidates then reranks."""
    data = await req.json()
    query = (data.get("query") or "").strip()
    user_id = extract_user_id(data)
    limit = int(data.get("limit", 5))
    if not query:
        return SafeJSONResponse(content={"results": []})
    fetch_k = max(limit * 3, 15)
    try:
        result = mem.search(query, user_id=user_id, limit=fetch_k)
    except Exception:
        # Fallback: get_all + simple text filter
        all_res = mem.get_all(user_id=user_id)
        items = (
            all_res.get("results", [])
            if isinstance(all_res, dict)
            else (all_res if isinstance(all_res, list) else [])
        )
        q = query.lower()
        items = [r for r in items if q in r.get("memory", "").lower()]
        result = {"results": items}
    items = result.get("results", [])
    items = rerank_results(query, items, top_k=limit)
    print(f"[search] user={user_id} query={query!r} hits={len(items)}")
    return SafeJSONResponse(content={"results": items})


async def handle_recent(req: Request, mem: Memory):
    """Return most recently created memories, sorted by created_at desc."""
    data = await req.json()
    user_id = extract_user_id(data)
    # Defensive only: extract_user_id currently always returns a non-empty id.
    if not user_id:
        return SafeJSONResponse(content={"error": "Missing userId"}, status_code=400)
    limit = int(data.get("limit", 5))
    try:
        results = mem.get_all(user_id=user_id)
    except Exception:
        results = mem.search(query="recent", user_id=user_id)
    items = results.get("results", [])
    items = sorted(items, key=lambda r: r.get("created_at", ""), reverse=True)
    return SafeJSONResponse(content={"results": items[:limit]})


# =============================================================================
# APP
# =============================================================================
app = FastAPI(title="mem0 server")

# ---------------------------------------------------------------------------
# DASHBOARD — served from file mounted via docker-compose volume
# ---------------------------------------------------------------------------
# FIX: use a context manager so the file handle is closed instead of leaked.
with open("dashboard.html") as _dashboard_file:
    DASHBOARD_HTML = _dashboard_file.read()


@app.get("/dashboard")
async def dashboard():
    return HTMLResponse(content=DASHBOARD_HTML)


# ---------------------------------------------------------------------------
# HEALTH
# ---------------------------------------------------------------------------
@app.get("/health")
async def health():
    return SafeJSONResponse(content={
        "status": "ok",
        "reranker_url": RERANKER_URL,
        "collections": {
            "conversational": "openclaw_mem",
            "knowledge": "knowledge_mem",
        },
        # Show first 80 chars of each prompt for quick verification
        "prompts": {
            k: {pk: pv[:80] + "…" for pk, pv in pv_dict.items()}
            for k, pv_dict in PROMPTS.items()
        },
    })


# ---------------------------------------------------------------------------
# /memories — conversational collection (OpenClaw)
# ---------------------------------------------------------------------------
@app.post("/memories")
async def add_memory(req: Request):
    return await handle_add(req, memory_conv, verbatim_allowed=False)


@app.post("/memories/search")
async def search_memories(req: Request):
    return await handle_search(req, memory_conv)


@app.post("/memories/recent")
async def recent_memories(req: Request):
    return await handle_recent(req, memory_conv)


@app.delete("/memories")
async def delete_memory(req: Request):
    data = await req.json()
    return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))


# ---------------------------------------------------------------------------
# /knowledge — objective facts collection (book-ingestor)
# ---------------------------------------------------------------------------
@app.post("/knowledge")
async def add_knowledge(req: Request):
    return await handle_add(req, memory_know, verbatim_allowed=True)


@app.post("/knowledge/search")
async def search_knowledge(req: Request):
    return await handle_search(req, memory_know)


@app.post("/knowledge/recent")
async def recent_knowledge(req: Request):
    return await handle_recent(req, memory_know)


@app.delete("/knowledge")
async def delete_knowledge(req: Request):
    data = await req.json()
    return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))


@app.post("/knowledge/sources")
async def knowledge_sources(req: Request):
    """
    Return distinct source_file values with entry counts.
    Pages Chroma directly — bypasses mem0's 100-entry get_all cap.
    """
    data = await req.json()
    # FIX: the old `extract_user_id(data) or "knowledge_base"` never fell
    # back; pass the intended default explicitly.
    user_id = extract_user_id(data, default="knowledge_base")
    rows = chroma_get_all(memory_know.vector_store.collection, user_id)
    counts = {}
    for row in rows:
        src = (row.get("metadata") or {}).get("source_file", "(no source)")
        counts[src] = counts.get(src, 0) + 1
    sources = [
        {"source_file": k, "count": v}
        for k, v in sorted(counts.items(), key=lambda x: -x[1])
    ]
    print(f"[sources] user={user_id} total={len(rows)} books={len(sources)}")
    return SafeJSONResponse(content={"sources": sources, "total": len(rows)})


@app.delete("/knowledge/by-source")
async def delete_knowledge_by_source(req: Request):
    """
    Delete all entries for a given source_file from both Chroma and SQLite.
    Pages Chroma directly to avoid the 100-entry cap on get_all.
    """
    data = await req.json()
    source_file = data.get("source_file")
    # FIX: pass the intended fallback explicitly (old `or ...` was dead code).
    user_id = extract_user_id(data, default="knowledge_base")
    if not source_file:
        return SafeJSONResponse(
            content={"error": "Missing source_file"}, status_code=400
        )

    # Collect all IDs matching source_file across all pages
    rows = chroma_get_all(memory_know.vector_store.collection, user_id)
    to_delete = [
        row["id"]
        for row in rows
        if (row.get("metadata") or {}).get("source_file") == source_file
    ]
    if not to_delete:
        return SafeJSONResponse(
            content={"deleted": 0, "message": "no entries found for that source"}
        )

    # Delete from Chroma in one bulk call
    try:
        memory_know.vector_store.collection.delete(ids=to_delete)
    except Exception as e:
        return SafeJSONResponse(
            content={"error": f"chroma delete failed: {e}"}, status_code=500
        )

    # Clean SQLite so entries don't reappear after server restart
    sqlite_deleted = sqlite_delete_ids(to_delete)
    print(f"[delete by-source] source={source_file} "
          f"chroma={len(to_delete)} sqlite={sqlite_deleted}")
    return SafeJSONResponse(content={
        "deleted": len(to_delete),
        "sqlite_deleted": sqlite_deleted,
        "source_file": source_file,
    })


# ---------------------------------------------------------------------------
# /memory/{id} — single entry delete for dashboard per-row buttons
# ---------------------------------------------------------------------------
@app.delete("/memory/{memory_id}")
async def delete_single_memory(memory_id: str, req: Request):
    """
    Delete one memory by ID from either collection.
    Body: { "collection": "knowledge" | "conversational" }
    Cleans both Chroma and SQLite.
    """
    data = await req.json()
    collection = data.get("collection", "knowledge")
    mem = memory_know if collection == "knowledge" else memory_conv
    try:
        mem.vector_store.collection.delete(ids=[memory_id])
    except Exception as e:
        return SafeJSONResponse(
            content={"error": f"chroma delete failed: {e}"}, status_code=500
        )
    sqlite_delete_ids([memory_id])
    print(f"[delete single] id={memory_id} collection={collection}")
    return SafeJSONResponse(content={"deleted": memory_id})


# ---------------------------------------------------------------------------
# /search — merged results from both collections (OpenClaw autorecall)
# ---------------------------------------------------------------------------
@app.post("/search")
async def search_all(req: Request):
    """
    Query both collections simultaneously, tag results with _source,
    then run a single rerank pass over the merged pool.
    """
    data = await req.json()
    query = (data.get("query") or "").strip()
    user_id = extract_user_id(data)
    limit = int(data.get("limit", 5))
    if not query:
        return SafeJSONResponse(content={"results": []})
    fetch_k = max(limit * 3, 15)

    def fetch(mem: Memory, tag: str):
        try:
            r = mem.search(query, user_id=user_id, limit=fetch_k)
            items = r.get("results", [])
        except Exception:
            items = []
        for item in items:
            item["_source"] = tag
        return items

    conv_items = fetch(memory_conv, "conversational")
    know_items = fetch(memory_know, "knowledge")
    merged = rerank_results(query, conv_items + know_items, top_k=limit)
    print(
        f"[search/all] user={user_id} query={query!r} "
        f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
    )
    return SafeJSONResponse(content={"results": merged})


@app.post("/memories/all")
async def memories_all(req: Request):
    """
    Return all memories for a user, paging Chroma directly.
    Bypasses mem0's 100-entry get_all cap.
    """
    data = await req.json()
    # FIX: pass the intended fallback explicitly (old `or "main"` was dead code).
    user_id = extract_user_id(data, default="main")
    rows = chroma_get_all(
        memory_conv.vector_store.collection, user_id,
        include=["metadatas", "documents"],
    )
    items = []
    for row in rows:
        meta = row.get("metadata") or {}
        items.append({
            "id": row["id"],
            "memory": row.get("document") or meta.get("data", ""),
            "created_at": meta.get("created_at"),
            "metadata": meta,
            "user_id": user_id,
        })
    items.sort(key=lambda r: r.get("created_at") or "", reverse=True)
    print(f"[memories/all] user={user_id} total={len(items)}")
    return SafeJSONResponse(content={"results": items})