# mem0 memory server — FastAPI app bridging mem0, Chroma, Groq and a local reranker.
- import os
- import math
- import json
- import sqlite3
- import httpx
- from fastapi import FastAPI, Request
- from fastapi.responses import JSONResponse, HTMLResponse
- from mem0 import Memory
- # =============================================================================
- # ENVIRONMENT
- # =============================================================================
# Required: Groq API key for the mem0 LLM provider — fail fast at startup.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise RuntimeError("GROQ_API_KEY environment variable is not set.")
# Local reranker endpoint; overridable via env for other deployments.
RERANKER_URL = os.environ.get("RERANKER_URL", "http://192.168.0.200:5200/rerank")
# mem0's local SQLite history DB — must be cleaned alongside Chroma on deletes.
SQLITE_PATH = os.path.expanduser("~/.mem0/history.db")
- # =============================================================================
- # SAFE JSON RESPONSE
- # Chroma and the reranker can emit Infinity/NaN which is invalid JSON.
- # Sanitize them to None before serializing.
- # =============================================================================
- def _sanitize(obj):
- if isinstance(obj, float):
- if math.isnan(obj) or math.isinf(obj):
- return None
- if isinstance(obj, dict):
- return {k: _sanitize(v) for k, v in obj.items()}
- if isinstance(obj, list):
- return [_sanitize(i) for i in obj]
- return obj
class SafeJSONResponse(JSONResponse):
    """JSONResponse that strips NaN/Infinity (as null) before encoding."""

    def render(self, content) -> bytes:
        cleaned = _sanitize(content)
        return json.dumps(cleaned, ensure_ascii=False).encode("utf-8")
- # =============================================================================
- # METADATA SANITIZER
- # Chroma MetadataValue only accepts str, int, float, bool.
- # Drop None values; coerce anything else (lists, dicts) to str.
- # =============================================================================
def sanitize_metadata(meta: dict) -> dict:
    """Coerce metadata to Chroma-compatible values.

    Chroma's MetadataValue accepts only str, int, float and bool:
    None entries are dropped entirely, and any other type (lists,
    dicts, ...) is stringified.
    """
    return {
        key: value if isinstance(value, (str, int, float, bool)) else str(value)
        for key, value in meta.items()
        if value is not None
    }
