Browse Source

refactored

Lukas Goldschmidt 14 giờ trước cách đây
mục cha
commit
2e1a19a386

+ 14 - 2
README.md

@@ -20,7 +20,17 @@ The server maintains two independent Chroma collections with separate extraction
 | Conversational | `openclaw_mem` | `/memories` | OpenClaw agent | User-centric facts (`"User prefers…"`) |
 | Knowledge | `knowledge_mem` | `/knowledge` | book-ingestor | Objective, encyclopedic facts |
 
-Prompts for both collections are defined in the `PROMPTS` dict at the top of `mem0server.py` and are easy to edit without touching routing code.
+Prompts for both collections are defined in `mem0core/prompts.py`, so you can re-tune the extraction/update wording without touching the routing layer.
+
+## Code layout
+
+| Module | Role |
+|--------|------|
+| `mem0server.py` | backward-compatible entrypoint `uvicorn mem0server:app` that delegates to `mem0core.create_app()`. |
+| `mem0core/app.py` | FastAPI factory that wires memory instances and mounts the router built in `mem0core/routes.py`. |
+| `mem0core/prompts.py`, `mem0core/config.py`, `mem0core/memory_factory.py`, `mem0core/storage.py`, `mem0core/reranker.py` | Shared config/prompt helpers, mem0 instance construction, storage patches, and reranking logic—good places to change when you swap DBs or vector stores. |
+| `mem0core/handlers.py`, `mem0core/responses.py` | Shared helper functions (metadata sanitization, add/search/recent shared flows, SafeJSONResponse). |
+| `mem0core/routes.py` | All endpoints plus docstring metadata so `/docs` presents concise summaries. |
 
 ## Environment variables
 
@@ -38,7 +48,7 @@ RERANKER_URL=http://192.168.0.200:5200/rerank
 
 ## Docker (recommended)
 
-The recommended setup mounts `mem0server.py` as a volume so code changes are picked up by uvicorn's `--reload` without rebuilding the image. Only touch `docker compose build` when `requirements.txt` changes.
+The recommended setup mounts `mem0server.py` and the new `mem0core/` package so uvicorn reloads on local edits without rebuilding the image. Only run `docker compose build` when `requirements.txt` changes.
 
 ### `docker-compose.yml`
 
@@ -52,6 +62,8 @@ services:
       - "8420:8420"
     volumes:
       - ./mem0server.py:/app/mem0server.py:ro
+      - ./mem0core:/app/mem0core:ro
+      - ./dashboard.html:/app/dashboard.html:ro
     env_file:
       - .env
     restart: unless-stopped

+ 1 - 0
docker-compose.yml

@@ -9,6 +9,7 @@ services:
     volumes:
       - ./mem0server.py:/app/mem0server.py:ro
       - ./dashboard.html:/app/dashboard.html:ro
+      - ./mem0core:/app/mem0core:ro
     env_file:
       - .env
     restart: unless-stopped

+ 3 - 0
mem0core/__init__.py

@@ -0,0 +1,3 @@
"""mem0core package: exposes the FastAPI application factory as its public API."""

from .app import create_app

__all__ = ["create_app"]

+ 12 - 0
mem0core/app.py

@@ -0,0 +1,12 @@
+from fastapi import FastAPI
+
+from .memory_factory import build_memories
+from .routes import build_router
+
+
def create_app() -> FastAPI:
    """Build the FastAPI application with all mem0 server routes attached.

    Constructs both Memory instances (conversational + knowledge) and mounts
    the router that binds every endpoint to them.
    """
    conversational, knowledge = build_memories()
    application = FastAPI(title="mem0 server")
    application.include_router(build_router(conversational, knowledge))
    return application

+ 12 - 0
mem0core/config.py

@@ -0,0 +1,12 @@
+import os
+
# Required Groq LLM key — fail fast at import time so the server never
# starts half-configured.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise RuntimeError("GROQ_API_KEY environment variable is not set.")

# External reranker endpoint; overridable via env for other deployments.
RERANKER_URL = os.environ.get("RERANKER_URL", "http://192.168.0.200:5200/rerank")
# SQLite history DB path — presumably mem0's own history location; confirm
# it matches the mem0 library's default before changing.
SQLITE_PATH = os.path.expanduser("~/.mem0/history.db")

# Chroma collection names for the two independent memory stores, plus the
# user ID used when a request supplies none.
COLLECTION_CONVERSATIONAL = "openclaw_mem"
COLLECTION_KNOWLEDGE = "knowledge_mem"
DEFAULT_USER_ID = "default"

+ 93 - 0
mem0core/handlers.py

@@ -0,0 +1,93 @@
+from fastapi import Request
+from mem0 import Memory
+
+from .config import DEFAULT_USER_ID
+from .reranker import rerank_results
+from .responses import SafeJSONResponse
+
+
def sanitize_metadata(meta: dict) -> dict:
    """Coerce metadata values into Chroma-compatible primitives.

    ``None`` values are dropped; str/int/float/bool pass through unchanged;
    anything else (lists, dicts, objects) is stringified.
    """
    return {
        key: value if isinstance(value, (str, int, float, bool)) else str(value)
        for key, value in meta.items()
        if value is not None
    }
+
+
def extract_user_id(data: dict) -> str:
    """Pull the user ID from a payload, accepting camelCase or snake_case.

    ``userId`` wins over ``user_id``; empty/missing values fall back to
    ``DEFAULT_USER_ID``.
    """
    for field in ("userId", "user_id"):
        value = data.get(field)
        if value:
            return value
    return DEFAULT_USER_ID
+
+
async def handle_add(req: Request, mem: Memory, verbatim_allowed: bool = False):
    """Shared add flow for /memories and /knowledge.

    In verbatim mode the content is stored without LLM extraction
    (``infer=False``); otherwise mem0's extraction/dedup pipeline runs.
    Requires either ``text`` or ``messages`` in the JSON body.
    """
    payload = await req.json()
    user_id = extract_user_id(payload)
    raw_meta = payload.get("metadata")
    metadata = sanitize_metadata(raw_meta) if raw_meta else None
    messages = payload.get("messages")
    text = payload.get("text")

    if not (messages or text):
        return SafeJSONResponse(content={"error": "Provide 'text' or 'messages'"}, status_code=400)

    if verbatim_allowed:
        # Prefer raw text; otherwise concatenate only the user-role messages.
        if text:
            content = text
        else:
            content = " ".join(m["content"] for m in messages if m.get("role") == "user")
        result = mem.add(content, user_id=user_id, metadata=metadata, infer=False)
        print(f"[add verbatim] user={user_id} chars={len(content)} meta={metadata}")
        return SafeJSONResponse(content=result)

    add_kwargs = {"user_id": user_id}
    if metadata:
        add_kwargs["metadata"] = metadata
    result = mem.add(messages if messages else text, **add_kwargs)
    print(f"[add conversational] user={user_id} meta={metadata}")
    return SafeJSONResponse(content=result)
