- import os
- import math
- import json
- import sqlite3
- import httpx
- from fastapi import FastAPI, Request
- from fastapi.responses import JSONResponse, FileResponse, HTMLResponse
- from mem0 import Memory
# =============================================================================
# ENVIRONMENT
# =============================================================================
# The Groq API key is mandatory: mem0's LLM provider is configured as "groq"
# below, so the process refuses to start without it.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise RuntimeError("GROQ_API_KEY environment variable is not set.")
# Local reranker service endpoint (see rerank_results); overridable via env.
RERANKER_URL = os.environ.get("RERANKER_URL", "http://192.168.0.200:5200/rerank")
# mem0's internal history database — cleaned up manually by sqlite_delete_ids.
SQLITE_PATH = os.path.expanduser("~/.mem0/history.db")
- # =============================================================================
- # SAFE JSON RESPONSE (handles Infinity / NaN from Chroma / reranker scores)
- # =============================================================================
- def _sanitize(obj):
- if isinstance(obj, float):
- if math.isnan(obj) or math.isinf(obj):
- return None
- if isinstance(obj, dict):
- return {k: _sanitize(v) for k, v in obj.items()}
- if isinstance(obj, list):
- return [_sanitize(i) for i in obj]
- return obj
class SafeJSONResponse(JSONResponse):
    """JSONResponse that scrubs non-finite floats before serialization."""

    def render(self, content) -> bytes:
        cleaned = _sanitize(content)
        text = json.dumps(cleaned, ensure_ascii=False)
        return text.encode("utf-8")
# =============================================================================
# PROMPTS
# Edit these to change how each collection extracts and stores facts.
# =============================================================================
# Each entry supplies mem0's two custom prompt hooks: "fact_extraction"
# (what to pull out of incoming text) and "update_memory" (how to reconcile
# new facts with the existing store). The top-level keys are the prompt_key
# values accepted by make_config().
PROMPTS = {
    # Used by /memories — conversational, user-centric recall for OpenClaw.
    "conversational": {
        "fact_extraction": """
You are a personal memory assistant. Extract concise, standalone facts about
the user from the conversation below. Write each fact as a single sentence
starting with "User" — for example:
- "User is interested in generative music."
- "User is familiar with Python async patterns."
- "User prefers dark mode interfaces."
Only extract facts that are clearly stated or strongly implied. Ignore filler,
greetings, and opinions the user is uncertain about.
""".strip(),
        "update_memory": """
You manage a long-term memory database for a personal AI assistant.
You receive existing memories and new information. Update, merge, or add
memories as needed. Keep each memory as a single concise sentence starting
with "User". Remove duplicates and outdated facts.
""".strip(),
    },
    # Used by /knowledge — objective, source-neutral facts for book/doc ingest.
    "knowledge": {
        "fact_extraction": """
You are a knowledge extraction system that reads source material and produces
a list of objective, encyclopedic facts. Write each fact as a precise,
self-contained sentence. Do NOT reframe facts as user preferences or interests.
Preserve names, terminology, and relationships exactly as they appear.
Examples:
- "Silvio Gesell proposed demurrage as a mechanism to discourage hoarding of currency."
- "The MIDI standard uses a 7-bit checksum for SysEx message validation."
Only extract verifiable facts. Ignore meta-commentary and transitional prose.
""".strip(),
        "update_memory": """
You manage a knowledge base that stores objective facts extracted from books,
documents, and reference material. You receive existing facts and new
information. Update, merge, or add facts as needed. Keep each fact as a
precise, self-contained sentence. Remove duplicates and outdated entries.
""".strip(),
    },
}
- # =============================================================================
- # MEM0 CONFIG FACTORY
- # =============================================================================
def make_config(collection_name: str, prompt_key: str) -> dict:
    """Assemble a mem0 config dict for one collection/prompt pairing.

    Both Memory instances share the same LLM, vector-store host, and embedder
    settings; only the Chroma collection name and the custom prompts differ.
    """
    llm_settings = {
        "provider": "groq",
        "config": {
            "model": "meta-llama/llama-4-scout-17b-16e-instruct",
            "temperature": 0.025,
            "max_tokens": 1500,
        },
    }
    store_settings = {
        "provider": "chroma",
        "config": {
            "host": "192.168.0.200",
            "port": 8001,
            "collection_name": collection_name,
        },
    }
    embedder_settings = {
        "provider": "ollama",
        "config": {
            "model": "nomic-embed-text",
            "ollama_base_url": "http://192.168.0.200:11434",
        },
    }
    return {
        "llm": llm_settings,
        "vector_store": store_settings,
        "embedder": embedder_settings,
        "custom_prompts": PROMPTS[prompt_key],
    }