- # =============================================================================
- # PROMPTS
- # Mapped to MemoryConfig.custom_fact_extraction_prompt /
- # MemoryConfig.custom_update_memory_prompt (top-level fields).
- #
- # conversational — active, used by /memories on every add
- # knowledge — defined for future use; currently bypassed because
- # /knowledge always stores verbatim (infer=False)
- # =============================================================================
# Prompt text below is passed verbatim to mem0 — keys map 1:1 onto the two
# top-level MemoryConfig fields consumed by make_config().
PROMPTS = {
    "conversational": {
        "fact_extraction": """
You are an intelligent system that extracts useful long-term memory
from a conversation.
Your goal is to identify information that could help future interactions.
Extract facts that describe:
1. User preferences
2. Important decisions
3. Ongoing projects
4. Tools or technologies being used
5. Goals or plans
6. Constraints or requirements
7. Discoveries or conclusions
8. Important context about tasks
Ignore:
- greetings
- casual conversation
- general world knowledge
- temporary statements
Return JSON:
{
"facts": [
"fact 1",
"fact 2"
]
}
Only include information that may be useful later.
If nothing important is present return:
{"facts": []}
""".strip(),

        "update_memory": """
You manage a long-term memory database.
You receive:
1. existing stored memories
2. new extracted facts
For each fact decide whether to:
ADD
Create a new memory if it contains useful new information.
UPDATE
Modify an existing memory if the new fact refines or corrects it.
DELETE
Remove a memory if it is clearly outdated or incorrect.
NONE
Ignore the fact if it is redundant or trivial.
Guidelines:
- Prefer updating over adding duplicates
- Keep memories concise
- Avoid storing repeated information
- Preserve important context
Return JSON list:
[
{ "event": "ADD", "text": "..." },
{ "event": "UPDATE", "id": "...", "text": "..." }
]
""".strip(),
    },

    "knowledge": {
        # Not active during ingest (infer=False bypasses extraction).
        # Kept here so it can be enabled if infer=True is ever needed.
        "fact_extraction": """
You are a knowledge extraction system that reads source material and produces
a list of objective, encyclopedic facts. Write each fact as a precise,
self-contained sentence. Do NOT reframe facts as user preferences or interests.
Preserve names, terminology, and relationships exactly as they appear.
Examples:
- "Silvio Gesell proposed demurrage as a mechanism to discourage hoarding of currency."
- "The MIDI standard uses a 7-bit checksum for SysEx message validation."
Only extract verifiable facts. Ignore meta-commentary and transitional prose.
Return JSON: {"facts": ["fact 1", "fact 2"]}
""".strip(),

        "update_memory": """
You manage a knowledge base that stores objective facts extracted from books,
documents, and reference material. You receive existing facts and new
information. Update, merge, or add facts as needed. Keep each fact as a
precise, self-contained sentence. Remove duplicates and outdated entries.
Return JSON list: [{ "event": "ADD"|"UPDATE"|"DELETE"|"NONE", "text": "..." }]
""".strip(),
    },
}
- # =============================================================================
- # MEM0 CONFIG FACTORY
- # Prompts are top-level MemoryConfig fields — not nested inside llm.config.
- # =============================================================================
def make_config(collection_name: str, prompt_key: str) -> dict:
    """Assemble a mem0 MemoryConfig dict for one collection.

    Args:
        collection_name: Chroma collection this instance reads/writes.
        prompt_key: key into PROMPTS selecting the prompt pair.

    The custom prompts are top-level MemoryConfig fields — they are NOT
    nested inside llm.config.
    """
    prompt_pair = PROMPTS[prompt_key]
    llm_cfg = {
        "provider": "groq",
        "config": {
            "model": "meta-llama/llama-4-scout-17b-16e-instruct",
            "temperature": 0.025,
            "max_tokens": 1500,
        },
    }
    vector_cfg = {
        "provider": "chroma",
        "config": {
            "host": "192.168.0.200",
            "port": 8001,
            "collection_name": collection_name,
        },
    }
    embedder_cfg = {
        "provider": "ollama",
        "config": {
            "model": "nomic-embed-text",
            "ollama_base_url": "http://192.168.0.200:11434",
        },
    }
    return {
        "llm": llm_cfg,
        "vector_store": vector_cfg,
        "embedder": embedder_cfg,
        # Top-level MemoryConfig fields — confirmed from MemoryConfig source
        "custom_fact_extraction_prompt": prompt_pair["fact_extraction"],
        "custom_update_memory_prompt": prompt_pair["update_memory"],
    }
- # =============================================================================
- # MEMORY INSTANCES
- # =============================================================================
# Two isolated Memory instances — one per Chroma collection / prompt profile.
memory_conv = Memory.from_config(make_config("openclaw_mem", "conversational"))
memory_know = Memory.from_config(make_config("knowledge_mem", "knowledge"))
- # =============================================================================
- # CHROMA EMPTY-FILTER PATCH
- # mem0 sometimes passes an empty filter dict to Chroma which raises an error.
- # Replace with a harmless always-true filter as fallback.
- # =============================================================================
# Always-true where-clause: user_id is never stored as the empty string.
# NOTE(review): the clause is duplicated — presumably because Chroma's $and
# operator requires at least two clauses; confirm before simplifying.
NOOP_WHERE = {"$and": [
    {"user_id": {"$ne": ""}},
    {"user_id": {"$ne": ""}},
]}
def is_effectively_empty(filters) -> bool:
    """Return True when `filters` would amount to an empty Chroma where-clause.

    Covers falsy values (None, {}, []) plus mem0's empty AND/OR wrappers.
    """
    return (not filters) or filters in ({"AND": []}, {"OR": []})
def make_safe_search(mem_instance: Memory):
    """Build a drop-in replacement for `mem_instance.vector_store.search`.

    mem0 sometimes passes an effectively-empty filter dict to Chroma, which
    Chroma rejects. The wrapper detects that case up front — and also catches
    errors whose message contains "Expected where" (Chroma's malformed-filter
    complaint) — and re-issues the query directly on the underlying Chroma
    collection with the always-true NOOP_WHERE filter. All other exceptions
    propagate unchanged.
    """
    # Capture the original bound method BEFORE it is monkeypatched below.
    orig = mem_instance.vector_store.search
    def safe_search(query, vectors, limit=10, filters=None):
        if is_effectively_empty(filters):
            # Bypass mem0 entirely; query Chroma with the no-op filter.
            return mem_instance.vector_store.collection.query(
                query_embeddings=vectors,
                n_results=limit,
                where=NOOP_WHERE,
            )
        try:
            return orig(query=query, vectors=vectors, limit=limit, filters=filters)
        except Exception as e:
            # Retry once with the no-op filter only for the known
            # malformed-filter error; anything else is re-raised.
            if "Expected where" in str(e):
                return mem_instance.vector_store.collection.query(
                    query_embeddings=vectors,
                    n_results=limit,
                    where=NOOP_WHERE,
                )
            raise
    return safe_search
