# mem0server.py — FastAPI service exposing mem0-backed memory collections.
  1. import os
  2. import math
  3. import json
  4. import httpx
  5. from fastapi import FastAPI, Request
  6. from fastapi.responses import JSONResponse
  7. from fastapi.responses import HTMLResponse
  8. from mem0 import Memory
  9. # =============================================================================
  10. # ENVIRONMENT
  11. # =============================================================================
  12. GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
  13. if not GROQ_API_KEY:
  14. raise RuntimeError("GROQ_API_KEY environment variable is not set.")
  15. RERANKER_URL = os.environ.get("RERANKER_URL", "http://192.168.0.200:5200/rerank")
  16. DASHBOARD_HTML = open("dashboard.html").read()
  17. # =============================================================================
  18. # SAFE JSON RESPONSE (handles Infinity / NaN from Chroma / reranker scores)
  19. # =============================================================================
  20. def _sanitize(obj):
  21. if isinstance(obj, float):
  22. if math.isnan(obj) or math.isinf(obj):
  23. return None
  24. if isinstance(obj, dict):
  25. return {k: _sanitize(v) for k, v in obj.items()}
  26. if isinstance(obj, list):
  27. return [_sanitize(i) for i in obj]
  28. return obj
  29. class SafeJSONResponse(JSONResponse):
  30. def render(self, content) -> bytes:
  31. return json.dumps(
  32. _sanitize(content), ensure_ascii=False
  33. ).encode("utf-8")
  34. # =============================================================================
  35. # PROMPTS
  36. # Edit these to change how each collection extracts and stores facts.
  37. # =============================================================================
  38. PROMPTS = {
  39. # Used by /memories — conversational, user-centric recall for OpenClaw.
  40. "conversational": {
  41. "fact_extraction": """
  42. You are an intelligent system that extracts useful long-term memory
  43. from a conversation.
  44. Your goal is to identify information that could help future interactions.
  45. Extract facts that describe:
  46. 1. User preferences
  47. 2. Important decisions
  48. 3. Ongoing projects
  49. 4. Tools or technologies being used
  50. 5. Goals or plans
  51. 6. Constraints or requirements
  52. 7. Discoveries or conclusions
  53. 8. Important context about tasks, persons, programs, projects and locations
  54. Ignore:
  55. - greetings
  56. - casual conversation
  57. - general world knowledge
  58. - temporary statements
  59. - debugging work
  60. Return the result in JSON format:
  61. {
  62. "facts": [
  63. "fact 1",
  64. "fact 2"
  65. ]
  66. }
  67. Only include information that may be useful later.
  68. If nothing important is present return:
  69. {"facts": []}
  70. """.strip(),
  71. "update_memory": """
  72. You manage a long-term memory database.
  73. You receive:
  74. 1. existing stored memories
  75. 2. new extracted facts
  76. For each fact decide whether to:
  77. ADD
  78. Create a new memory if it contains useful new information.
  79. UPDATE
  80. Modify an existing memory if the new fact refines or corrects it.
  81. DELETE
  82. Remove a memory if it is clearly outdated or incorrect.
  83. NONE
  84. Ignore the fact if it is redundant or trivial.
  85. Guidelines:
  86. - Prefer updating over adding duplicates
  87. - Keep memories concise
  88. - Avoid storing repeated information
  89. - Preserve important context
  90. Return JSON list:
  91. [
  92. { "event": "ADD", "text": "..." },
  93. { "event": "UPDATE", "id": "...", "text": "..." }
  94. ]
  95. """.strip(),
  96. },
  97. # Used by /knowledge — objective, source-neutral facts for book/doc ingest.
  98. "knowledge": {
  99. "fact_extraction": """
  100. You are a knowledge extraction system that reads source material and produces
  101. a list of objective, encyclopedic facts. Write each fact as a precise,
  102. self-contained sentence. Do NOT reframe facts as user preferences or interests.
  103. Preserve names, terminology, and relationships exactly as they appear.
  104. Examples:
  105. - "Silvio Gesell proposed demurrage as a mechanism to discourage hoarding of currency."
  106. - "The MIDI standard uses a 7-bit checksum for SysEx message validation."
  107. Only extract verifiable facts. Ignore meta-commentary and transitional prose.
  108. """.strip(),
  109. "update_memory": """
  110. You manage a knowledge base that stores objective facts extracted from books,
  111. documents, and reference material. You receive existing facts and new
  112. information. Update, merge, or add facts as needed. Keep each fact as a
  113. precise, self-contained sentence. Remove duplicates and outdated entries.
  114. """.strip(),
  115. },
  116. }
  117. # =============================================================================
  118. # MEM0 CONFIG FACTORY
  119. # =============================================================================
  120. def make_config(collection_name: str, prompt_key: str) -> dict:
  121. return {
  122. "llm": {
  123. "provider": "groq",
  124. "config": {
  125. "model": "meta-llama/llama-4-scout-17b-16e-instruct",
  126. "temperature": 0.025,
  127. "max_tokens": 1500,
  128. },
  129. },
  130. "vector_store": {
  131. "provider": "chroma",
  132. "config": {
  133. "host": "192.168.0.200",
  134. "port": 8001,
  135. "collection_name": collection_name,
  136. },
  137. },
  138. "embedder": {
  139. "provider": "ollama",
  140. "config": {
  141. "model": "nomic-embed-text",
  142. "ollama_base_url": "http://192.168.0.200:11434",
  143. },
  144. },
  145. "custom_prompts": PROMPTS[prompt_key],
  146. }
  147. # =============================================================================
  148. # MEMORY INSTANCES
  149. # =============================================================================
  150. memory_conv = Memory.from_config(make_config("openclaw_mem", "conversational"))
  151. memory_know = Memory.from_config(make_config("knowledge_mem", "knowledge"))
  152. # =============================================================================
  153. # CHROMA EMPTY-FILTER PATCH (applied to both instances)
  154. # =============================================================================
  155. NOOP_WHERE = {"$and": [
  156. {"user_id": {"$ne": ""}},
  157. {"user_id": {"$ne": ""}},
  158. ]}
  159. def is_effectively_empty(filters) -> bool:
  160. if not filters:
  161. return True
  162. if filters in ({"AND": []}, {"OR": []}):
  163. return True
  164. return False
  165. def make_safe_search(mem_instance: Memory):
  166. orig = mem_instance.vector_store.search
  167. def safe_search(query, vectors, limit=10, filters=None):
  168. if is_effectively_empty(filters):
  169. return mem_instance.vector_store.collection.query(
  170. query_embeddings=vectors,
  171. n_results=limit,
  172. where=NOOP_WHERE,
  173. )
  174. try:
  175. return orig(query=query, vectors=vectors, limit=limit, filters=filters)
  176. except Exception as e:
  177. if "Expected where" in str(e):
  178. return mem_instance.vector_store.collection.query(
  179. query_embeddings=vectors,
  180. n_results=limit,
  181. where=NOOP_WHERE,
  182. )
  183. raise
  184. return safe_search
  185. memory_conv.vector_store.search = make_safe_search(memory_conv)
  186. memory_know.vector_store.search = make_safe_search(memory_know)
  187. # =============================================================================
  188. # RERANKER
  189. # =============================================================================
  190. def rerank_results(query: str, items: list, top_k: int) -> list:
  191. """Re-order results via local reranker. Falls back gracefully."""
  192. if not items:
  193. return items
  194. documents = [r.get("memory", "") for r in items]
  195. try:
  196. resp = httpx.post(
  197. RERANKER_URL,
  198. json={"query": query, "documents": documents, "top_k": top_k},
  199. timeout=5.0,
  200. )
  201. resp.raise_for_status()
  202. reranked = resp.json()["results"]
  203. except Exception as exc:
  204. print(f"[reranker] unavailable, skipping rerank: {exc}")
  205. return items[:top_k]
  206. text_to_meta = {r.get("memory", ""): r for r in items}
  207. merged = []
  208. for r in reranked:
  209. meta = text_to_meta.get(r["text"])
  210. if meta:
  211. merged.append({**meta, "rerank_score": r["score"]})
  212. return merged
  213. # =============================================================================
  214. # SHARED HELPERS
  215. # =============================================================================
  216. def extract_user_id(data: dict) -> str:
  217. return data.get("userId") or data.get("user_id") or "default"
  218. async def handle_add(req: Request, mem: Memory, verbatim_allowed: bool = False):
  219. """
  220. Shared add handler for /memories and /knowledge.
  221. Supports:
  222. - text — raw string (legacy)
  223. - messages — list of {role, content} dicts (standard mem0)
  224. - infer — bool, default True. If False and verbatim_allowed=True,
  225. stores content without LLM extraction.
  226. - metadata — dict, passed through to mem0
  227. - user_id / userId
  228. """
  229. data = await req.json()
  230. user_id = extract_user_id(data)
  231. metadata = data.get("metadata") or {}
  232. infer = data.get("infer", True)
  233. messages = data.get("messages")
  234. text = data.get("text")
  235. if not messages and not text:
  236. return SafeJSONResponse(
  237. content={"error": "Provide 'text' or 'messages'"}, status_code=400
  238. )
  239. # infer:false — store verbatim (knowledge collection only)
  240. if verbatim_allowed and not infer:
  241. content = text or " ".join(
  242. m["content"] for m in messages if m.get("role") == "user"
  243. )
  244. result = mem.add(content, user_id=user_id, metadata=metadata, infer=False)
  245. print(f"[add verbatim] user={user_id} chars={len(content)} meta={metadata}")
  246. return SafeJSONResponse(content=result)
  247. # Normal path — LLM extraction
  248. if messages:
  249. result = mem.add(messages, user_id=user_id, metadata=metadata)
  250. else:
  251. result = mem.add(text, user_id=user_id, metadata=metadata)
  252. print(f"[add] user={user_id} infer=True meta={metadata}")
  253. return SafeJSONResponse(content=result)
  254. async def handle_search(req: Request, mem: Memory):
  255. data = await req.json()
  256. query = (data.get("query") or "").strip()
  257. user_id = extract_user_id(data)
  258. limit = int(data.get("limit", 5))
  259. if not query:
  260. return SafeJSONResponse(content={"results": []})
  261. fetch_k = max(limit * 3, 15)
  262. try:
  263. result = mem.search(query, user_id=user_id, limit=fetch_k)
  264. except Exception:
  265. all_res = mem.get_all(user_id=user_id)
  266. items = (
  267. all_res.get("results", [])
  268. if isinstance(all_res, dict)
  269. else (all_res if isinstance(all_res, list) else [])
  270. )
  271. q = query.lower()
  272. items = [r for r in items if q in r.get("memory", "").lower()]
  273. result = {"results": items}
  274. items = result.get("results", [])
  275. items = rerank_results(query, items, top_k=limit)
  276. print(f"[search] user={user_id} query={query!r} hits={len(items)}")
  277. return SafeJSONResponse(content={"results": items})
  278. async def handle_recent(req: Request, mem: Memory):
  279. data = await req.json()
  280. user_id = extract_user_id(data)
  281. if not user_id:
  282. return SafeJSONResponse(content={"error": "Missing userId"}, status_code=400)
  283. limit = int(data.get("limit", 5))
  284. try:
  285. results = mem.get_all(user_id=user_id)
  286. except Exception:
  287. results = mem.search(query="recent", user_id=user_id)
  288. items = results.get("results", [])
  289. items = sorted(items, key=lambda r: r.get("created_at", ""), reverse=True)
  290. return SafeJSONResponse(content={"results": items[:limit]})
  291. # =============================================================================
  292. # APP
  293. # =============================================================================
  294. app = FastAPI(title="mem0 server")
  295. @app.get("/health")
  296. async def health():
  297. return SafeJSONResponse(content={
  298. "status": "ok",
  299. "reranker_url": RERANKER_URL,
  300. "collections": {
  301. "conversational": "openclaw_mem",
  302. "knowledge": "knowledge_mem",
  303. },
  304. "prompts": {
  305. k: {pk: pv[:80] + "…" for pk, pv in pv_dict.items()}
  306. for k, pv_dict in PROMPTS.items()
  307. },
  308. })
  309. # ---------------------------------------------------------------------------
  310. # /memories — conversational, OpenClaw
  311. # ---------------------------------------------------------------------------
  312. @app.post("/memories")
  313. async def add_memory(req: Request):
  314. return await handle_add(req, memory_conv, verbatim_allowed=False)
  315. @app.post("/memories/search")
  316. async def search_memories(req: Request):
  317. return await handle_search(req, memory_conv)
  318. @app.post("/memories/recent")
  319. async def recent_memories(req: Request):
  320. return await handle_recent(req, memory_conv)
  321. @app.delete("/memories")
  322. async def delete_memory(req: Request):
  323. data = await req.json()
  324. return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))
  325. # ---------------------------------------------------------------------------
  326. # /knowledge — objective facts, book-ingestor
  327. # ---------------------------------------------------------------------------
  328. @app.post("/knowledge")
  329. async def add_knowledge(req: Request):
  330. return await handle_add(req, memory_know, verbatim_allowed=True)
  331. @app.post("/knowledge/search")
  332. async def search_knowledge(req: Request):
  333. return await handle_search(req, memory_know)
  334. @app.post("/knowledge/recent")
  335. async def recent_knowledge(req: Request):
  336. return await handle_recent(req, memory_know)
  337. @app.delete("/knowledge")
  338. async def delete_knowledge(req: Request):
  339. data = await req.json()
  340. return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))
  341. # ---------------------------------------------------------------------------
  342. # /search — merged results from both collections (OpenClaw autorecall)
  343. # ---------------------------------------------------------------------------
  344. @app.post("/search")
  345. async def search_all(req: Request):
  346. """
  347. Query both collections and merge results.
  348. Results are tagged with _source: conversational | knowledge.
  349. Accepts same payload as /memories/search.
  350. """
  351. data = await req.json()
  352. query = (data.get("query") or "").strip()
  353. user_id = extract_user_id(data)
  354. limit = int(data.get("limit", 5))
  355. if not query:
  356. return SafeJSONResponse(content={"results": []})
  357. fetch_k = max(limit * 3, 15)
  358. def fetch(mem: Memory, tag: str):
  359. try:
  360. r = mem.search(query, user_id=user_id, limit=fetch_k)
  361. items = r.get("results", [])
  362. except Exception:
  363. items = []
  364. for item in items:
  365. item["_source"] = tag
  366. return items
  367. conv_items = fetch(memory_conv, "conversational")
  368. know_items = fetch(memory_know, "knowledge")
  369. merged = conv_items + know_items
  370. merged = rerank_results(query, merged, top_k=limit)
  371. print(
  372. f"[search/all] user={user_id} query={query!r} "
  373. f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
  374. )
  375. return SafeJSONResponse(content={"results": merged})
@app.get("/dashboard")
async def dashboard():
    # Serve the static dashboard page read into DASHBOARD_HTML at startup.
    return HTMLResponse(content=DASHBOARD_HTML)