# =============================================================================
# MEMORY INSTANCES
# =============================================================================
# Two independent mem0 instances backed by separate Chroma collections:
# conversational recall for OpenClaw vs. objective facts from document ingest.
memory_conv = Memory.from_config(make_config("openclaw_mem", "conversational"))
memory_know = Memory.from_config(make_config("knowledge_mem", "knowledge"))
# =============================================================================
# CHROMA EMPTY-FILTER PATCH (applied to both instances)
# =============================================================================
# A "match everything" where-clause: user_id is never the empty string here.
# NOTE(review): the identical clause appears twice on purpose — presumably
# Chroma rejects an `$and` with fewer than two operands. TODO confirm before
# simplifying.
NOOP_WHERE = {"$and": [
    {"user_id": {"$ne": ""}},
    {"user_id": {"$ne": ""}},
]}
def is_effectively_empty(filters) -> bool:
    """Return True when *filters* carries no usable constraint.

    Covers None/empty containers and mem0's empty logical groups
    ({"AND": []} / {"OR": []}), which Chroma rejects.
    """
    if not filters:
        return True
    return filters in ({"AND": []}, {"OR": []})
def make_safe_search(mem_instance: Memory):
    """Build a drop-in replacement for a mem0 vector-store search.

    mem0 can hand Chroma filter dicts it rejects (empty, or empty AND/OR
    groups). The wrapper short-circuits those — and Chroma's "Expected where"
    validation error — to a direct collection query with the always-true
    NOOP_WHERE clause. All other exceptions propagate unchanged.
    """
    original_search = mem_instance.vector_store.search

    def _unfiltered_query(vectors, limit):
        # Direct Chroma query, bypassing mem0's filter translation entirely.
        return mem_instance.vector_store.collection.query(
            query_embeddings=vectors,
            n_results=limit,
            where=NOOP_WHERE,
        )

    def safe_search(query, vectors, limit=10, filters=None):
        if is_effectively_empty(filters):
            return _unfiltered_query(vectors, limit)
        try:
            return original_search(
                query=query, vectors=vectors, limit=limit, filters=filters
            )
        except Exception as e:
            if "Expected where" in str(e):
                return _unfiltered_query(vectors, limit)
            raise

    return safe_search
# Monkeypatch both instances' vector-store search with the safe wrapper above.
memory_conv.vector_store.search = make_safe_search(memory_conv)
memory_know.vector_store.search = make_safe_search(memory_know)
- # =============================================================================
- # RERANKER
- # =============================================================================
def rerank_results(query: str, items: list, top_k: int) -> list:
    """Re-order *items* via the local reranker service; fall back gracefully.

    POSTs the memory texts to RERANKER_URL and re-assembles the results in
    reranked order, attaching each score as "rerank_score". On any reranker
    failure (service down, timeout, malformed payload) the original order is
    kept, truncated to *top_k*.

    Args:
        query: search query the reranker scores documents against.
        items: mem0 result dicts, each with a "memory" text field.
        top_k: maximum number of results to return.
    """
    if not items:
        return items
    documents = [r.get("memory", "") for r in items]
    try:
        resp = httpx.post(
            RERANKER_URL,
            json={"query": query, "documents": documents, "top_k": top_k},
            timeout=5.0,
        )
        resp.raise_for_status()
        reranked = resp.json()["results"]
    except Exception as exc:
        print(f"[reranker] unavailable, skipping rerank: {exc}")
        return items[:top_k]
    # BUG FIX: the old text->item dict collapsed duplicate memory texts (easy
    # to hit when /search merges both collections), silently dropping results.
    # Keep a queue per text so each reranked occurrence pairs with a distinct
    # original item.
    text_to_items: dict = {}
    for r in items:
        text_to_items.setdefault(r.get("memory", ""), []).append(r)
    merged = []
    for r in reranked:
        candidates = text_to_items.get(r["text"])
        if candidates:
            meta = candidates.pop(0)
            merged.append({**meta, "rerank_score": r["score"]})
    return merged