# Patch both instances in place; `orig` above still refers to the unpatched method.
memory_conv.vector_store.search = make_safe_search(memory_conv)
memory_know.vector_store.search = make_safe_search(memory_know)
- # =============================================================================
- # RERANKER
- # Calls local reranker to re-order search results by relevance.
- # Falls back to raw mem0 order if unreachable.
- # =============================================================================
def rerank_results(query: str, items: list, top_k: int) -> list:
    """Re-order mem0 search results via the local reranker service.

    Args:
        query: the user's search query.
        items: mem0 result dicts (each with a "memory" text field).
        top_k: number of results to keep.

    Returns:
        Up to top_k result dicts, each augmented with "rerank_score".
        Falls back to the first top_k items in original order if the
        reranker is unreachable or returns an unexpected payload.
    """
    if not items:
        return items
    documents = [r.get("memory", "") for r in items]
    try:
        resp = httpx.post(
            RERANKER_URL,
            json={"query": query, "documents": documents, "top_k": top_k},
            timeout=5.0,
        )
        resp.raise_for_status()
        reranked = resp.json()["results"]
    except Exception as exc:
        print(f"[reranker] unavailable, skipping: {exc}")
        return items[:top_k]
    # Re-attach original mem0 metadata by matching text. With duplicate
    # memory texts, keep the FIRST occurrence (the previous dict literal
    # silently let the last duplicate win).
    text_to_meta = {}
    for r in items:
        text_to_meta.setdefault(r.get("memory", ""), r)
    merged = []
    for r in reranked:
        # Guard against malformed reranker entries (missing text/score).
        meta = text_to_meta.get(r.get("text"))
        if meta:
            merged.append({**meta, "rerank_score": r.get("score")})
    return merged
- # =============================================================================
- # SQLITE HELPER
- # mem0 maintains a local SQLite history alongside Chroma.
- # Both must be cleaned together or deleted entries reappear after restart.
- # =============================================================================
def sqlite_delete_ids(memory_ids: list[str]) -> int:
    """Delete rows from mem0's SQLite history table by memory_id.

    Best-effort: any failure is logged and reported as 0 deletions so the
    caller's Chroma delete still counts as a success.

    Args:
        memory_ids: ids to remove; an empty list is a no-op.

    Returns:
        Number of rows deleted (0 on error or empty input).
    """
    if not memory_ids:
        return 0
    try:
        conn = sqlite3.connect(SQLITE_PATH)
        try:
            placeholders = ",".join("?" * len(memory_ids))
            # Parameterized IN-list — ids are bound, never interpolated.
            cur = conn.execute(
                f"DELETE FROM history WHERE memory_id IN ({placeholders})",
                memory_ids,
            )
            conn.commit()
            return cur.rowcount
        finally:
            # Previously the connection leaked if execute/commit raised.
            conn.close()
    except Exception as e:
        print(f"[sqlite] warning: {e}")
        return 0
- # =============================================================================
- # CHROMA PAGINATION HELPER
- # mem0's get_all() is capped at 100 entries. This pages Chroma directly
- # in batches of 500 to retrieve the full collection without limits.
- # =============================================================================
def chroma_get_all(collection, user_id: str, include: list = None) -> list[dict]:
    """Fetch every entry for `user_id` from a Chroma collection.

    Pages Chroma directly in batches of 500 — mem0's get_all() is capped
    at 100 entries.

    Args:
        collection: Chroma collection exposing .get(where/limit/offset/include).
        user_id: matched against the `user_id` metadata field.
        include: Chroma include-fields (plural names, e.g. "metadatas",
            "documents"); defaults to ["metadatas"].

    Returns:
        List of dicts like {"id": ..., "metadata": ...} — each include field
        is singularized by dropping its trailing "s".
    """
    if include is None:
        include = ["metadatas"]
    results = []
    batch = 500
    offset = 0
    while True:
        page = collection.get(
            where={"user_id": {"$eq": user_id}},
            limit=batch,
            offset=offset,
            include=include,
        )
        ids = page.get("ids", [])
        if not ids:
            break
        for i, id_ in enumerate(ids):
            row = {"id": id_}
            for field in include:
                # Chroma can return the key with a None value (not a list);
                # `page.get(field, [])` left None through and len() blew up.
                values = page.get(field) or []
                row[field[:-1]] = values[i] if i < len(values) else None
            results.append(row)
        offset += len(ids)
        if len(ids) < batch:
            break
    return results