+
+
async def handle_search(req: Request, mem: Memory):
    """Semantic search with reranking.

    Over-fetches (3x the requested limit, min 15) so the reranker has
    candidates to choose from; on search failure, falls back to a plain
    substring scan over everything stored for the user.
    """
    payload = await req.json()
    query = (payload.get("query") or "").strip()
    user_id = extract_user_id(payload)
    limit = int(payload.get("limit", 5))

    if not query:
        return SafeJSONResponse(content={"results": []})

    fetch_k = max(limit * 3, 15)
    try:
        result = mem.search(query, user_id=user_id, limit=fetch_k)
    except Exception:
        # Fallback: case-insensitive substring filter over all memories.
        everything = mem.get_all(user_id=user_id)
        if isinstance(everything, dict):
            candidates = everything.get("results", [])
        elif isinstance(everything, list):
            candidates = everything
        else:
            candidates = []
        needle = query.lower()
        result = {"results": [r for r in candidates if needle in r.get("memory", "").lower()]}

    items = rerank_results(query, result.get("results", []), top_k=limit)
    print(f"[search] user={user_id} query={query!r} hits={len(items)}")
    return SafeJSONResponse(content={"results": items})
+
+
async def handle_recent(req: Request, mem: Memory):
    """Return the most recently created memories for a user.

    Reads ``userId``/``user_id`` and optional ``limit`` (default 5) from the
    JSON body, fetches everything stored for that user, and returns the
    newest entries ordered by ``created_at`` descending.
    """
    data = await req.json()
    user_id = extract_user_id(data)
    if not user_id:
        # Defensive only: extract_user_id falls back to DEFAULT_USER_ID.
        return SafeJSONResponse(content={"error": "Missing userId"}, status_code=400)

    limit = int(data.get("limit", 5))
    try:
        results = mem.get_all(user_id=user_id)
    except Exception:
        results = mem.search(query="recent", user_id=user_id)

    # get_all may return {"results": [...]} or a bare list depending on the
    # mem0 version; accept both (mirrors handle_search's fallback handling).
    if isinstance(results, dict):
        items = results.get("results", [])
    elif isinstance(results, list):
        items = results
    else:
        items = []

    # `or ""` guards entries whose created_at is present but None — a plain
    # .get(..., "") default would let None through and make sorted() raise
    # TypeError comparing None with str.
    items = sorted(items, key=lambda r: r.get("created_at") or "", reverse=True)
    return SafeJSONResponse(content={"results": items[:limit]})

+ 82 - 0
mem0core/memory_factory.py

@@ -0,0 +1,82 @@
+from mem0 import Memory
+
+from .config import COLLECTION_CONVERSATIONAL, COLLECTION_KNOWLEDGE
+from .prompts import PROMPTS
+
+NOOP_WHERE = {"$and": [{"user_id": {"$ne": ""}}, {"user_id": {"$ne": ""}}]}
+
+
def make_config(collection_name: str, prompt_key: str) -> dict:
    """Assemble a mem0 config dict for one collection.

    Shared LLM / vector-store / embedder settings are fixed; only the
    collection name and the prompt set (selected via ``prompt_key`` from
    ``PROMPTS``) vary per collection.
    """
    selected = PROMPTS[prompt_key]

    llm_cfg = {
        "provider": "groq",
        "config": {
            "model": "meta-llama/llama-4-scout-17b-16e-instruct",
            "temperature": 0.025,
            "max_tokens": 1500,
        },
    }
    vector_cfg = {
        "provider": "chroma",
        "config": {
            "host": "192.168.0.200",
            "port": 8001,
            "collection_name": collection_name,
        },
    }
    embedder_cfg = {
        "provider": "ollama",
        "config": {
            "model": "nomic-embed-text",
            "ollama_base_url": "http://192.168.0.200:11434",
        },
    }

    return {
        "llm": llm_cfg,
        "vector_store": vector_cfg,
        "embedder": embedder_cfg,
        "custom_fact_extraction_prompt": selected["fact_extraction"],
        "custom_update_memory_prompt": selected["update_memory"],
    }
+
+
def is_effectively_empty(filters) -> bool:
    """Return True when *filters* is missing or one of the known empty shapes."""
    return not filters or filters in ({"AND": []}, {"OR": []})
+
+
def make_safe_search(mem_instance: Memory):
    """Patch mem0 search to survive Chroma errors on empty filters.

    Returns a closure that replaces ``mem_instance.vector_store.search``:
    empty filters are routed straight to the raw Chroma collection with the
    NOOP_WHERE match-all filter, and "Expected where" failures from the
    original search are retried the same way.
    """
    # Capture the unpatched search before it is replaced on the instance.
    original = mem_instance.vector_store.search

    def safe_search(query, vectors, limit=10, filters=None):
        # Bypass mem0 entirely when there is no real filter — Chroma would
        # reject the empty `where` that mem0 builds for this case.
        # NOTE(review): collection.query returns Chroma's raw result dict,
        # which may differ in shape from the original search's return value —
        # confirm downstream consumers handle both.
        if is_effectively_empty(filters):
            return mem_instance.vector_store.collection.query(
                query_embeddings=vectors,
                n_results=limit,
                where=NOOP_WHERE,
            )
        try:
            return original(query=query, vectors=vectors, limit=limit, filters=filters)
        except Exception as exc:
            # Brittle by necessity: matches Chroma's error message text for
            # invalid `where` clauses; any other failure is re-raised.
            if "Expected where" in str(exc):
                return mem_instance.vector_store.collection.query(
                    query_embeddings=vectors,
                    n_results=limit,
                    where=NOOP_WHERE,
                )
            raise

    return safe_search
+
+
def build_memories() -> tuple[Memory, Memory]:
    """Instantiate both Memory objects and install the safe-search patch.

    Returns the (conversational, knowledge) pair, each configured with its
    own collection and prompt set.
    """
    conversational = Memory.from_config(make_config(COLLECTION_CONVERSATIONAL, "conversational"))
    knowledge = Memory.from_config(make_config(COLLECTION_KNOWLEDGE, "knowledge"))

    for instance in (conversational, knowledge):
        instance.vector_store.search = make_safe_search(instance)

    return conversational, knowledge

