| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- import os
- import httpx
- from fastapi import FastAPI, Request
- from fastapi.responses import JSONResponse
- from mem0 import Memory
# --- Env validation -----------------------------------------------------------
# Refuse to start at all when the Groq key is absent. The key is not passed
# explicitly in the mem0 config, so presumably mem0's Groq provider picks it up
# from the environment — TODO confirm.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if GROQ_API_KEY is None or GROQ_API_KEY == "":
    raise RuntimeError("GROQ_API_KEY environment variable is not set.")

# Reranker endpoint; overridable via the environment, otherwise a fixed host on
# the local network.
RERANKER_URL = os.getenv("RERANKER_URL", "http://192.168.0.200:5200/rerank")
# --- mem0 config --------------------------------------------------------------
# Network locations of the backing services. Each may be overridden through the
# environment; the defaults are the values this module previously hard-coded,
# so deployments that set none of these variables behave exactly as before.
# (Prefixed names avoid accidentally picking up unrelated CHROMA_*/OLLAMA_*
# variables that may already exist in the environment.)
MEM0_CHROMA_HOST = os.environ.get("MEM0_CHROMA_HOST", "192.168.0.200")
MEM0_CHROMA_PORT = int(os.environ.get("MEM0_CHROMA_PORT", "8001"))
MEM0_OLLAMA_BASE_URL = os.environ.get(
    "MEM0_OLLAMA_BASE_URL", "http://192.168.0.200:11434"
)

config = {
    # LLM used by mem0, served by Groq. The very low temperature keeps the
    # model's output close to deterministic.
    "llm": {
        "provider": "groq",
        "config": {
            "model": "meta-llama/llama-4-scout-17b-16e-instruct",
            "temperature": 0.025,
            "max_tokens": 1500,
        },
    },
    # Vector storage: a Chroma server reached over the network.
    "vector_store": {
        "provider": "chroma",
        "config": {
            "host": MEM0_CHROMA_HOST,
            "port": MEM0_CHROMA_PORT,
            "collection_name": "openclaw_mem",
        },
    },
    # Embeddings: the nomic-embed-text model served by Ollama.
    "embedder": {
        "provider": "ollama",
        "config": {
            "model": "nomic-embed-text",
            "ollama_base_url": MEM0_OLLAMA_BASE_URL,
        },
    },
}
# Shared Memory instance used by the endpoints.
memory = Memory.from_config(config)
# --- Patch: Chroma empty-filter crash -----------------------------------------
# Grab the store's own (bound) search method before it is monkey-patched
# further down, so the replacement can still delegate to it. Order matters:
# this must run before the patch is installed.
orig_search = memory.vector_store.search

# `where` clause used in place of an empty filter: a two-operand `$and` whose
# operands are identical ("user_id is not the empty string").
# NOTE(review): the condition is repeated, presumably so the `$and` always has
# more than one operand — confirm. It may also exclude records that have no
# user_id metadata at all; verify against Chroma's `$ne` semantics.
NOOP_WHERE = {"$and": [{"user_id": {"$ne": ""}} for _ in range(2)]}
def is_effectively_empty(filters):
    """Return True when ``filters`` imposes no constraint at all.

    Falsy values (``None``, ``{}``) count as empty, as do the two degenerate
    groups ``{"AND": []}`` and ``{"OR": []}``. Any other value is non-empty.
    """
    return not filters or filters in ({"AND": []}, {"OR": []})
def _query_collection(vectors, limit, where):
    """Query the underlying Chroma collection directly with an explicit `where`.

    The raw ``collection.query`` result is run through the vector store's
    ``_parse_output`` helper when one exists. The aim is to hand mem0 the same
    result shape its own ``search`` produces, rather than Chroma's raw dict.
    NOTE(review): ``_parse_output`` is a private mem0 helper — confirm it exists
    in the installed mem0 version. Without it the raw dict is returned, as the
    previous code did.
    """
    store = memory.vector_store
    raw = store.collection.query(
        query_embeddings=vectors,
        n_results=limit,
        where=where,
    )
    parse = getattr(store, "_parse_output", None)
    return parse(raw) if callable(parse) else raw