# =============================================================================
# SQLITE HELPER
# =============================================================================
def sqlite_delete_ids(memory_ids: list[str], *, db_path=None) -> int:
    """Delete rows from mem0's SQLite history by memory_id.

    Args:
        memory_ids: memory IDs whose history rows should be removed.
        db_path: optional override for the database file (defaults to
            SQLITE_PATH); keyword-only, mainly useful for testing.

    Returns:
        Number of rows deleted; 0 on any failure. Cleanup is best-effort —
        a missing or locked history DB must never break the API call.
    """
    if not memory_ids:
        return 0
    try:
        conn = sqlite3.connect(SQLITE_PATH if db_path is None else db_path)
        try:
            # `with conn` commits on success / rolls back on error, but does
            # NOT close the connection — hence the explicit close below.
            # (The original leaked the connection on any exception.)
            with conn:
                placeholders = ",".join("?" * len(memory_ids))
                cur = conn.execute(
                    f"DELETE FROM history WHERE memory_id IN ({placeholders})",
                    memory_ids,
                )
                deleted = cur.rowcount
        finally:
            conn.close()
        return deleted
    except Exception as e:
        print(f"[sqlite] warning: {e}")
        return 0
- # =============================================================================
- # CHROMA PAGINATION HELPER
- # =============================================================================
def chroma_get_all(collection, user_id: str, include: list = None) -> list[dict]:
    """Fetch every entry for *user_id* straight from Chroma, page by page.

    Works around mem0's 100-entry get_all cap by paging through the
    collection directly. Each returned dict carries "id" plus one key per
    included field, singularized ("metadatas" -> "metadata").
    """
    fields = ["metadatas"] if include is None else include
    rows: list[dict] = []
    page_size = 500
    offset = 0
    while True:
        page = collection.get(
            where={"user_id": {"$eq": user_id}},
            limit=page_size,
            offset=offset,
            include=fields,
        )
        ids = page.get("ids", [])
        if not ids:
            break
        for idx, entry_id in enumerate(ids):
            record = {"id": entry_id}
            for field in fields:
                values = page.get(field, [])
                # Strip the trailing "s" to singularize the field name.
                record[field[:-1]] = values[idx] if idx < len(values) else None
            rows.append(record)
        offset += len(ids)
        # A short page means we've reached the end of the collection.
        if len(ids) < page_size:
            break
    return rows
- # =============================================================================
- # SHARED HANDLERS
- # =============================================================================
def extract_user_id(data: dict) -> str:
    """Pull the user id from a payload, accepting camelCase or snake_case."""
    for key in ("userId", "user_id"):
        value = data.get(key)
        if value:
            return value
    return "default"