+ 78 - 0
mem0core/prompts.py

@@ -0,0 +1,78 @@
# Prompt templates for both mem0 collections. Edit these strings to re-tune
# extraction/update behavior without touching the routing layer. Each entry
# supplies mem0's custom_fact_extraction_prompt and custom_update_memory_prompt.
PROMPTS = {
    # User-centric facts extracted from chat ("User prefers…").
    "conversational": {
        "fact_extraction": """
You are an intelligent system that extracts useful long-term memory
from a conversation.
Your goal is to identify information that could help future interactions.
Extract facts that describe:
1. User preferences
2. Important decisions
3. Ongoing projects
4. Tools or technologies being used
5. Goals or plans
6. Constraints or requirements
7. Discoveries or conclusions
8. Important context about tasks
Ignore:
- greetings
- casual conversation
- general world knowledge
- temporary statements
Return JSON:
{
 "facts": [
   "fact 1",
   "fact 2"
 ]
}
Only include information that may be useful later.
If nothing important is present return:
{"facts": []}
""".strip(),
        "update_memory": """
You manage a long-term memory database.
You receive:
1. existing stored memories
2. new extracted facts
For each fact decide whether to:
ADD
Create a new memory if it contains useful new information.
UPDATE
Modify an existing memory if the new fact refines or corrects it.
DELETE
Remove a memory if it is clearly outdated or incorrect.
NONE
Ignore the fact if it is redundant or trivial.
Guidelines:
- Prefer updating over adding duplicates
- Keep memories concise
- Avoid storing repeated information
- Preserve important context
Return JSON list:
[
 { "event": "ADD", "text": "..." },
 { "event": "UPDATE", "id": "...", "text": "..." }
]
""".strip(),
    },
    # Objective, encyclopedic facts ingested from books/documents.
    "knowledge": {
        "fact_extraction": """
You are a knowledge extraction system that reads source material and produces
a list of objective, encyclopedic facts. Write each fact as a precise,
self-contained sentence. Do NOT reframe facts as user preferences or interests.
Preserve names, terminology, and relationships exactly as they appear.
Examples:
  - "Silvio Gesell proposed demurrage as a mechanism to discourage hoarding of currency."
  - "The MIDI standard uses a 7-bit checksum for SysEx message validation."
Only extract verifiable facts. Ignore meta-commentary and transitional prose.
Return JSON: {"facts": ["fact 1", "fact 2"]}
""".strip(),
        "update_memory": """
You manage a knowledge base that stores objective facts extracted from books,
documents, and reference material. You receive existing facts and new
information. Update, merge, or add facts as needed. Keep each fact as a
precise, self-contained sentence. Remove duplicates and outdated entries.
Return JSON list: [{ "event": "ADD"|"UPDATE"|"DELETE"|"NONE", "text": "..." }]
""".strip(),
    },
}

+ 30 - 0
mem0core/reranker.py

@@ -0,0 +1,30 @@
+import httpx
+
+from .config import RERANKER_URL
+
+
def rerank_results(query: str, items: list, top_k: int) -> list:
    """Rerank mem0 search hits via the external reranker service.

    Sends the memory texts to RERANKER_URL and re-orders *items* by the
    returned scores (attaching ``rerank_score``). If the service is
    unreachable or errors, the original order truncated to *top_k* is
    returned instead.
    """
    if not items:
        return items

    docs = [hit.get("memory", "") for hit in items]
    try:
        resp = httpx.post(
            RERANKER_URL,
            json={"query": query, "documents": docs, "top_k": top_k},
            timeout=5.0,
        )
        resp.raise_for_status()
        ranked = resp.json()["results"]
    except Exception as exc:
        print(f"[reranker] unavailable, skipping: {exc}")
        return items[:top_k]

    # Map reranked texts back to their full result dicts; texts the reranker
    # returns that we did not send (or empty hits) are dropped.
    by_text = {hit.get("memory", ""): hit for hit in items}
    ordered = []
    for entry in ranked:
        source = by_text.get(entry["text"])
        if source:
            ordered.append({**source, "rerank_score": entry["score"]})
    return ordered

+ 23 - 0
mem0core/responses.py

@@ -0,0 +1,23 @@
+import json
+import math
+
+from fastapi.responses import JSONResponse
+
+
+def _sanitize(obj):
+    """Replace NaN/Infinity with None so every payload remains valid JSON."""
+    if isinstance(obj, float):
+        if math.isnan(obj) or math.isinf(obj):
+            return None
+    if isinstance(obj, dict):
+        return {k: _sanitize(v) for k, v in obj.items()}
+    if isinstance(obj, list):
+        return [_sanitize(i) for i in obj]
+    return obj
+
+
class SafeJSONResponse(JSONResponse):
    """JSONResponse variant that normalizes non-JSON float values.

    Used by every endpoint so Chroma/reranker payloads containing NaN or
    Infinity never produce invalid JSON on the wire.
    """

    def render(self, content) -> bytes:
        # _sanitize maps NaN/Infinity to null; ensure_ascii=False keeps
        # non-ASCII text readable in the UTF-8 body.
        return json.dumps(_sanitize(content), ensure_ascii=False).encode("utf-8")

+ 214 - 0
mem0core/routes.py