def _flat_filters_to_where(filters):
    """Rewrite a flat ``{field: value, ...}`` filter dict as a Chroma `where`.

    Several equality conditions are combined under ``$and``. Presumably a
    multi-key `where` dict is what triggers Chroma's "Expected where ..." error.
    A single condition is returned as-is.

    Returns None when ``filters`` is not a plain dict of scalar equality
    conditions (operators, AND/OR groups, nested values). Such filters cannot
    be rewritten safely here.
    """
    if not isinstance(filters, dict) or not filters:
        return None
    conditions = []
    for field, value in filters.items():
        if not isinstance(field, str) or field.startswith("$") or field in ("AND", "OR"):
            return None
        if isinstance(value, (dict, list, tuple, set)):
            return None
        conditions.append({field: value})
    return conditions[0] if len(conditions) == 1 else {"$and": conditions}


def safe_search(query, vectors, limit=10, filters=None):
    """Drop-in replacement for ``memory.vector_store.search``.

    * Effectively-empty filters: query the collection directly with NOOP_WHERE.
    * Otherwise delegate to the original search. If that fails with Chroma's
      "Expected where ..." error, retry with the filters rewritten as an
      explicit ``$and`` clause.

    The filters (which carry the user_id scope) are never silently dropped.
    Dropping them would search across every user's memories, so when they
    cannot be rewritten the original error is re-raised.

    Raises:
        Whatever the original search raises, other than the rewritable
        "Expected where" case.
    """
    if is_effectively_empty(filters):
        return _query_collection(vectors, limit, NOOP_WHERE)
    try:
        return orig_search(query=query, vectors=vectors, limit=limit, filters=filters)
    except Exception as exc:
        if "Expected where" not in str(exc):
            raise
        where = _flat_filters_to_where(filters)
        if where is None:
            raise
        return _query_collection(vectors, limit, where)


# Install the wrapper on the live vector-store instance; presumably mem0 calls
# memory.vector_store.search(...) internally, so searches now route through it.
memory.vector_store.search = safe_search
# --- Reranker -----------------------------------------------------------------
def rerank_results(query: str, items: list, top_k: int) -> list:
    """
    Call the local reranker server and re-order mem0 results by score.

    POSTs ``{"query", "documents", "top_k"}`` to RERANKER_URL, where the
    documents are the items' ``"memory"`` texts. It reads ``results`` from the
    JSON response and matches each entry's ``text`` back to an input item. That
    item is returned with an added ``rerank_score`` taken from the entry's
    ``score``.

    Best-effort: falls back to ``items[:top_k]`` (original order) when any of
    these happen:
      * the reranker cannot be reached;
      * it answers with an error status or an unexpected payload;
      * none of its entries match an input item.

    Args:
        query: The search query the candidates were retrieved for.
        items: mem0 result dicts.
        top_k: Maximum number of results to return.

    Returns:
        At most ``top_k`` result dicts, in reranked order when reranking worked.
    """
    if not items:
        return items
    # None texts become "" so the payload is always a list of strings.
    documents = [r.get("memory") or "" for r in items]
    try:
        resp = httpx.post(
            RERANKER_URL,
            json={"query": query, "documents": documents, "top_k": top_k},
            timeout=5.0,
        )
        resp.raise_for_status()
        reranked = resp.json()["results"]
    except Exception as exc:  # deliberate best-effort: never fail the search
        print(f"[reranker] unavailable, skipping rerank: {exc}")
        return items[:top_k]
    if not isinstance(reranked, list):
        print("[reranker] unexpected response payload, skipping rerank")
        return items[:top_k]

    # Re-attach the original mem0 metadata by matching text. Each text maps to
    # a queue of items, so duplicate texts resolve to distinct items. Otherwise
    # one item would appear twice and the other would be lost.
    by_text = {}
    for item, text in zip(items, documents):
        by_text.setdefault(text, []).append(item)

    merged = []
    for entry in reranked:
        text = entry.get("text") if isinstance(entry, dict) else None
        candidates = by_text.get(text) if isinstance(text, str) else None
        if candidates:
            merged.append({**candidates.pop(0), "rerank_score": entry.get("score")})

    if not merged:
        # Returning [] here would discard every candidate; keep mem0's order.
        print("[reranker] no reranked entries matched the candidates, skipping rerank")
        return items[:top_k]
    # Enforce the limit in case the server does not honour top_k.
    return merged[:top_k]