async def handle_add(req: Request, mem: Memory, verbatim_allowed: bool = False):
    """
    Shared add handler for /memories and /knowledge.
    /knowledge (verbatim_allowed=True) — always stores verbatim (infer=False);
    the ingestor already summarised, so the second LLM pass is skipped.
    /memories (verbatim_allowed=False) — always uses LLM extraction for
    conversational recall.
    Supports:
    - text — raw string (legacy)
    - messages — list of {role, content} dicts (standard mem0)
    - metadata — dict, passed through to mem0
    - user_id / userId
    """
    payload = await req.json()
    user_id = extract_user_id(payload)
    metadata = payload.get("metadata") or {}
    messages = payload.get("messages")
    text = payload.get("text")
    if not (messages or text):
        return SafeJSONResponse(
            content={"error": "Provide 'text' or 'messages'"}, status_code=400
        )
    if verbatim_allowed:
        # /knowledge — always verbatim, ingestor already summarised
        if text:
            content = text
        else:
            # Concatenate only user-role turns from the message list.
            user_turns = [m["content"] for m in messages if m.get("role") == "user"]
            content = " ".join(user_turns)
        result = mem.add(content, user_id=user_id, metadata=metadata, infer=False)
        print(f"[add verbatim] user={user_id} chars={len(content)} meta={metadata}")
        return SafeJSONResponse(content=result)
    # /memories — always LLM extraction
    source = messages if messages else text
    result = mem.add(source, user_id=user_id, metadata=metadata)
    print(f"[add conversational] user={user_id} meta={metadata}")
    return SafeJSONResponse(content=result)
async def handle_search(req: Request, mem: Memory):
    """Shared search handler: vector search, then rerank down to `limit`.

    Over-fetches (3x limit, minimum 15) so the reranker has candidates to
    choose from. If vector search raises, degrades to a case-insensitive
    substring scan over everything the user has stored.
    """
    payload = await req.json()
    query = (payload.get("query") or "").strip()
    user_id = extract_user_id(payload)
    limit = int(payload.get("limit", 5))
    if not query:
        return SafeJSONResponse(content={"results": []})
    fetch_k = max(limit * 3, 15)
    try:
        result = mem.search(query, user_id=user_id, limit=fetch_k)
    except Exception:
        # Fallback path — mem.get_all may return a dict or a bare list
        # depending on the mem0 version.
        all_res = mem.get_all(user_id=user_id)
        if isinstance(all_res, dict):
            candidates = all_res.get("results", [])
        elif isinstance(all_res, list):
            candidates = all_res
        else:
            candidates = []
        needle = query.lower()
        matches = [r for r in candidates if needle in r.get("memory", "").lower()]
        result = {"results": matches}
    hits = rerank_results(query, result.get("results", []), top_k=limit)
    print(f"[search] user={user_id} query={query!r} hits={len(hits)}")
    return SafeJSONResponse(content={"results": hits})
async def handle_recent(req: Request, mem: Memory):
    """Return the user's most recently created memories, newest first."""
    data = await req.json()
    user_id = extract_user_id(data)
    # NOTE(review): extract_user_id always returns a truthy value (falls back
    # to "default"), so this guard is currently unreachable — confirm intent
    # before relying on the 400 path.
    if not user_id:
        return SafeJSONResponse(content={"error": "Missing userId"}, status_code=400)
    limit = int(data.get("limit", 5))
    try:
        results = mem.get_all(user_id=user_id)
    except Exception:
        # Degraded path: a generic search still surfaces something useful.
        results = mem.search(query="recent", user_id=user_id)
    items = results.get("results", [])
    # Assumes created_at sorts lexicographically in time order (ISO-8601
    # timestamps) — TODO confirm against the installed mem0's result schema.
    items = sorted(items, key=lambda r: r.get("created_at", ""), reverse=True)
    return SafeJSONResponse(content={"results": items[:limit]})
# =============================================================================
# APP
# =============================================================================
app = FastAPI(title="mem0 server")
# ---------------------------------------------------------------------------
# DASHBOARD
# ---------------------------------------------------------------------------
# Read once at startup. The original `open(...).read()` leaked the file
# handle and depended on the platform default encoding; the context manager
# closes promptly and the encoding is pinned.
with open("dashboard.html", encoding="utf-8") as _dashboard_file:
    DASHBOARD_HTML = _dashboard_file.read()


@app.get("/dashboard")
async def dashboard():
    """Serve the static single-page dashboard."""
    return HTMLResponse(content=DASHBOARD_HTML)