@@ -0,0 +1,214 @@
+from fastapi import APIRouter, Request
+from fastapi.responses import HTMLResponse
+from mem0 import Memory
+
+from .config import COLLECTION_CONVERSATIONAL, COLLECTION_KNOWLEDGE, RERANKER_URL
+from .handlers import extract_user_id, handle_add, handle_recent, handle_search
+from .prompts import PROMPTS
+from .reranker import rerank_results
+from .responses import SafeJSONResponse
+from .storage import chroma_get_all, sqlite_delete_ids
+
+
def build_router(memory_conv: Memory, memory_know: Memory) -> APIRouter:
    """Build and return all API routes bound to the provided memory instances.

    The router closes over the two Memory objects, so every nested handler
    below uses them directly without module-level globals.
    """
    router = APIRouter()

    # Dashboard HTML is read once when the router is built; editing
    # dashboard.html requires a restart (or uvicorn --reload) to pick up.
    with open("dashboard.html", "r", encoding="utf-8") as handle:
        dashboard_html = handle.read()

    @router.get("/dashboard", response_class=HTMLResponse, summary="Render dashboard")
    async def dashboard():
        """Serve the local dashboard HTML used for memory inspection and admin actions."""
        return HTMLResponse(content=dashboard_html)

    @router.get("/health", summary="Service health and prompt preview")
    async def health():
        """Return runtime health plus prompt snippets for quick configuration verification."""
        return SafeJSONResponse(
            content={
                "status": "ok",
                "reranker_url": RERANKER_URL,
                "collections": {
                    "conversational": COLLECTION_CONVERSATIONAL,
                    "knowledge": COLLECTION_KNOWLEDGE,
                },
                # Truncated prompt previews (first 80 chars) so /health stays small.
                "prompts": {
                    key: {p_key: text[:80] + "…" for p_key, text in prompt_set.items()}
                    for key, prompt_set in PROMPTS.items()
                },
            }
        )

    @router.post("/memories", summary="Add conversational memory")
    async def add_memory(req: Request):
        """Store conversational memory with LLM extraction and deduplication enabled."""
        return await handle_add(req, memory_conv, verbatim_allowed=False)

    @router.post("/memories/search", summary="Search conversational memory")
    async def search_memories(req: Request):
        """Search conversational memory and rerank candidates by relevance."""
        return await handle_search(req, memory_conv)

    @router.post("/memories/recent", summary="Recent conversational memory")
    async def recent_memories(req: Request):
        """Return newest conversational memories ordered by creation time."""
        return await handle_recent(req, memory_conv)

    @router.delete("/memories", summary="Delete conversational memory by filter")
    async def delete_memory(req: Request):
        """Delete conversational memories using a mem0 filter object from the request body."""
        data = await req.json()
        return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))

    @router.post("/memories/all", summary="List all conversational memories")
    async def memories_all(req: Request):
        """Fetch full conversational history for a user by paging Chroma directly."""
        data = await req.json()
        user_id = extract_user_id(data) or "main"

        # Go straight to Chroma (not mem0 get_all) to avoid result caps.
        rows = chroma_get_all(
            memory_conv.vector_store.collection,
            user_id,
            include=["metadatas", "documents"],
        )
        items = []
        for row in rows:
            meta = row.get("metadata") or {}
            items.append(
                {
                    "id": row["id"],
                    # Prefer the stored document; fall back to metadata "data".
                    "memory": row.get("document") or meta.get("data", ""),
                    "created_at": meta.get("created_at"),
                    "metadata": meta,
                    "user_id": user_id,
                }
            )

        items.sort(key=lambda r: r.get("created_at") or "", reverse=True)
        print(f"[memories/all] user={user_id} total={len(items)}")
        return SafeJSONResponse(content={"results": items})

    @router.post("/knowledge", summary="Add knowledge chunk")
    async def add_knowledge(req: Request):
        """Store knowledge verbatim without LLM extraction (ingestor already summarizes)."""
        return await handle_add(req, memory_know, verbatim_allowed=True)

    @router.post("/knowledge/search", summary="Search knowledge base")
    async def search_knowledge(req: Request):
        """Search knowledge memories and rerank candidates."""
        return await handle_search(req, memory_know)

    @router.post("/knowledge/recent", summary="Recent knowledge entries")
    async def recent_knowledge(req: Request):
        """Return newest knowledge entries for a user."""
        return await handle_recent(req, memory_know)

    @router.delete("/knowledge", summary="Delete knowledge by filter")
    async def delete_knowledge(req: Request):
        """Delete knowledge entries using a mem0 filter object from the request body."""
        data = await req.json()
        return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))

    @router.post("/knowledge/sources", summary="Knowledge source counts")
    async def knowledge_sources(req: Request):
        """List distinct source_file values with counts, bypassing mem0 get_all caps."""
        data = await req.json()
        user_id = extract_user_id(data) or "knowledge_base"

        rows = chroma_get_all(memory_know.vector_store.collection, user_id)
        counts = {}
        for row in rows:
            src = (row.get("metadata") or {}).get("source_file", "(no source)")
            counts[src] = counts.get(src, 0) + 1

        # Sort sources by descending count.
        sources = [{"source_file": k, "count": v} for k, v in sorted(counts.items(), key=lambda x: -x[1])]
        print(f"[sources] user={user_id} total={len(rows)} books={len(sources)}")
        return SafeJSONResponse(content={"sources": sources, "total": len(rows)})

    @router.delete("/knowledge/by-source", summary="Delete all entries for one source file")
    async def delete_knowledge_by_source(req: Request):
        """Delete all knowledge entries for a source_file from Chroma and SQLite."""
        data = await req.json()
        source_file = data.get("source_file")
        user_id = extract_user_id(data) or "knowledge_base"

        if not source_file:
            return SafeJSONResponse(content={"error": "Missing source_file"}, status_code=400)

        rows = chroma_get_all(memory_know.vector_store.collection, user_id)
        to_delete = [
            row["id"]
            for row in rows
            if (row.get("metadata") or {}).get("source_file") == source_file
        ]

        if not to_delete:
            return SafeJSONResponse(content={"deleted": 0, "message": "no entries found for that source"})

        try:
            memory_know.vector_store.collection.delete(ids=to_delete)
        except Exception as exc:
            return SafeJSONResponse(content={"error": f"chroma delete failed: {exc}"}, status_code=500)

        # SQLite cleanup is best-effort; a failure there returns 0 but does
        # not undo the Chroma deletion above.
        sqlite_deleted = sqlite_delete_ids(to_delete)
        print(f"[delete by-source] source={source_file} chroma={len(to_delete)} sqlite={sqlite_deleted}")

        return SafeJSONResponse(
            content={
                "deleted": len(to_delete),
                "sqlite_deleted": sqlite_deleted,
                "source_file": source_file,
            }
        )

    @router.delete("/memory/{memory_id}", summary="Delete one memory by ID")
    async def delete_single_memory(memory_id: str, req: Request):
        """Delete a single memory from selected collection and mirror deletion in SQLite."""
        data = await req.json()
        # Body field "collection" selects the store; anything other than
        # "knowledge" targets the conversational collection.
        collection = data.get("collection", "knowledge")
        mem = memory_know if collection == "knowledge" else memory_conv

        try:
            mem.vector_store.collection.delete(ids=[memory_id])
        except Exception as exc:
            return SafeJSONResponse(content={"error": f"chroma delete failed: {exc}"}, status_code=500)

        sqlite_delete_ids([memory_id])
        print(f"[delete single] id={memory_id} collection={collection}")
        return SafeJSONResponse(content={"deleted": memory_id})

    @router.post("/search", summary="Search both collections")
    async def search_all(req: Request):
        """Search conversational + knowledge collections, then rerank merged results."""
        data = await req.json()
        query = (data.get("query") or "").strip()
        user_id = extract_user_id(data)
        limit = int(data.get("limit", 5))

        if not query:
            return SafeJSONResponse(content={"results": []})

        fetch_k = max(limit * 3, 15)

        # Each hit is tagged with its origin so clients can tell the
        # collections apart after merging.
        def fetch(mem: Memory, tag: str):
            try:
                response = mem.search(query, user_id=user_id, limit=fetch_k)
                items = response.get("results", [])
            except Exception:
                items = []
            for item in items:
                item["_source"] = tag
            return items

        conv_items = fetch(memory_conv, "conversational")
        know_items = fetch(memory_know, "knowledge")
        merged = rerank_results(query, conv_items + know_items, top_k=limit)

        print(
            f"[search/all] user={user_id} query={query!r} "
            f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
        )
        return SafeJSONResponse(content={"results": merged})

    return router