- # =============================================================================
- # SHARED HANDLERS
- # =============================================================================
def extract_user_id(data: dict) -> str:
    """Pull the user id from a request payload (camelCase first, then
    snake_case), falling back to "default" when neither is truthy."""
    for key in ("userId", "user_id"):
        if data.get(key):
            return data[key]
    return "default"
async def handle_add(req: Request, mem: Memory, verbatim_allowed: bool = False):
    """
    Shared add handler for /memories and /knowledge.

    /knowledge (verbatim_allowed=True)  — always infer=False: the ingestor
        already summarised, so the second LLM extraction pass is skipped.
    /memories  (verbatim_allowed=False) — always LLM extraction using the
        conversational prompts above.

    Accepts: text | messages, user_id, metadata.
    Metadata is sanitized — Chroma rejects None and complex types.
    Returns 400 when neither text nor messages is provided.
    """
    data = await req.json()
    user_id = extract_user_id(data)
    # Only sanitize when metadata is actually present; pass None otherwise.
    raw_meta = data.get("metadata")
    metadata = sanitize_metadata(raw_meta) if raw_meta else None
    messages = data.get("messages")
    text = data.get("text")
    if not messages and not text:
        return SafeJSONResponse(
            content={"error": "Provide 'text' or 'messages'"}, status_code=400
        )
    if verbatim_allowed:
        # /knowledge — store verbatim, ingestor already did the summarisation.
        # With no text, concatenate the user-role message contents.
        content = text or " ".join(
            m["content"] for m in messages if m.get("role") == "user"
        )
        result = mem.add(content, user_id=user_id, metadata=metadata, infer=False)
        print(f"[add verbatim] user={user_id} chars={len(content)} meta={metadata}")
        return SafeJSONResponse(content=result)
    # /memories — LLM extracts and deduplicates facts from the conversation.
    # Forward the metadata kwarg only when non-empty (preserves prior behavior).
    kwargs = {"user_id": user_id}
    if metadata:
        kwargs["metadata"] = metadata
    result = mem.add(messages or text, **kwargs)
    print(f"[add conversational] user={user_id} meta={metadata}")
    return SafeJSONResponse(content=result)
async def handle_search(req: Request, mem: Memory):
    """Semantic search with reranking: fetch limit×3 candidates, rerank, trim."""
    payload = await req.json()
    query = (payload.get("query") or "").strip()
    user_id = extract_user_id(payload)
    limit = int(payload.get("limit", 5))
    if not query:
        return SafeJSONResponse(content={"results": []})
    # Over-fetch so the reranker has a pool to choose from.
    fetch_k = max(limit * 3, 15)
    try:
        result = mem.search(query, user_id=user_id, limit=fetch_k)
    except Exception:
        # Fallback: get_all + simple case-insensitive substring filter.
        all_res = mem.get_all(user_id=user_id)
        if isinstance(all_res, dict):
            candidates = all_res.get("results", [])
        elif isinstance(all_res, list):
            candidates = all_res
        else:
            candidates = []
        needle = query.lower()
        result = {
            "results": [
                r for r in candidates if needle in r.get("memory", "").lower()
            ]
        }
    hits = rerank_results(query, result.get("results", []), top_k=limit)
    print(f"[search] user={user_id} query={query!r} hits={len(hits)}")
    return SafeJSONResponse(content={"results": hits})
async def handle_recent(req: Request, mem: Memory):
    """Return the most recently created memories, newest first."""
    payload = await req.json()
    user_id = extract_user_id(payload)
    if not user_id:
        return SafeJSONResponse(content={"error": "Missing userId"}, status_code=400)
    limit = int(payload.get("limit", 5))
    try:
        fetched = mem.get_all(user_id=user_id)
    except Exception:
        # get_all can fail on some backends; a generic search is close enough.
        fetched = mem.search(query="recent", user_id=user_id)
    newest_first = sorted(
        fetched.get("results", []),
        key=lambda entry: entry.get("created_at", ""),
        reverse=True,
    )
    return SafeJSONResponse(content={"results": newest_first[:limit]})