# --- App ----------------------------------------------------------------------
app = FastAPI(title="mem0 server")


@app.get("/health")
async def health():
    """Liveness probe that also reports which reranker URL is configured."""
    status = {"status": "ok"}
    status["reranker_url"] = RERANKER_URL
    return status
@app.post("/memories")
async def add_memory(req: Request):
    """Store a new memory for a user.

    Body: ``{"text": ..., "userId" | "user_id": ...}``; the user id defaults
    to "default". Returns the result of ``memory.add`` unchanged.

    Responds 400 when the body is not valid JSON or not a JSON object, or when
    ``text`` is missing, empty, or whitespace-only.
    """
    # NOTE(review): memory.add() is synchronous inside an async handler, so it
    # blocks the event loop while it runs. Offloading it to a worker thread
    # would fix that but makes mem0 calls concurrent — confirm thread-safety
    # of mem0 and its backends first.
    try:
        data = await req.json()
    except ValueError:
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Expected a JSON object"}, status_code=400)

    text = data.get("text")
    user_id = data.get("userId") or data.get("user_id") or "default"
    if not text or (isinstance(text, str) and not text.strip()):
        return JSONResponse({"error": "Empty 'text' field"}, status_code=400)

    result = memory.add(text, user_id=user_id)
    # str() so a non-string payload can be logged without raising after the
    # memory has already been stored.
    print("add_memory:", {"user_id": user_id, "text": str(text)[:80], "result": result})
    return result
def _memory_items(res):
    """Extract the list of memory dicts from a mem0 search/get_all result.

    Handles both the ``{"results": [...]}`` shape and a bare list; any other
    value yields an empty list.
    """
    if isinstance(res, dict):
        return res.get("results") or []
    if isinstance(res, list):
        return res
    return []


@app.post("/memories/search")
async def search(req: Request):
    """Search a user's memories and rerank the candidates.

    Body: ``{"query": str, "userId" | "user_id": str, "limit": int}``; the user
    id defaults to "default" and ``limit`` to 5. Returns ``{"results": [...]}``
    with at most ``limit`` items. An empty query or a non-positive limit yields
    no results. Responds 400 when the body is not a JSON object or ``limit`` is
    not an integer.
    """
    # NOTE(review): memory.search()/get_all() and the reranker HTTP call are
    # synchronous and block the event loop inside this async handler. Moving
    # them to a worker thread would fix that but makes mem0 calls concurrent —
    # confirm thread-safety first.
    try:
        data = await req.json()
    except ValueError:
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Expected a JSON object"}, status_code=400)

    query = str(data.get("query") or "").strip()
    user_id = data.get("userId") or data.get("user_id") or "default"
    try:
        limit = int(data.get("limit", 5))
    except (TypeError, ValueError):
        return JSONResponse({"error": "'limit' must be an integer"}, status_code=400)
    if not query or limit <= 0:
        return {"results": []}

    # 1. Retrieve more candidates than requested so the reranker can choose.
    fetch_k = max(limit * 3, 15)
    try:
        items = _memory_items(memory.search(query, user_id=user_id, limit=fetch_k))
    except Exception as exc:
        # Best-effort fallback: case-insensitive substring match over all of
        # this user's memories. Log why search failed instead of hiding it.
        print(f"[search] memory.search failed, falling back to get_all: {exc}")
        needle = query.lower()
        items = [
            r
            for r in _memory_items(memory.get_all(user_id=user_id))
            if needle in (r.get("memory") or "").lower()
        ]

    # 2. Rerank and trim to `limit`.
    items = rerank_results(query, items, top_k=limit)
    print("search:", {"user_id": user_id, "query": query, "count": len(items)})
    return {"results": items}