+ 55 - 0
mem0core/storage.py

@@ -0,0 +1,55 @@
+import sqlite3
+
+from .config import SQLITE_PATH
+
+
def sqlite_delete_ids(memory_ids: list[str]) -> int:
    """Delete mem0 history rows for the given memory IDs.

    Returns the number of rows removed. Failures are logged and reported as
    0 so callers can treat SQLite cleanup as best-effort (the vector-store
    deletion has usually already happened).
    """
    if not memory_ids:
        return 0
    conn = None
    try:
        conn = sqlite3.connect(SQLITE_PATH)
        placeholders = ",".join("?" * len(memory_ids))
        # IDs are bound as parameters; only the placeholder list is interpolated.
        cur = conn.execute(
            f"DELETE FROM history WHERE memory_id IN ({placeholders})",
            memory_ids,
        )
        deleted = cur.rowcount
        conn.commit()
        return deleted
    except Exception as exc:
        print(f"[sqlite] warning: {exc}")
        return 0
    finally:
        # Always release the connection — the original leaked it whenever
        # execute/commit raised between connect() and close().
        if conn is not None:
            conn.close()
+
+
+def chroma_get_all(collection, user_id: str, include: list | None = None) -> list[dict]:
+    """Page through a Chroma collection and return all rows for one user."""
+    if include is None:
+        include = ["metadatas"]
+
+    results = []
+    batch = 500
+    offset = 0
+
+    while True:
+        page = collection.get(
+            where={"user_id": {"$eq": user_id}},
+            limit=batch,
+            offset=offset,
+            include=include,
+        )
+        ids = page.get("ids", [])
+        if not ids:
+            break
+
+        for i, item_id in enumerate(ids):
+            row = {"id": item_id}
+            for field in include:
+                values = page.get(field, [])
+                row[field[:-1]] = values[i] if i < len(values) else None
+            results.append(row)
+
+        offset += len(ids)
+        if len(ids) < batch:
+            break
+
+    return results

+ 9 - 705
mem0server.py

@@ -1,711 +1,15 @@
-import os
-import math
-import json
-import sqlite3
-import httpx
-from fastapi import FastAPI, Request
-from fastapi.responses import JSONResponse, HTMLResponse
-from mem0 import Memory
+"""Backward-compatible mem0 server entrypoint.
 
-# =============================================================================
-# ENVIRONMENT
-# =============================================================================
+This module intentionally keeps the historical filename so existing
+commands like `uvicorn mem0server:app` continue to work after refactoring.
+"""
 
-GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
-if not GROQ_API_KEY:
-    raise RuntimeError("GROQ_API_KEY environment variable is not set.")
+from mem0core.app import create_app
 
-RERANKER_URL = os.environ.get("RERANKER_URL", "http://192.168.0.200:5200/rerank")
-SQLITE_PATH  = os.path.expanduser("~/.mem0/history.db")
+app = create_app()
 
-# =============================================================================
-# SAFE JSON RESPONSE
-# Chroma and the reranker can emit Infinity/NaN which is invalid JSON.
-# Sanitize them to None before serializing.
-# =============================================================================
 
-def _sanitize(obj):
-    if isinstance(obj, float):
-        if math.isnan(obj) or math.isinf(obj):
-            return None
-    if isinstance(obj, dict):
-        return {k: _sanitize(v) for k, v in obj.items()}
-    if isinstance(obj, list):
-        return [_sanitize(i) for i in obj]
-    return obj
+if __name__ == "__main__":
+    import uvicorn
 