- # =============================================================================
- # APP
- # =============================================================================
app = FastAPI(title="mem0 server")
# ---------------------------------------------------------------------------
# DASHBOARD — served from file mounted via docker-compose volume
# ---------------------------------------------------------------------------
# Read once at startup. Use a with-statement so the handle is closed (the
# previous bare open().read() leaked it) and decode explicitly as UTF-8
# rather than relying on the container's locale.
with open("dashboard.html", encoding="utf-8") as _dashboard_file:
    DASHBOARD_HTML = _dashboard_file.read()
@app.get("/dashboard")
async def dashboard():
    """Serve the static dashboard page."""
    return HTMLResponse(content=DASHBOARD_HTML)
- # ---------------------------------------------------------------------------
- # HEALTH
- # ---------------------------------------------------------------------------
@app.get("/health")
async def health():
    """Liveness endpoint: reports reranker URL, collection names, and the
    first 80 chars of each configured prompt for quick verification."""
    prompt_previews = {
        name: {field: text[:80] + "…" for field, text in fields.items()}
        for name, fields in PROMPTS.items()
    }
    return SafeJSONResponse(content={
        "status": "ok",
        "reranker_url": RERANKER_URL,
        "collections": {
            "conversational": "openclaw_mem",
            "knowledge": "knowledge_mem",
        },
        "prompts": prompt_previews,
    })
- # ---------------------------------------------------------------------------
- # /memories — conversational collection (OpenClaw)
- # ---------------------------------------------------------------------------
@app.post("/memories")
async def add_memory(req: Request):
    """Add a conversational memory (LLM fact-extraction path)."""
    return await handle_add(req, memory_conv, verbatim_allowed=False)
@app.post("/memories/search")
async def search_memories(req: Request):
    """Semantic search over the conversational collection."""
    return await handle_search(req, memory_conv)
@app.post("/memories/recent")
async def recent_memories(req: Request):
    """Most recently created conversational memories."""
    return await handle_recent(req, memory_conv)
@app.delete("/memories")
async def delete_memory(req: Request):
    """Delete from the conversational collection using the body's "filter".

    NOTE(review): mem0's Memory.delete may expect a memory_id rather than a
    filter dict — confirm against the installed mem0 version.
    """
    data = await req.json()
    return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))
- # ---------------------------------------------------------------------------
- # /knowledge — objective facts collection (book-ingestor)
- # ---------------------------------------------------------------------------
@app.post("/knowledge")
async def add_knowledge(req: Request):
    """Add a knowledge entry verbatim (infer=False — no LLM extraction)."""
    return await handle_add(req, memory_know, verbatim_allowed=True)
@app.post("/knowledge/search")
async def search_knowledge(req: Request):
    """Semantic search over the knowledge collection."""
    return await handle_search(req, memory_know)
@app.post("/knowledge/recent")
async def recent_knowledge(req: Request):
    """Most recently created knowledge entries."""
    return await handle_recent(req, memory_know)
@app.delete("/knowledge")
async def delete_knowledge(req: Request):
    """Delete from the knowledge collection using the body's "filter".

    NOTE(review): mem0's Memory.delete may expect a memory_id rather than a
    filter dict — confirm against the installed mem0 version.
    """
    data = await req.json()
    return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))
@app.post("/knowledge/sources")
async def knowledge_sources(req: Request):
    """
    Return distinct source_file values with entry counts.
    Pages Chroma directly — bypasses mem0's 100-entry get_all cap.
    """
    payload = await req.json()
    user_id = extract_user_id(payload) or "knowledge_base"
    rows = chroma_get_all(memory_know.vector_store.collection, user_id)
    counts = {}
    for row in rows:
        meta = row.get("metadata") or {}
        source = meta.get("source_file", "(no source)")
        counts[source] = counts.get(source, 0) + 1
    # Largest collections first; stable sort keeps insertion order for ties.
    ordered = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
    sources = [{"source_file": name, "count": n} for name, n in ordered]
    print(f"[sources] user={user_id} total={len(rows)} books={len(sources)}")
    return SafeJSONResponse(content={"sources": sources, "total": len(rows)})