# ---------------------------------------------------------------------------
# HEALTH
# ---------------------------------------------------------------------------
@app.get("/health")
async def health():
    """Liveness probe plus a summary of active collections and prompts."""
    # Truncate each prompt to 80 chars so the payload stays readable.
    prompt_preview = {
        name: {role: text[:80] + "…" for role, text in role_map.items()}
        for name, role_map in PROMPTS.items()
    }
    return SafeJSONResponse(content={
        "status": "ok",
        "reranker_url": RERANKER_URL,
        "collections": {
            "conversational": "openclaw_mem",
            "knowledge": "knowledge_mem",
        },
        "prompts": prompt_preview,
    })
# ---------------------------------------------------------------------------
# /memories — conversational, OpenClaw
# ---------------------------------------------------------------------------
@app.post("/memories")
async def add_memory(req: Request):
    """Add conversational memories (LLM fact extraction, never verbatim)."""
    return await handle_add(req, memory_conv, verbatim_allowed=False)


@app.post("/memories/search")
async def search_memories(req: Request):
    """Search conversational memories with reranking."""
    return await handle_search(req, memory_conv)


@app.post("/memories/recent")
async def recent_memories(req: Request):
    """List the most recently created conversational memories."""
    return await handle_recent(req, memory_conv)
@app.delete("/memories")
async def delete_memory(req: Request):
    """Delete conversational memories matching the request's "filter" dict.

    NOTE(review): this forwards the "filter" dict straight to Memory.delete;
    some mem0 versions expect a memory_id here, not a filter — verify against
    the installed mem0 API.
    """
    data = await req.json()
    return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))
# ---------------------------------------------------------------------------
# /knowledge — objective facts, book-ingestor
# ---------------------------------------------------------------------------
@app.post("/knowledge")
async def add_knowledge(req: Request):
    """Add knowledge entries verbatim (infer=False, no LLM extraction pass)."""
    return await handle_add(req, memory_know, verbatim_allowed=True)


@app.post("/knowledge/search")
async def search_knowledge(req: Request):
    """Search the knowledge collection with reranking."""
    return await handle_search(req, memory_know)


@app.post("/knowledge/recent")
async def recent_knowledge(req: Request):
    """List the most recently created knowledge entries."""
    return await handle_recent(req, memory_know)
@app.delete("/knowledge")
async def delete_knowledge(req: Request):
    """Delete knowledge entries matching the request's "filter" dict.

    NOTE(review): forwards the "filter" dict straight to Memory.delete; some
    mem0 versions expect a memory_id here, not a filter — verify against the
    installed mem0 API.
    """
    data = await req.json()
    return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))
@app.post("/knowledge/sources")
async def knowledge_sources(req: Request):
    """
    Return distinct source_file values with entry counts.
    Pages through Chroma directly — no mem0 100-entry cap.
    """
    data = await req.json()
    # BUG FIX: extract_user_id never returns a falsy value (it falls back to
    # "default"), so the old `extract_user_id(data) or "knowledge_base"` could
    # never yield "knowledge_base". Read the raw keys so the knowledge-specific
    # default actually applies when no user id is supplied.
    user_id = data.get("userId") or data.get("user_id") or "knowledge_base"
    rows = chroma_get_all(memory_know.vector_store.collection, user_id)
    # Tally entries per source_file across every row.
    counts = {}
    for row in rows:
        src = (row.get("metadata") or {}).get("source_file", "(no source)")
        counts[src] = counts.get(src, 0) + 1
    # Descending by count for the dashboard listing.
    sources = [
        {"source_file": k, "count": v}
        for k, v in sorted(counts.items(), key=lambda x: -x[1])
    ]
    print(f"[sources] user={user_id} total={len(rows)} books={len(sources)}")
    return SafeJSONResponse(content={"sources": sources, "total": len(rows)})
