"""mem0-backed memory server.

Exposes two mem0 collections over FastAPI:

* ``/memories``  -- conversational, user-centric facts (OpenClaw recall).
* ``/knowledge`` -- objective, source-neutral facts (book/doc ingest).
* ``/search``    -- merged query across both collections.

Every response is rendered through :class:`SafeJSONResponse`, which strips
NaN / Infinity floats that Chroma distances or reranker scores may contain
(strict JSON forbids them).
"""

import json
import math
import os

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from mem0 import Memory

# =============================================================================
# ENVIRONMENT
# =============================================================================

GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise RuntimeError("GROQ_API_KEY environment variable is not set.")

RERANKER_URL = os.environ.get("RERANKER_URL", "http://192.168.0.200:5200/rerank")


# =============================================================================
# SAFE JSON RESPONSE (handles Infinity / NaN from Chroma / reranker scores)
# =============================================================================

def _sanitize(obj):
    """Recursively replace non-finite floats with ``None``.

    Walks dicts, lists and tuples (tuples serialize as JSON arrays, so they
    must be sanitized too); every other value passes through unchanged.
    """
    if isinstance(obj, float):
        if math.isnan(obj) or math.isinf(obj):
            return None
    if isinstance(obj, dict):
        return {k: _sanitize(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_sanitize(i) for i in obj]
    return obj


class SafeJSONResponse(JSONResponse):
    """JSONResponse that sanitizes NaN/Infinity before encoding.

    The default encoder emits ``NaN``/``Infinity`` tokens that are invalid
    strict JSON and break many clients; we map them to ``null`` instead.
    """

    def render(self, content) -> bytes:
        return json.dumps(
            _sanitize(content), ensure_ascii=False
        ).encode("utf-8")


# =============================================================================
# PROMPTS
# Edit these to change how each collection extracts and stores facts.
# =============================================================================

PROMPTS = {
    # Used by /memories -- conversational, user-centric recall for OpenClaw.
    "conversational": {
        "fact_extraction": """
You are a personal memory assistant. Extract concise, standalone facts
about the user from the conversation below.

Write each fact as a single sentence starting with "User" -- for example:
- "User is interested in generative music."
- "User is familiar with Python async patterns."
- "User prefers dark mode interfaces."

Only extract facts that are clearly stated or strongly implied.
Ignore filler, greetings, and opinions the user is uncertain about.
""".strip(),
        "update_memory": """
You manage a long-term memory database for a personal AI assistant.
You receive existing memories and new information.
Update, merge, or add memories as needed.
Keep each memory as a single concise sentence starting with "User".
Remove duplicates and outdated facts.
""".strip(),
    },
    # Used by /knowledge -- objective, source-neutral facts for book/doc ingest.
    "knowledge": {
        "fact_extraction": """
You are a knowledge extraction system that reads source material and
produces a list of objective, encyclopedic facts.

Write each fact as a precise, self-contained sentence.
Do NOT reframe facts as user preferences or interests.
Preserve names, terminology, and relationships exactly as they appear.

Examples:
- "Silvio Gesell proposed demurrage as a mechanism to discourage hoarding of currency."
- "The MIDI standard uses a 7-bit checksum for SysEx message validation."

Only extract verifiable facts. Ignore meta-commentary and transitional prose.
""".strip(),
        "update_memory": """
You manage a knowledge base that stores objective facts extracted from
books, documents, and reference material.
You receive existing facts and new information.
Update, merge, or add facts as needed.
Keep each fact as a precise, self-contained sentence.
Remove duplicates and outdated entries.
""".strip(),
    },
}


# =============================================================================
# MEM0 CONFIG FACTORY
# =============================================================================

def make_config(collection_name: str, prompt_key: str) -> dict:
    """Build a mem0 config dict for one Chroma collection.

    Args:
        collection_name: Chroma collection to read/write.
        prompt_key: key into :data:`PROMPTS` selecting the prompt pair.
    """
    return {
        "llm": {
            "provider": "groq",
            "config": {
                "model": "meta-llama/llama-4-scout-17b-16e-instruct",
                # Near-deterministic extraction; small budget keeps facts terse.
                "temperature": 0.025,
                "max_tokens": 1500,
            },
        },
        "vector_store": {
            "provider": "chroma",
            "config": {
                "host": "192.168.0.200",
                "port": 8001,
                "collection_name": collection_name,
            },
        },
        "embedder": {
            "provider": "ollama",
            "config": {
                "model": "nomic-embed-text",
                "ollama_base_url": "http://192.168.0.200:11434",
            },
        },
        "custom_prompts": PROMPTS[prompt_key],
    }


# =============================================================================
# MEMORY INSTANCES
# =============================================================================

memory_conv = Memory.from_config(make_config("openclaw_mem", "conversational"))
memory_know = Memory.from_config(make_config("knowledge_mem", "knowledge"))


# =============================================================================
# CHROMA EMPTY-FILTER PATCH (applied to both instances)
# =============================================================================

# Chroma rejects empty `where` filters, but requires `$and` to hold at least
# two clauses -- hence the deliberately duplicated always-true condition.
NOOP_WHERE = {"$and": [
    {"user_id": {"$ne": ""}},
    {"user_id": {"$ne": ""}},
]}


def is_effectively_empty(filters) -> bool:
    """Return True when *filters* would produce an invalid empty Chroma where-clause."""
    if not filters:
        return True
    if filters in ({"AND": []}, {"OR": []}):
        return True
    return False


def make_safe_search(mem_instance: Memory):
    """Wrap a mem0 vector-store ``search`` so empty filters don't crash Chroma.

    Empty/degenerate filters are routed straight to the raw collection query
    with :data:`NOOP_WHERE`; a "Expected where" error from Chroma triggers the
    same fallback. Any other exception propagates unchanged.
    """
    orig = mem_instance.vector_store.search

    def safe_search(query, vectors, limit=10, filters=None):
        if is_effectively_empty(filters):
            return mem_instance.vector_store.collection.query(
                query_embeddings=vectors,
                n_results=limit,
                where=NOOP_WHERE,
            )
        try:
            return orig(query=query, vectors=vectors, limit=limit, filters=filters)
        except Exception as e:
            # Chroma signals malformed where-clauses with this message prefix.
            if "Expected where" in str(e):
                return mem_instance.vector_store.collection.query(
                    query_embeddings=vectors,
                    n_results=limit,
                    where=NOOP_WHERE,
                )
            raise

    return safe_search


memory_conv.vector_store.search = make_safe_search(memory_conv)
memory_know.vector_store.search = make_safe_search(memory_know)


# =============================================================================
# RERANKER
# =============================================================================

def rerank_results(query: str, items: list, top_k: int) -> list:
    """Re-order results via local reranker. Falls back gracefully.

    On any reranker failure the first *top_k* items are returned untouched.
    NOTE(review): items are matched back by memory text, so duplicate texts
    collapse to one entry -- acceptable for dedup'd memories, but worth
    confirming if exact duplicates can occur across collections.
    """
    if not items:
        return items

    documents = [r.get("memory", "") for r in items]
    try:
        resp = httpx.post(
            RERANKER_URL,
            json={"query": query, "documents": documents, "top_k": top_k},
            timeout=5.0,
        )
        resp.raise_for_status()
        reranked = resp.json()["results"]
    except Exception as exc:
        print(f"[reranker] unavailable, skipping rerank: {exc}")
        return items[:top_k]

    text_to_meta = {r.get("memory", ""): r for r in items}
    merged = []
    for r in reranked:
        meta = text_to_meta.get(r["text"])
        if meta:
            merged.append({**meta, "rerank_score": r["score"]})
    return merged


# =============================================================================
# SHARED HELPERS
# =============================================================================

def extract_user_id(data: dict) -> str:
    """Accept both camelCase and snake_case ids; never returns empty."""
    return data.get("userId") or data.get("user_id") or "default"


async def handle_add(req: Request, mem: Memory, verbatim_allowed: bool = False):
    """
    Shared add handler for /memories and /knowledge.

    Supports:
    - text       -- raw string (legacy)
    - messages   -- list of {role, content} dicts (standard mem0)
    - infer      -- bool, default True. If False and verbatim_allowed=True,
                    stores content without LLM extraction.
    - metadata   -- dict, passed through to mem0
    - user_id / userId
    """
    data = await req.json()
    user_id = extract_user_id(data)
    metadata = data.get("metadata") or {}
    infer = data.get("infer", True)
    messages = data.get("messages")
    text = data.get("text")

    if not messages and not text:
        return SafeJSONResponse(
            content={"error": "Provide 'text' or 'messages'"}, status_code=400
        )

    # infer:false -- store verbatim (knowledge collection only)
    if verbatim_allowed and not infer:
        content = text or " ".join(
            m["content"] for m in messages if m.get("role") == "user"
        )
        if not content:
            # messages carried no user-role content; nothing to store verbatim.
            return SafeJSONResponse(
                content={"error": "No user content to store verbatim"},
                status_code=400,
            )
        result = mem.add(content, user_id=user_id, metadata=metadata, infer=False)
        print(f"[add verbatim] user={user_id} chars={len(content)} meta={metadata}")
        return SafeJSONResponse(content=result)

    # Normal path -- LLM extraction
    if messages:
        result = mem.add(messages, user_id=user_id, metadata=metadata)
    else:
        result = mem.add(text, user_id=user_id, metadata=metadata)

    print(f"[add] user={user_id} infer=True meta={metadata}")
    return SafeJSONResponse(content=result)


async def handle_search(req: Request, mem: Memory):
    """Semantic search with reranking; substring scan over get_all as fallback."""
    data = await req.json()
    query = (data.get("query") or "").strip()
    user_id = extract_user_id(data)
    limit = int(data.get("limit", 5))

    if not query:
        return SafeJSONResponse(content={"results": []})

    # Over-fetch so the reranker has candidates to choose from.
    fetch_k = max(limit * 3, 15)
    try:
        result = mem.search(query, user_id=user_id, limit=fetch_k)
    except Exception:
        # Vector search failed -- degrade to a case-insensitive substring scan.
        all_res = mem.get_all(user_id=user_id)
        items = (
            all_res.get("results", [])
            if isinstance(all_res, dict)
            else (all_res if isinstance(all_res, list) else [])
        )
        q = query.lower()
        items = [r for r in items if q in r.get("memory", "").lower()]
        result = {"results": items}

    items = result.get("results", [])
    items = rerank_results(query, items, top_k=limit)
    print(f"[search] user={user_id} query={query!r} hits={len(items)}")
    return SafeJSONResponse(content={"results": items})


async def handle_recent(req: Request, mem: Memory):
    """Return the most recently created memories for a user.

    Note: extract_user_id always yields a non-empty id ("default" fallback),
    so no missing-id guard is needed here.
    """
    data = await req.json()
    user_id = extract_user_id(data)
    limit = int(data.get("limit", 5))

    try:
        results = mem.get_all(user_id=user_id)
    except Exception:
        results = mem.search(query="recent", user_id=user_id)

    items = results.get("results", [])
    # ISO-8601 created_at strings sort chronologically as plain strings.
    items = sorted(items, key=lambda r: r.get("created_at", ""), reverse=True)
    return SafeJSONResponse(content={"results": items[:limit]})


# =============================================================================
# APP
# =============================================================================

app = FastAPI(title="mem0 server")


@app.get("/health")
async def health():
    """Liveness check plus a truncated preview of the active prompts."""
    return SafeJSONResponse(content={
        "status": "ok",
        "reranker_url": RERANKER_URL,
        "collections": {
            "conversational": "openclaw_mem",
            "knowledge": "knowledge_mem",
        },
        "prompts": {
            k: {pk: pv[:80] + "…" for pk, pv in pv_dict.items()}
            for k, pv_dict in PROMPTS.items()
        },
    })


# ---------------------------------------------------------------------------
# /memories -- conversational, OpenClaw
# ---------------------------------------------------------------------------

@app.post("/memories")
async def add_memory(req: Request):
    return await handle_add(req, memory_conv, verbatim_allowed=False)


@app.post("/memories/search")
async def search_memories(req: Request):
    return await handle_search(req, memory_conv)


@app.post("/memories/recent")
async def recent_memories(req: Request):
    return await handle_recent(req, memory_conv)


@app.delete("/memories")
async def delete_memory(req: Request):
    # NOTE(review): mem0's Memory.delete expects a memory_id; passing a filter
    # dict here looks suspect -- confirm against the mem0 API before relying on it.
    data = await req.json()
    return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))