-
-class SafeJSONResponse(JSONResponse):
-    def render(self, content) -> bytes:
-        return json.dumps(_sanitize(content), ensure_ascii=False).encode("utf-8")
-
-
-# =============================================================================
-# METADATA SANITIZER
-# Chroma MetadataValue only accepts str, int, float, bool.
-# Drop None values; coerce anything else (lists, dicts) to str.
-# =============================================================================
-
-def sanitize_metadata(meta: dict) -> dict:
-    clean = {}
-    for k, v in meta.items():
-        if v is None:
-            continue
-        if isinstance(v, (str, int, float, bool)):
-            clean[k] = v
-        else:
-            clean[k] = str(v)
-    return clean
-
-
-# =============================================================================
-# PROMPTS
-# Mapped to MemoryConfig.custom_fact_extraction_prompt /
-#            MemoryConfig.custom_update_memory_prompt  (top-level fields).
-#
-# conversational — active, used by /memories on every add
-# knowledge      — defined for future use; currently bypassed because
-#                  /knowledge always stores verbatim (infer=False)
-# =============================================================================
-
-
-PROMPTS = {
-    "conversational": {
-        "fact_extraction": """
-You are an intelligent system that extracts useful long-term memory
-from a conversation.
-Your goal is to identify information that could help future interactions.
-Extract facts that describe:
-1. User preferences
-2. Important decisions
-3. Ongoing projects
-4. Tools or technologies being used
-5. Goals or plans
-6. Constraints or requirements
-7. Discoveries or conclusions
-8. Important context about tasks
-Ignore:
-- greetings
-- casual conversation
-- general world knowledge
-- temporary statements
-Return JSON:
-{
- "facts": [
-   "fact 1",
-   "fact 2"
- ]
-}
-Only include information that may be useful later.
-If nothing important is present return:
-{"facts": []}
-""".strip(),
- 
-        "update_memory": """
-You manage a long-term memory database.
-You receive:
-1. existing stored memories
-2. new extracted facts
-For each fact decide whether to:
-ADD
-Create a new memory if it contains useful new information.
-UPDATE
-Modify an existing memory if the new fact refines or corrects it.
-DELETE
-Remove a memory if it is clearly outdated or incorrect.
-NONE
-Ignore the fact if it is redundant or trivial.
-Guidelines:
-- Prefer updating over adding duplicates
-- Keep memories concise
-- Avoid storing repeated information
-- Preserve important context
-Return JSON list:
-[
- { "event": "ADD", "text": "..." },
- { "event": "UPDATE", "id": "...", "text": "..." }
-]
-""".strip(),
-    },
- 
-    "knowledge": {
-        # Not active during ingest (infer=False bypasses extraction).
-        # Kept here so it can be enabled if infer=True is ever needed.
-        "fact_extraction": """
-You are a knowledge extraction system that reads source material and produces
-a list of objective, encyclopedic facts. Write each fact as a precise,
-self-contained sentence. Do NOT reframe facts as user preferences or interests.
-Preserve names, terminology, and relationships exactly as they appear.
-Examples:
-  - "Silvio Gesell proposed demurrage as a mechanism to discourage hoarding of currency."
-  - "The MIDI standard uses a 7-bit checksum for SysEx message validation."
-Only extract verifiable facts. Ignore meta-commentary and transitional prose.
-Return JSON: {"facts": ["fact 1", "fact 2"]}
-""".strip(),
- 
-        "update_memory": """
-You manage a knowledge base that stores objective facts extracted from books,
-documents, and reference material. You receive existing facts and new
-information. Update, merge, or add facts as needed. Keep each fact as a
-precise, self-contained sentence. Remove duplicates and outdated entries.
-Return JSON list: [{ "event": "ADD"|"UPDATE"|"DELETE"|"NONE", "text": "..." }]
-""".strip(),
-    },
-}
-
-# =============================================================================
-# MEM0 CONFIG FACTORY
-# Prompts are top-level MemoryConfig fields — not nested inside llm.config.
-# =============================================================================
-
-def make_config(collection_name: str, prompt_key: str) -> dict:
-    prompts = PROMPTS[prompt_key]
-    return {
-        "llm": {
-            "provider": "groq",
-            "config": {
-                "model": "meta-llama/llama-4-scout-17b-16e-instruct",
-                "temperature": 0.025,
-                "max_tokens": 1500,
-            },
-        },
-        "vector_store": {
-            "provider": "chroma",
-            "config": {
-                "host": "192.168.0.200",
-                "port": 8001,
-                "collection_name": collection_name,
-            },
-        },
-        "embedder": {
-            "provider": "ollama",
-            "config": {
-                "model": "nomic-embed-text",
-                "ollama_base_url": "http://192.168.0.200:11434",
-            },
-        },
-        # Top-level MemoryConfig fields — confirmed from MemoryConfig source
-        "custom_fact_extraction_prompt": prompts["fact_extraction"],
-        "custom_update_memory_prompt":   prompts["update_memory"],
-    }
-
-
-# =============================================================================
-# MEMORY INSTANCES
-# =============================================================================
-
-memory_conv = Memory.from_config(make_config("openclaw_mem", "conversational"))
-memory_know = Memory.from_config(make_config("knowledge_mem", "knowledge"))
-
-# =============================================================================
-# CHROMA EMPTY-FILTER PATCH
-# mem0 sometimes passes an empty filter dict to Chroma which raises an error.
-# Replace with a harmless always-true filter as fallback.
-# =============================================================================
-
-NOOP_WHERE = {"$and": [
-    {"user_id": {"$ne": ""}},
-    {"user_id": {"$ne": ""}},
-]}
-
-
-def is_effectively_empty(filters) -> bool:
-    if not filters:
-        return True
-    if filters in ({"AND": []}, {"OR": []}):
-        return True
-    return False
-
-
-def make_safe_search(mem_instance: Memory):
-    orig = mem_instance.vector_store.search
-
-    def safe_search(query, vectors, limit=10, filters=None):
-        if is_effectively_empty(filters):
-            return mem_instance.vector_store.collection.query(
-                query_embeddings=vectors,
-                n_results=limit,
-                where=NOOP_WHERE,
-            )
-        try:
-            return orig(query=query, vectors=vectors, limit=limit, filters=filters)
-        except Exception as e:
-            if "Expected where" in str(e):
-                return mem_instance.vector_store.collection.query(
-                    query_embeddings=vectors,
-                    n_results=limit,
-                    where=NOOP_WHERE,
-                )
-            raise
-
-    return safe_search
-
-
-memory_conv.vector_store.search = make_safe_search(memory_conv)
-memory_know.vector_store.search = make_safe_search(memory_know)
-
-# =============================================================================
-# RERANKER
-# Calls local reranker to re-order search results by relevance.
-# Falls back to raw mem0 order if unreachable.
-# =============================================================================
-
-def rerank_results(query: str, items: list, top_k: int) -> list:
-    if not items:
-        return items
-
-    documents = [r.get("memory", "") for r in items]
-    try:
-        resp = httpx.post(
-            RERANKER_URL,
-            json={"query": query, "documents": documents, "top_k": top_k},
-            timeout=5.0,
-        )
-        resp.raise_for_status()
-        reranked = resp.json()["results"]
-    except Exception as exc:
-        print(f"[reranker] unavailable, skipping: {exc}")
-        return items[:top_k]
-
-    # Re-attach original mem0 metadata by matching text
-    text_to_meta = {r.get("memory", ""): r for r in items}
-    merged = []
-    for r in reranked:
-        meta = text_to_meta.get(r["text"])
-        if meta:
-            merged.append({**meta, "rerank_score": r["score"]})
-    return merged
-
-
-# =============================================================================
-# SQLITE HELPER
-# mem0 maintains a local SQLite history alongside Chroma.
-# Both must be cleaned together or deleted entries reappear after restart.
-# =============================================================================
-
-def sqlite_delete_ids(memory_ids: list[str]) -> int:
-    """Delete rows by memory_id. Returns count deleted."""
-    if not memory_ids:
-        return 0
-    try:
-        conn = sqlite3.connect(SQLITE_PATH)
-        cur  = conn.cursor()
-        placeholders = ",".join("?" * len(memory_ids))
-        cur.execute(
-            f"DELETE FROM history WHERE memory_id IN ({placeholders})",
-            memory_ids
-        )
-        deleted = cur.rowcount
-        conn.commit()
-        conn.close()
-        return deleted
-    except Exception as e:
-        print(f"[sqlite] warning: {e}")
-        return 0
-
-
-# =============================================================================
-# CHROMA PAGINATION HELPER
-# mem0's get_all() is capped at 100 entries. This pages Chroma directly
-# in batches of 500 to retrieve the full collection without limits.
-# =============================================================================
-
-def chroma_get_all(collection, user_id: str, include: list = None) -> list[dict]:
-    if include is None:
-        include = ["metadatas"]
-
-    results = []
-    batch   = 500
-    offset  = 0
-
-    while True:
-        page = collection.get(
-            where={"user_id": {"$eq": user_id}},
-            limit=batch,
-            offset=offset,
-            include=include,
-        )
-        ids = page.get("ids", [])
-        if not ids:
-            break
-
-        for i, id_ in enumerate(ids):
-            row = {"id": id_}
-            for field in include:
-                values = page.get(field, [])
-                row[field[:-1]] = values[i] if i < len(values) else None
-            results.append(row)
-
-        offset += len(ids)
-        if len(ids) < batch:
-            break
-
-    return results
-
-
-# =============================================================================
-# SHARED HANDLERS
-# =============================================================================
-
-def extract_user_id(data: dict) -> str:
-    return data.get("userId") or data.get("user_id") or "default"
-
-
-async def handle_add(req: Request, mem: Memory, verbatim_allowed: bool = False):
-    """
-    Shared add handler for /memories and /knowledge.
-
-    /knowledge  (verbatim_allowed=True)  — always infer=False. The ingestor
-                                           already summarised; skip the second
-                                           LLM extraction pass.
-    /memories   (verbatim_allowed=False) — always LLM extraction using the
-                                           conversational prompts above.
-
-    Accepts: text | messages, user_id, metadata.
-    Metadata is sanitized — Chroma rejects None and complex types.
-    """
-    data     = await req.json()
-    user_id  = extract_user_id(data)
-#    metadata = sanitize_metadata(data.get("metadata") or {})
-    raw_meta = data.get("metadata")
-    metadata = sanitize_metadata(raw_meta) if raw_meta else None
-    messages = data.get("messages")
-    text     = data.get("text")
-
-    if not messages and not text:
-        return SafeJSONResponse(
-            content={"error": "Provide 'text' or 'messages'"}, status_code=400
-        )
-
-    if verbatim_allowed:
-        # /knowledge — store verbatim, ingestor already did the summarisation
-        content = text or " ".join(
-            m["content"] for m in messages if m.get("role") == "user"
-        )
-        result = mem.add(content, user_id=user_id, metadata=metadata, infer=False)
-        print(f"[add verbatim] user={user_id} chars={len(content)} meta={metadata}")
-        return SafeJSONResponse(content=result)
-
-    # in the /memories path
-    kwargs = {"user_id": user_id}
-    if metadata:
-        kwargs["metadata"] = metadata
-    result = mem.add(messages or text, **kwargs)
-
-#    # /memories — LLM extracts and deduplicates facts from conversation
-#    if messages:
-#        result = mem.add(messages, user_id=user_id)
-#    else:
-#        result = mem.add(text, user_id=user_id)
-
-    print(f"[add conversational] user={user_id} meta={metadata}")
-    return SafeJSONResponse(content=result)
-
-
-async def handle_search(req: Request, mem: Memory):
-    """Semantic search with reranking. Fetches limit×3 candidates then reranks."""
-    data    = await req.json()
-    query   = (data.get("query") or "").strip()
-    user_id = extract_user_id(data)
-    limit   = int(data.get("limit", 5))
-
-    if not query:
-        return SafeJSONResponse(content={"results": []})
-
-    fetch_k = max(limit * 3, 15)
-    try:
-        result = mem.search(query, user_id=user_id, limit=fetch_k)
-    except Exception:
-        # Fallback: get_all + simple text filter
-        all_res = mem.get_all(user_id=user_id)
-        items   = (
-            all_res.get("results", [])
-            if isinstance(all_res, dict)
-            else (all_res if isinstance(all_res, list) else [])
-        )
-        q      = query.lower()
-        items  = [r for r in items if q in r.get("memory", "").lower()]
-        result = {"results": items}
-
-    items = result.get("results", [])
-    items = rerank_results(query, items, top_k=limit)
-    print(f"[search] user={user_id} query={query!r} hits={len(items)}")
-    return SafeJSONResponse(content={"results": items})
-
-
-async def handle_recent(req: Request, mem: Memory):
-    """Return most recently created memories, sorted by created_at desc."""
-    data    = await req.json()
-    user_id = extract_user_id(data)
-    if not user_id:
-        return SafeJSONResponse(content={"error": "Missing userId"}, status_code=400)
-    limit = int(data.get("limit", 5))
-
-    try:
-        results = mem.get_all(user_id=user_id)
-    except Exception:
-        results = mem.search(query="recent", user_id=user_id)
-
-    items = results.get("results", [])
-    items = sorted(items, key=lambda r: r.get("created_at", ""), reverse=True)
-    return SafeJSONResponse(content={"results": items[:limit]})
-
-
-# =============================================================================
-# APP
-# =============================================================================
-
-app = FastAPI(title="mem0 server")
-
-
-# ---------------------------------------------------------------------------
-# DASHBOARD — served from file mounted via docker-compose volume
-# ---------------------------------------------------------------------------
-
-DASHBOARD_HTML = open("dashboard.html").read()
-
-@app.get("/dashboard")
-async def dashboard():
-    return HTMLResponse(content=DASHBOARD_HTML)
-
-
-# ---------------------------------------------------------------------------
-# HEALTH
-# ---------------------------------------------------------------------------
-
-@app.get("/health")
-async def health():
-    return SafeJSONResponse(content={
-        "status": "ok",
-        "reranker_url": RERANKER_URL,
-        "collections": {
-            "conversational": "openclaw_mem",
-            "knowledge":      "knowledge_mem",
-        },
-        # Show first 80 chars of each prompt for quick verification
-        "prompts": {
-            k: {pk: pv[:80] + "…" for pk, pv in pv_dict.items()}
-            for k, pv_dict in PROMPTS.items()
-        },
-    })
-
-
-# ---------------------------------------------------------------------------
-# /memories — conversational collection (OpenClaw)
-# ---------------------------------------------------------------------------
-
-@app.post("/memories")
-async def add_memory(req: Request):
-    return await handle_add(req, memory_conv, verbatim_allowed=False)
-
-
-@app.post("/memories/search")
-async def search_memories(req: Request):
-    return await handle_search(req, memory_conv)
-
-
-@app.post("/memories/recent")
-async def recent_memories(req: Request):
-    return await handle_recent(req, memory_conv)
-
-
-@app.delete("/memories")
-async def delete_memory(req: Request):
-    data = await req.json()
-    return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))
-
-
-# ---------------------------------------------------------------------------
-# /knowledge — objective facts collection (book-ingestor)
-# ---------------------------------------------------------------------------
-
-@app.post("/knowledge")
-async def add_knowledge(req: Request):
-    return await handle_add(req, memory_know, verbatim_allowed=True)
-
-
-@app.post("/knowledge/search")
-async def search_knowledge(req: Request):
-    return await handle_search(req, memory_know)
-
-
-@app.post("/knowledge/recent")
-async def recent_knowledge(req: Request):
-    return await handle_recent(req, memory_know)
-
-
-@app.delete("/knowledge")
-async def delete_knowledge(req: Request):
-    data = await req.json()
-    return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))
-
-
-@app.post("/knowledge/sources")
-async def knowledge_sources(req: Request):
-    """
-    Return distinct source_file values with entry counts.
-    Pages Chroma directly — bypasses mem0's 100-entry get_all cap.
-    """
-    data    = await req.json()
-    user_id = extract_user_id(data) or "knowledge_base"
-
-    rows   = chroma_get_all(memory_know.vector_store.collection, user_id)
-    counts = {}
-    for row in rows:
-        src = (row.get("metadata") or {}).get("source_file", "(no source)")
-        counts[src] = counts.get(src, 0) + 1
-
-    sources = [
-        {"source_file": k, "count": v}
-        for k, v in sorted(counts.items(), key=lambda x: -x[1])
-    ]
-    print(f"[sources] user={user_id} total={len(rows)} books={len(sources)}")
-    return SafeJSONResponse(content={"sources": sources, "total": len(rows)})
-
-
-@app.delete("/knowledge/by-source")
-async def delete_knowledge_by_source(req: Request):
-    """
-    Delete all entries for a given source_file from both Chroma and SQLite.
-    Pages Chroma directly to avoid the 100-entry cap on get_all.
-    """
-    data        = await req.json()
-    source_file = data.get("source_file")
-    user_id     = extract_user_id(data) or "knowledge_base"
-
-    if not source_file:
-        return SafeJSONResponse(
-            content={"error": "Missing source_file"}, status_code=400
-        )
-
-    # Collect all IDs matching source_file across all pages
-    rows = chroma_get_all(memory_know.vector_store.collection, user_id)
-    to_delete = [
-        row["id"] for row in rows
-        if (row.get("metadata") or {}).get("source_file") == source_file
-    ]
-
-    if not to_delete:
-        return SafeJSONResponse(
-            content={"deleted": 0, "message": "no entries found for that source"}
-        )
-
-    # Delete from Chroma in one bulk call
-    try:
-        memory_know.vector_store.collection.delete(ids=to_delete)
-    except Exception as e:
-        return SafeJSONResponse(
-            content={"error": f"chroma delete failed: {e}"}, status_code=500
-        )
-
-    # Clean SQLite so entries don't reappear after server restart
-    sqlite_deleted = sqlite_delete_ids(to_delete)
-
-    print(f"[delete by-source] source={source_file} "
-          f"chroma={len(to_delete)} sqlite={sqlite_deleted}")
-
-    return SafeJSONResponse(content={
-        "deleted":        len(to_delete),
-        "sqlite_deleted": sqlite_deleted,
-        "source_file":    source_file,
-    })
-
-
-# ---------------------------------------------------------------------------
-# /memory/{id} — single entry delete for dashboard per-row buttons
-# ---------------------------------------------------------------------------
-
-@app.delete("/memory/{memory_id}")
-async def delete_single_memory(memory_id: str, req: Request):
-    """
-    Delete one memory by ID from either collection.
-    Body: { "collection": "knowledge" | "conversational" }
-    Cleans both Chroma and SQLite.
-    """
-    data       = await req.json()
-    collection = data.get("collection", "knowledge")
-    mem        = memory_know if collection == "knowledge" else memory_conv
-
-    try:
-        mem.vector_store.collection.delete(ids=[memory_id])
-    except Exception as e:
-        return SafeJSONResponse(
-            content={"error": f"chroma delete failed: {e}"}, status_code=500
-        )
-
-    sqlite_delete_ids([memory_id])
-
-    print(f"[delete single] id={memory_id} collection={collection}")
-    return SafeJSONResponse(content={"deleted": memory_id})
-
-
-# ---------------------------------------------------------------------------
-# /search — merged results from both collections (OpenClaw autorecall)
-# ---------------------------------------------------------------------------
-
-@app.post("/search")
-async def search_all(req: Request):
-    """
-    Query both collections simultaneously, tag results with _source,
-    then run a single rerank pass over the merged pool.
-    """
-    data    = await req.json()
-    query   = (data.get("query") or "").strip()
-    user_id = extract_user_id(data)
-    limit   = int(data.get("limit", 5))
-
-    if not query:
-        return SafeJSONResponse(content={"results": []})
-
-    fetch_k = max(limit * 3, 15)
-
-    def fetch(mem: Memory, tag: str):
-        try:
-            r     = mem.search(query, user_id=user_id, limit=fetch_k)
-            items = r.get("results", [])
-        except Exception:
-            items = []
-        for item in items:
-            item["_source"] = tag
-        return items
-
-    conv_items = fetch(memory_conv, "conversational")
-    know_items = fetch(memory_know, "knowledge")
-    merged     = rerank_results(query, conv_items + know_items, top_k=limit)
-
-    print(
-        f"[search/all] user={user_id} query={query!r} "
-        f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
-    )
-    return SafeJSONResponse(content={"results": merged})
-
-
-@app.post("/memories/all")
-async def memories_all(req: Request):
-    """
-    Return all memories for a user, paging Chroma directly.
-    Bypasses mem0's 100-entry get_all cap.
-    """
-    data    = await req.json()
-    user_id = extract_user_id(data) or "main"
-
-    rows = chroma_get_all(
-        memory_conv.vector_store.collection,
-        user_id,
-        include=["metadatas", "documents"]
-    )
-
-    items = []
-    for row in rows:
-        meta = row.get("metadata") or {}
-        items.append({
-            "id":         row["id"],
-            "memory":     row.get("document") or meta.get("data", ""),
-            "created_at": meta.get("created_at"),
-            "metadata":   meta,
-            "user_id":    user_id,
-        })
-
-    items.sort(key=lambda r: r.get("created_at") or "", reverse=True)
-    print(f"[memories/all] user={user_id} total={len(items)}")
-    return SafeJSONResponse(content={"results": items})
+    uvicorn.run("mem0server:app", host="0.0.0.0", port=8420, reload=False)