import os
import math
import json

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from mem0 import Memory

# =============================================================================
# ENVIRONMENT
# =============================================================================

GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise RuntimeError("GROQ_API_KEY environment variable is not set.")

RERANKER_URL = os.environ.get("RERANKER_URL", "http://192.168.0.200:5200/rerank")

# =============================================================================
# SAFE JSON RESPONSE (handles Infinity / NaN from Chroma / reranker scores)
# =============================================================================

def _sanitize(obj):
    """Recursively replace NaN/Infinity floats with None so the output is valid JSON."""
    if isinstance(obj, float):
        if math.isnan(obj) or math.isinf(obj):
            return None
    if isinstance(obj, dict):
        return {k: _sanitize(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_sanitize(i) for i in obj]
    return obj


class SafeJSONResponse(JSONResponse):
    def render(self, content) -> bytes:
        return json.dumps(
            _sanitize(content), ensure_ascii=False
        ).encode("utf-8")
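
# Quick check of the behavior above (illustrative): Starlette renders the body
# at construction time, so
#   SafeJSONResponse(content={"score": float("inf")}).body
# yields b'{"score": null}' rather than the invalid-JSON `Infinity` literal
# that plain json.dumps would emit.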

# =============================================================================
# PROMPTS
# Edit these to change how each collection extracts and stores facts.
# =============================================================================

PROMPTS = {
    # Used by /memories — conversational, user-centric recall for OpenClaw.
    "conversational": {
        "fact_extraction": """
You are an intelligent system that extracts useful long-term memory
from a conversation.

Your goal is to identify information that could help future interactions.

Extract facts that describe:
1. User preferences
2. Important decisions
3. Ongoing projects
4. Tools or technologies being used
5. Goals or plans
6. Constraints or requirements
7. Discoveries or conclusions
8. Important context about tasks

Ignore:
- greetings
- casual conversation
- general world knowledge
- temporary statements

Return the result in JSON format:
{
  "facts": [
    "fact 1",
    "fact 2"
  ]
}

Only include information that may be useful later.
If nothing important is present, return:
{"facts": []}
""".strip(),
        "update_memory": """
You manage a long-term memory database.

You receive:
1. existing stored memories
2. new extracted facts

For each fact, decide whether to:

ADD
Create a new memory if it contains useful new information.

UPDATE
Modify an existing memory if the new fact refines or corrects it.

DELETE
Remove a memory if it is clearly outdated or incorrect.

NONE
Ignore the fact if it is redundant or trivial.

Guidelines:
- Prefer updating over adding duplicates
- Keep memories concise
- Avoid storing repeated information
- Preserve important context

Return a JSON list:
[
  { "event": "ADD", "text": "..." },
  { "event": "UPDATE", "id": "...", "text": "..." }
]
""".strip(),
    },
    # Used by /knowledge — objective, source-neutral facts for book/doc ingest.
    "knowledge": {
        "fact_extraction": """
You are a knowledge extraction system that reads source material and produces
a list of objective, encyclopedic facts. Write each fact as a precise,
self-contained sentence. Do NOT reframe facts as user preferences or interests.
Preserve names, terminology, and relationships exactly as they appear.

Examples:
- "Silvio Gesell proposed demurrage as a mechanism to discourage hoarding of currency."
- "The MIDI standard uses a 7-bit checksum for SysEx message validation."

Only extract verifiable facts. Ignore meta-commentary and transitional prose.
""".strip(),
        "update_memory": """
You manage a knowledge base that stores objective facts extracted from books,
documents, and reference material. You receive existing facts and new
information. Update, merge, or add facts as needed. Keep each fact as a
precise, self-contained sentence. Remove duplicates and outdated entries.
""".strip(),
    },
}

# =============================================================================
# MEM0 CONFIG FACTORY
# =============================================================================

def make_config(collection_name: str, prompt_key: str) -> dict:
    return {
        "llm": {
            "provider": "groq",
            "config": {
                "model": "meta-llama/llama-4-scout-17b-16e-instruct",
                "temperature": 0.025,
                "max_tokens": 1500,
            },
        },
        "vector_store": {
            "provider": "chroma",
            "config": {
                "host": "192.168.0.200",
                "port": 8001,
                "collection_name": collection_name,
            },
        },
        "embedder": {
            "provider": "ollama",
            "config": {
                "model": "nomic-embed-text",
                "ollama_base_url": "http://192.168.0.200:11434",
            },
        },
        "custom_prompts": PROMPTS[prompt_key],
    }

# =============================================================================
# MEMORY INSTANCES
# =============================================================================

memory_conv = Memory.from_config(make_config("openclaw_mem", "conversational"))
memory_know = Memory.from_config(make_config("knowledge_mem", "knowledge"))

# =============================================================================
# CHROMA EMPTY-FILTER PATCH (applied to both instances)
# =============================================================================

# Chroma rejects `$and` clauses with fewer than two expressions, so this
# always-true filter repeats the same no-op clause twice on purpose.
NOOP_WHERE = {"$and": [
    {"user_id": {"$ne": ""}},
    {"user_id": {"$ne": ""}},
]}

def is_effectively_empty(filters) -> bool:
    if not filters:
        return True
    if filters in ({"AND": []}, {"OR": []}):
        return True
    return False
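
# Sanity check (illustrative) of the shapes this server treats as "no filter":
#   is_effectively_empty({})                    -> True
#   is_effectively_empty({"AND": []})           -> True
#   is_effectively_empty({"user_id": "alice"})  -> False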

def make_safe_search(mem_instance: Memory):
    orig = mem_instance.vector_store.search

    def safe_search(query, vectors, limit=10, filters=None):
        # Route empty filters straight to Chroma with the always-true where
        # clause; otherwise try the original search and fall back only on the
        # "Expected where" validation error.
        if is_effectively_empty(filters):
            return mem_instance.vector_store.collection.query(
                query_embeddings=vectors,
                n_results=limit,
                where=NOOP_WHERE,
            )
        try:
            return orig(query=query, vectors=vectors, limit=limit, filters=filters)
        except Exception as e:
            if "Expected where" in str(e):
                return mem_instance.vector_store.collection.query(
                    query_embeddings=vectors,
                    n_results=limit,
                    where=NOOP_WHERE,
                )
            raise

    return safe_search


memory_conv.vector_store.search = make_safe_search(memory_conv)
memory_know.vector_store.search = make_safe_search(memory_know)

# =============================================================================
# RERANKER
# =============================================================================

def rerank_results(query: str, items: list, top_k: int) -> list:
    """Re-order results via the local reranker. Falls back gracefully."""
    if not items:
        return items
    documents = [r.get("memory", "") for r in items]
    try:
        resp = httpx.post(
            RERANKER_URL,
            json={"query": query, "documents": documents, "top_k": top_k},
            timeout=5.0,
        )
        resp.raise_for_status()
        reranked = resp.json()["results"]
    except Exception as exc:
        print(f"[reranker] unavailable, skipping rerank: {exc}")
        return items[:top_k]
    # Map reranked texts back to the original result dicts. Note that
    # identical memory texts collapse to a single entry here.
    text_to_meta = {r.get("memory", ""): r for r in items}
    merged = []
    for r in reranked:
        meta = text_to_meta.get(r["text"])
        if meta:
            merged.append({**meta, "rerank_score": r["score"]})
    return merged
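
# Reranker wire contract implied by the code above (the service is external,
# so treat this as an assumption rather than a spec):
#   request:  {"query": "...", "documents": ["...", ...], "top_k": 5}
#   response: {"results": [{"text": "...", "score": 0.87}, ...]}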

# =============================================================================
# SHARED HELPERS
# =============================================================================

def extract_user_id(data: dict) -> str:
    return data.get("userId") or data.get("user_id") or "default"


async def handle_add(req: Request, mem: Memory, verbatim_allowed: bool = False):
    """
    Shared add handler for /memories and /knowledge.

    Supports:
      - text     — raw string (legacy)
      - messages — list of {role, content} dicts (standard mem0)
      - infer    — bool, default True. If False and verbatim_allowed=True,
                   stores content without LLM extraction.
      - metadata — dict, passed through to mem0
      - user_id / userId
    """
    data = await req.json()
    user_id = extract_user_id(data)
    metadata = data.get("metadata") or {}
    infer = data.get("infer", True)
    messages = data.get("messages")
    text = data.get("text")

    if not messages and not text:
        return SafeJSONResponse(
            content={"error": "Provide 'text' or 'messages'"}, status_code=400
        )

    # infer:false — store verbatim (knowledge collection only)
    if verbatim_allowed and not infer:
        content = text or " ".join(
            m["content"] for m in messages if m.get("role") == "user"
        )
        result = mem.add(content, user_id=user_id, metadata=metadata, infer=False)
        print(f"[add verbatim] user={user_id} chars={len(content)} meta={metadata}")
        return SafeJSONResponse(content=result)

    # Normal path — LLM extraction
    if messages:
        result = mem.add(messages, user_id=user_id, metadata=metadata)
    else:
        result = mem.add(text, user_id=user_id, metadata=metadata)
    print(f"[add] user={user_id} infer=True meta={metadata}")
    return SafeJSONResponse(content=result)
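
# Illustrative client call for the verbatim path (assumes the server listens
# on localhost:8000; adjust host and port to your deployment):
#   curl -X POST localhost:8000/knowledge \
#     -H 'Content-Type: application/json' \
#     -d '{"text": "Gesell proposed demurrage.", "user_id": "ingest", "infer": false}'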

async def handle_search(req: Request, mem: Memory):
    data = await req.json()
    query = (data.get("query") or "").strip()
    user_id = extract_user_id(data)
    limit = int(data.get("limit", 5))
    if not query:
        return SafeJSONResponse(content={"results": []})

    # Over-fetch so the reranker has candidates to choose from.
    fetch_k = max(limit * 3, 15)
    try:
        result = mem.search(query, user_id=user_id, limit=fetch_k)
    except Exception:
        # Vector search failed; fall back to a plain substring scan.
        all_res = mem.get_all(user_id=user_id)
        items = (
            all_res.get("results", [])
            if isinstance(all_res, dict)
            else (all_res if isinstance(all_res, list) else [])
        )
        q = query.lower()
        items = [r for r in items if q in r.get("memory", "").lower()]
        result = {"results": items}

    items = result.get("results", [])
    items = rerank_results(query, items, top_k=limit)
    print(f"[search] user={user_id} query={query!r} hits={len(items)}")
    return SafeJSONResponse(content={"results": items})

async def handle_recent(req: Request, mem: Memory):
    data = await req.json()
    user_id = extract_user_id(data)  # falls back to "default", so never empty
    limit = int(data.get("limit", 5))
    try:
        results = mem.get_all(user_id=user_id)
    except Exception:
        results = mem.search(query="recent", user_id=user_id)
    items = results.get("results", [])
    items = sorted(items, key=lambda r: r.get("created_at", ""), reverse=True)
    return SafeJSONResponse(content={"results": items[:limit]})

# =============================================================================
# APP
# =============================================================================

app = FastAPI(title="mem0 server")


@app.get("/health")
async def health():
    return SafeJSONResponse(content={
        "status": "ok",
        "reranker_url": RERANKER_URL,
        "collections": {
            "conversational": "openclaw_mem",
            "knowledge": "knowledge_mem",
        },
        # First 80 chars of each prompt: enough to confirm which set is live.
        "prompts": {
            k: {pk: pv[:80] + "…" for pk, pv in pv_dict.items()}
            for k, pv_dict in PROMPTS.items()
        },
    })

# ---------------------------------------------------------------------------
# /memories — conversational, OpenClaw
# ---------------------------------------------------------------------------

@app.post("/memories")
async def add_memory(req: Request):
    return await handle_add(req, memory_conv, verbatim_allowed=False)


@app.post("/memories/search")
async def search_memories(req: Request):
    return await handle_search(req, memory_conv)


@app.post("/memories/recent")
async def recent_memories(req: Request):
    return await handle_recent(req, memory_conv)


@app.delete("/memories")
async def delete_memory(req: Request):
    data = await req.json()
    return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))

# ---------------------------------------------------------------------------
# /knowledge — objective facts, book-ingestor
# ---------------------------------------------------------------------------

@app.post("/knowledge")
async def add_knowledge(req: Request):
    return await handle_add(req, memory_know, verbatim_allowed=True)


@app.post("/knowledge/search")
async def search_knowledge(req: Request):
    return await handle_search(req, memory_know)


@app.post("/knowledge/recent")
async def recent_knowledge(req: Request):
    return await handle_recent(req, memory_know)


@app.delete("/knowledge")
async def delete_knowledge(req: Request):
    data = await req.json()
    return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))

# ---------------------------------------------------------------------------
# /search — merged results from both collections (OpenClaw autorecall)
# ---------------------------------------------------------------------------

@app.post("/search")
async def search_all(req: Request):
    """
    Query both collections and merge results.
    Results are tagged with _source: conversational | knowledge.
    Accepts the same payload as /memories/search.
    """
    data = await req.json()
    query = (data.get("query") or "").strip()
    user_id = extract_user_id(data)
    limit = int(data.get("limit", 5))
    if not query:
        return SafeJSONResponse(content={"results": []})

    fetch_k = max(limit * 3, 15)

    def fetch(mem: Memory, tag: str):
        try:
            r = mem.search(query, user_id=user_id, limit=fetch_k)
            items = r.get("results", [])
        except Exception:
            items = []
        for item in items:
            item["_source"] = tag
        return items

    conv_items = fetch(memory_conv, "conversational")
    know_items = fetch(memory_know, "knowledge")

    # The reranker scores both collections on one scale, so the concatenation
    # order here does not bias the final ranking.
    merged = conv_items + know_items
    merged = rerank_results(query, merged, top_k=limit)
    print(
        f"[search/all] user={user_id} query={query!r} "
        f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
    )
    return SafeJSONResponse(content={"results": merged})
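
# Example response shape (illustrative; fields beyond _source and
# rerank_score come straight from mem0):
#   {"results": [{"memory": "...", "_source": "knowledge",
#                 "rerank_score": 0.91, ...}]}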