# ---------------------------------------------------------------------------
# /knowledge -- objective facts, book-ingestor
# ---------------------------------------------------------------------------

@app.post("/knowledge")
async def add_knowledge(req: Request):
    return await handle_add(req, memory_know, verbatim_allowed=True)


@app.post("/knowledge/search")
async def search_knowledge(req: Request):
    return await handle_search(req, memory_know)


@app.post("/knowledge/recent")
async def recent_knowledge(req: Request):
    return await handle_recent(req, memory_know)


@app.delete("/knowledge")
async def delete_knowledge(req: Request):
    # NOTE(review): same delete-by-filter concern as /memories above.
    data = await req.json()
    return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))


# ---------------------------------------------------------------------------
# /search -- merged results from both collections (OpenClaw autorecall)
# ---------------------------------------------------------------------------

@app.post("/search")
async def search_all(req: Request):
    """
    Query both collections and merge results.
    Results are tagged with _source: conversational | knowledge.
    Accepts same payload as /memories/search.
    """
    data = await req.json()
    query = (data.get("query") or "").strip()
    user_id = extract_user_id(data)
    limit = int(data.get("limit", 5))

    if not query:
        return SafeJSONResponse(content={"results": []})

    fetch_k = max(limit * 3, 15)

    def fetch(mem: Memory, tag: str):
        # Best-effort per collection: one collection failing must not
        # take out the merged search.
        try:
            r = mem.search(query, user_id=user_id, limit=fetch_k)
            items = r.get("results", [])
        except Exception:
            items = []
        for item in items:
            item["_source"] = tag
        return items

    conv_items = fetch(memory_conv, "conversational")
    know_items = fetch(memory_know, "knowledge")

    merged = conv_items + know_items
    merged = rerank_results(query, merged, top_k=limit)

    print(
        f"[search/all] user={user_id} query={query!r} "
        f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
    )
    return SafeJSONResponse(content={"results": merged})