@app.delete("/knowledge/by-source")
async def delete_knowledge_by_source(req: Request):
    """
    Delete all entries for a given source_file from both Chroma and SQLite.
    Pages Chroma directly to avoid the 100-entry cap on get_all.

    Body: { "source_file": str, "userId"/"user_id": str (optional) }
    Returns 400 when source_file is missing, 500 when the Chroma delete fails.
    """
    data = await req.json()
    source_file = data.get("source_file")
    user_id = extract_user_id(data) or "knowledge_base"
    if not source_file:
        return SafeJSONResponse(
            content={"error": "Missing source_file"}, status_code=400
        )
    # Collect all IDs matching source_file across all pages
    rows = chroma_get_all(memory_know.vector_store.collection, user_id)
    to_delete = [
        row["id"] for row in rows
        if (row.get("metadata") or {}).get("source_file") == source_file
    ]
    if not to_delete:
        return SafeJSONResponse(
            content={"deleted": 0, "message": "no entries found for that source"}
        )
    # Delete from Chroma in one bulk call
    try:
        memory_know.vector_store.collection.delete(ids=to_delete)
    except Exception as e:
        return SafeJSONResponse(
            content={"error": f"chroma delete failed: {e}"}, status_code=500
        )
    # Clean SQLite so entries don't reappear after server restart
    # (best-effort: sqlite_delete_ids swallows its own failures).
    sqlite_deleted = sqlite_delete_ids(to_delete)
    print(f"[delete by-source] source={source_file} "
          f"chroma={len(to_delete)} sqlite={sqlite_deleted}")
    return SafeJSONResponse(content={
        "deleted": len(to_delete),
        "sqlite_deleted": sqlite_deleted,
        "source_file": source_file,
    })
- # ---------------------------------------------------------------------------
- # /memory/{id} — single entry delete for dashboard per-row buttons
- # ---------------------------------------------------------------------------
@app.delete("/memory/{memory_id}")
async def delete_single_memory(memory_id: str, req: Request):
    """
    Delete one memory by ID from either collection.
    Body: { "collection": "knowledge" | "conversational" }
    Cleans both Chroma and SQLite; returns 500 if the Chroma delete fails.
    """
    payload = await req.json()
    collection = payload.get("collection", "knowledge")
    # Anything other than "knowledge" falls through to the conversational store.
    target = memory_know if collection == "knowledge" else memory_conv
    try:
        target.vector_store.collection.delete(ids=[memory_id])
    except Exception as e:
        return SafeJSONResponse(
            content={"error": f"chroma delete failed: {e}"}, status_code=500
        )
    # Best-effort SQLite cleanup so the entry stays gone after restarts.
    sqlite_delete_ids([memory_id])
    print(f"[delete single] id={memory_id} collection={collection}")
    return SafeJSONResponse(content={"deleted": memory_id})
- # ---------------------------------------------------------------------------
- # /search — merged results from both collections (OpenClaw autorecall)
- # ---------------------------------------------------------------------------
@app.post("/search")
async def search_all(req: Request):
    """
    Query both collections simultaneously, tag results with _source,
    then run a single rerank pass over the merged pool.
    """
    payload = await req.json()
    query = (payload.get("query") or "").strip()
    user_id = extract_user_id(payload)
    limit = int(payload.get("limit", 5))
    if not query:
        return SafeJSONResponse(content={"results": []})
    fetch_k = max(limit * 3, 15)
    def fetch(mem: Memory, tag: str):
        # A broken store contributes nothing rather than failing the request.
        try:
            found = mem.search(query, user_id=user_id, limit=fetch_k)
            entries = found.get("results", [])
        except Exception:
            entries = []
        for entry in entries:
            entry["_source"] = tag
        return entries
    conv_items = fetch(memory_conv, "conversational")
    know_items = fetch(memory_know, "knowledge")
    merged = rerank_results(query, conv_items + know_items, top_k=limit)
    print(
        f"[search/all] user={user_id} query={query!r} "
        f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
    )
    return SafeJSONResponse(content={"results": merged})
@app.post("/memories/all")
async def memories_all(req: Request):
    """
    Return all memories for a user, paging Chroma directly.
    Bypasses mem0's 100-entry get_all cap; results sorted newest first.
    """
    payload = await req.json()
    user_id = extract_user_id(payload) or "main"
    rows = chroma_get_all(
        memory_conv.vector_store.collection,
        user_id,
        include=["metadatas", "documents"],
    )
    def to_item(row):
        # Prefer the stored document; fall back to metadata's "data" field.
        meta = row.get("metadata") or {}
        return {
            "id": row["id"],
            "memory": row.get("document") or meta.get("data", ""),
            "created_at": meta.get("created_at"),
            "metadata": meta,
            "user_id": user_id,
        }
    items = sorted(
        (to_item(row) for row in rows),
        key=lambda entry: entry.get("created_at") or "",
        reverse=True,
    )
    print(f"[memories/all] user={user_id} total={len(items)}")
    return SafeJSONResponse(content={"results": items})