@app.delete("/knowledge/by-source")
async def delete_knowledge_by_source(req: Request):
    """
    Delete all knowledge entries for a given source_file.
    Pages through Chroma directly, then cleans SQLite.
    """
    data = await req.json()
    source_file = data.get("source_file")
    # BUG FIX: extract_user_id never returns a falsy value (it falls back to
    # "default"), so the old `extract_user_id(data) or "knowledge_base"` could
    # never yield "knowledge_base". Read the raw keys so the knowledge-specific
    # default actually applies when no user id is supplied.
    user_id = data.get("userId") or data.get("user_id") or "knowledge_base"
    if not source_file:
        return SafeJSONResponse(
            content={"error": "Missing source_file"}, status_code=400
        )
    rows = chroma_get_all(memory_know.vector_store.collection, user_id)
    to_delete = [
        row["id"] for row in rows
        if (row.get("metadata") or {}).get("source_file") == source_file
    ]
    if not to_delete:
        return SafeJSONResponse(
            content={"deleted": 0, "message": "no entries found for that source"}
        )
    # 1. Chroma bulk delete
    try:
        memory_know.vector_store.collection.delete(ids=to_delete)
    except Exception as e:
        return SafeJSONResponse(
            content={"error": f"chroma delete failed: {e}"}, status_code=500
        )
    # 2. SQLite cleanup (best-effort: sqlite_delete_ids swallows its errors)
    sqlite_deleted = sqlite_delete_ids(to_delete)
    print(f"[delete by-source] source={source_file} "
          f"chroma={len(to_delete)} sqlite={sqlite_deleted}")
    return SafeJSONResponse(content={
        "deleted": len(to_delete),
        "sqlite_deleted": sqlite_deleted,
        "source_file": source_file,
    })
# ---------------------------------------------------------------------------
# /memory/{id} — single entry delete (knowledge or conversational)
# ---------------------------------------------------------------------------
@app.delete("/memory/{memory_id}")
async def delete_single_memory(memory_id: str, req: Request):
    """
    Delete a single memory by ID from either collection.
    Body: { "collection": "knowledge" | "conversational" }
    Cleans both Chroma and SQLite.
    """
    data = await req.json()
    collection = data.get("collection", "knowledge")
    # Any value other than "knowledge" routes to the conversational store.
    mem = memory_know if collection == "knowledge" else memory_conv
    # 1. Chroma delete
    try:
        mem.vector_store.collection.delete(ids=[memory_id])
    except Exception as e:
        return SafeJSONResponse(
            content={"error": f"chroma delete failed: {e}"}, status_code=500
        )
    # 2. SQLite cleanup — best-effort: sqlite_delete_ids swallows its own
    # errors, so a failed history cleanup never blocks the response.
    sqlite_delete_ids([memory_id])
    print(f"[delete single] id={memory_id} collection={collection}")
    return SafeJSONResponse(content={"deleted": memory_id})
# ---------------------------------------------------------------------------
# /search — merged results from both collections (OpenClaw autorecall)
# ---------------------------------------------------------------------------
@app.post("/search")
async def search_all(req: Request):
    """
    Query both collections and merge results.
    Results tagged with _source: conversational | knowledge.
    Accepts same payload as /memories/search.
    """
    payload = await req.json()
    query = (payload.get("query") or "").strip()
    user_id = extract_user_id(payload)
    limit = int(payload.get("limit", 5))
    if not query:
        return SafeJSONResponse(content={"results": []})
    fetch_k = max(limit * 3, 15)

    def fetch(mem: Memory, tag: str):
        # Pull candidates from one collection; tag each item so callers can
        # tell which store it came from after the merge. A failing search
        # from either store degrades to an empty contribution.
        try:
            found = mem.search(query, user_id=user_id, limit=fetch_k)
            hits = found.get("results", [])
        except Exception:
            hits = []
        for hit in hits:
            hit["_source"] = tag
        return hits

    conv_items = fetch(memory_conv, "conversational")
    know_items = fetch(memory_know, "knowledge")
    merged = rerank_results(query, conv_items + know_items, top_k=limit)
    print(
        f"[search/all] user={user_id} query={query!r} "
        f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
    )
    return SafeJSONResponse(content={"results": merged})