@app.delete("/memories")
async def delete(req: Request):
    """Delete one memory by id, or every memory in a user/agent/run scope.

    Body: ``{"filter": ...}`` where ``filter`` is one of:

    * a string — treated as a single memory id (``memory.delete(id)``);
    * an object with ``memory_id`` / ``memoryId`` / ``id`` — deletes that one
      memory;
    * an object containing only ``user_id``/``userId``, ``agent_id``/``agentId``
      and/or ``run_id``/``runId`` — deletes every memory in that scope via
      ``memory.delete_all``.

    Anything else is rejected with 400 instead of being forwarded. That covers
    invalid JSON, a missing or empty filter, and unknown keys. As a result, a
    malformed filter can never widen a delete.
    """
    # NOTE(review): mem0's Memory.delete() is documented as taking a single
    # memory id, and delete_all() as taking user_id/agent_id/run_id. The
    # previous code forwarded the whole filter object (default {}) to delete().
    # Confirm both signatures against the installed mem0 version.
    try:
        data = await req.json()
    except ValueError:
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
    flt = data.get("filter") if isinstance(data, dict) else None

    # A bare string is a memory id — the one shape the old pass-through handled.
    if isinstance(flt, str) and flt:
        return memory.delete(flt)
    if not isinstance(flt, dict) or not flt:
        return JSONResponse({"error": "Missing or empty 'filter'"}, status_code=400)

    id_keys = [key for key in ("memory_id", "memoryId", "id") if key in flt]
    if id_keys:
        memory_id = flt[id_keys[0]]
        if not isinstance(memory_id, str) or not memory_id:
            return JSONResponse({"error": "Invalid memory id in 'filter'"}, status_code=400)
        return memory.delete(memory_id)

    scope_aliases = {
        "user_id": "user_id",
        "userId": "user_id",
        "agent_id": "agent_id",
        "agentId": "agent_id",
        "run_id": "run_id",
        "runId": "run_id",
    }
    unknown = sorted(str(key) for key in flt if key not in scope_aliases)
    if unknown:
        # Refuse rather than ignore: dropping an unknown key would delete a
        # wider set of memories than the caller asked for.
        return JSONResponse({"error": f"Unsupported filter keys: {unknown}"}, status_code=400)
    scope = {scope_aliases[key]: value for key, value in flt.items() if value}
    if not scope:
        return JSONResponse({"error": "Missing or empty 'filter'"}, status_code=400)
    return memory.delete_all(**scope)
@app.post("/memories/recent")
async def recent(req: Request):
    """Return a user's most recently created memories, newest first.

    Body: ``{"userId" | "user_id": str, "limit": int}``; the user id defaults
    to "default" and ``limit`` to 5 (negative values are treated as 0).
    Responds 400 when the body is not a JSON object or ``limit`` is not an
    integer.
    """
    try:
        data = await req.json()
    except ValueError:
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Expected a JSON object"}, status_code=400)

    # Always non-empty thanks to the "default" fallback, so no separate
    # missing-user check is needed.
    user_id = data.get("userId") or data.get("user_id") or "default"
    try:
        limit = int(data.get("limit", 5))
    except (TypeError, ValueError):
        return JSONResponse({"error": "'limit' must be an integer"}, status_code=400)
    limit = max(limit, 0)  # a negative limit would otherwise slice from the end
    print("recent payload:", data, "user_id:", user_id)

    try:
        results = memory.get_all(user_id=user_id)
    except Exception as exc:
        print(f"[recent] get_all failed, falling back to search: {exc}")
        results = memory.search(query="*", user_id=user_id)

    # Accept both the {"results": [...]} shape and a bare list.
    if isinstance(results, dict):
        items = results.get("results") or []
    elif isinstance(results, list):
        items = results
    else:
        items = []
    # String sort of created_at: missing/None values sort last. Assumes
    # created_at is a uniformly formatted ISO-8601 string — TODO confirm.
    items = sorted(items, key=lambda r: str(r.get("created_at") or ""), reverse=True)
    return {"results": items[:limit]}
|