# routes.py (9.7 KB)
from collections import Counter

from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from mem0 import Memory

from .config import COLLECTION_CONVERSATIONAL, COLLECTION_KNOWLEDGE, RERANKER_URL
from .handlers import extract_user_id, handle_add, handle_recent, handle_search
from .prompts import PROMPTS
from .reranker import rerank_results
from .responses import SafeJSONResponse
from .storage import chroma_get_all, sqlite_delete_ids
  10. def build_router(memory_conv: Memory, memory_know: Memory) -> APIRouter:
  11. """Build and return all API routes bound to the provided memory instances."""
  12. router = APIRouter()
  13. with open("dashboard.html", "r", encoding="utf-8") as handle:
  14. dashboard_html = handle.read()
  15. @router.get("/dashboard", response_class=HTMLResponse, summary="Render dashboard", tags=["dashboard"])
  16. async def dashboard():
  17. """Serve the local dashboard HTML used for memory inspection and admin actions."""
  18. return HTMLResponse(content=dashboard_html)
  19. @router.get("/health", summary="Service health and prompt preview", tags=["health"])
  20. async def health():
  21. """Return runtime health plus prompt snippets for quick configuration verification."""
  22. return SafeJSONResponse(
  23. content={
  24. "status": "ok",
  25. "reranker_url": RERANKER_URL,
  26. "collections": {
  27. "conversational": COLLECTION_CONVERSATIONAL,
  28. "knowledge": COLLECTION_KNOWLEDGE,
  29. },
  30. "prompts": {
  31. key: {p_key: text[:80] + "…" for p_key, text in prompt_set.items()}
  32. for key, prompt_set in PROMPTS.items()
  33. },
  34. }
  35. )
  36. @router.post("/memories", summary="Add conversational memory", tags=["memories"])
  37. async def add_memory(req: Request):
  38. """Store conversational memory with LLM extraction and deduplication enabled."""
  39. return await handle_add(req, memory_conv, verbatim_allowed=False)
  40. @router.post("/memories/raw", summary="Store raw conversational memory", tags=["memories"])
  41. async def add_raw_memory(req: Request):
  42. """Store processed or pre-summarized conversational memory without rerunning the mem0 LLM."""
  43. return await handle_add(
  44. req,
  45. memory_conv,
  46. verbatim_allowed=True,
  47. allow_created_at_override=True,
  48. )
  49. @router.post("/memories/search", summary="Search conversational memory", tags=["memories"])
  50. async def search_memories(req: Request):
  51. """Search conversational memory and rerank candidates by relevance."""
  52. return await handle_search(req, memory_conv)
  53. @router.post("/memories/recent", summary="Recent conversational memory", tags=["memories"])
  54. async def recent_memories(req: Request):
  55. """Return newest conversational memories ordered by creation time."""
  56. return await handle_recent(req, memory_conv)
  57. @router.delete("/memories", summary="Delete conversational memory by filter", tags=["memories"])
  58. async def delete_memory(req: Request):
  59. """Delete conversational memories using a mem0 filter object from the request body."""
  60. data = await req.json()
  61. return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))
  62. @router.post("/memories/all", summary="List all conversational memories", tags=["memories"])
  63. async def memories_all(req: Request):
  64. """Fetch full conversational history for a user by paging Chroma directly."""
  65. data = await req.json()
  66. user_id = extract_user_id(data) or "main"
  67. rows = chroma_get_all(
  68. memory_conv.vector_store.collection,
  69. user_id,
  70. include=["metadatas", "documents"],
  71. )
  72. items = []
  73. for row in rows:
  74. meta = row.get("metadata") or {}
  75. items.append(
  76. {
  77. "id": row["id"],
  78. "memory": row.get("document") or meta.get("data", ""),
  79. "created_at": meta.get("created_at"),
  80. "metadata": meta,
  81. "user_id": user_id,
  82. }
  83. )
  84. items.sort(key=lambda r: r.get("created_at") or "", reverse=True)
  85. print(f"[memories/all] user={user_id} total={len(items)}")
  86. return SafeJSONResponse(content={"results": items})
  87. @router.post("/knowledge", summary="Add knowledge chunk", tags=["knowledge"])
  88. async def add_knowledge(req: Request):
  89. """Store knowledge verbatim without LLM extraction (ingestor already summarizes)."""
  90. return await handle_add(req, memory_know, verbatim_allowed=True)
  91. @router.post("/knowledge/search", summary="Search knowledge base", tags=["knowledge"])
  92. async def search_knowledge(req: Request):
  93. """Search knowledge memories and rerank candidates."""
  94. return await handle_search(req, memory_know)
  95. @router.post("/knowledge/recent", summary="Recent knowledge entries", tags=["knowledge"])
  96. async def recent_knowledge(req: Request):
  97. """Return newest knowledge entries for a user."""
  98. return await handle_recent(req, memory_know)
  99. @router.delete("/knowledge", summary="Delete knowledge by filter", tags=["knowledge"])
  100. async def delete_knowledge(req: Request):
  101. """Delete knowledge entries using a mem0 filter object from the request body."""
  102. data = await req.json()
  103. return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))
  104. @router.post("/knowledge/sources", summary="Knowledge source counts", tags=["knowledge"])
  105. async def knowledge_sources(req: Request):
  106. """List distinct source_file values with counts, bypassing mem0 get_all caps."""
  107. data = await req.json()
  108. user_id = extract_user_id(data) or "knowledge_base"
  109. rows = chroma_get_all(memory_know.vector_store.collection, user_id)
  110. counts = {}
  111. for row in rows:
  112. src = (row.get("metadata") or {}).get("source_file", "(no source)")
  113. counts[src] = counts.get(src, 0) + 1
  114. sources = [{"source_file": k, "count": v} for k, v in sorted(counts.items(), key=lambda x: -x[1])]
  115. print(f"[sources] user={user_id} total={len(rows)} books={len(sources)}")
  116. return SafeJSONResponse(content={"sources": sources, "total": len(rows)})
  117. @router.delete("/knowledge/by-source", summary="Delete all entries for one source file", tags=["knowledge"])
  118. async def delete_knowledge_by_source(req: Request):
  119. """Delete all knowledge entries for a source_file from Chroma and SQLite."""
  120. data = await req.json()
  121. source_file = data.get("source_file")
  122. user_id = extract_user_id(data) or "knowledge_base"
  123. if not source_file:
  124. return SafeJSONResponse(content={"error": "Missing source_file"}, status_code=400)
  125. rows = chroma_get_all(memory_know.vector_store.collection, user_id)
  126. to_delete = [
  127. row["id"]
  128. for row in rows
  129. if (row.get("metadata") or {}).get("source_file") == source_file
  130. ]
  131. if not to_delete:
  132. return SafeJSONResponse(content={"deleted": 0, "message": "no entries found for that source"})
  133. try:
  134. memory_know.vector_store.collection.delete(ids=to_delete)
  135. except Exception as exc:
  136. return SafeJSONResponse(content={"error": f"chroma delete failed: {exc}"}, status_code=500)
  137. sqlite_deleted = sqlite_delete_ids(to_delete)
  138. print(f"[delete by-source] source={source_file} chroma={len(to_delete)} sqlite={sqlite_deleted}")
  139. return SafeJSONResponse(
  140. content={
  141. "deleted": len(to_delete),
  142. "sqlite_deleted": sqlite_deleted,
  143. "source_file": source_file,
  144. }
  145. )
  146. @router.delete("/memory/{memory_id}", summary="Delete one memory by ID", tags=["memories", "knowledge"])
  147. async def delete_single_memory(memory_id: str, req: Request):
  148. """Delete a single memory from selected collection and mirror deletion in SQLite."""
  149. data = await req.json()
  150. collection = data.get("collection", "knowledge")
  151. mem = memory_know if collection == "knowledge" else memory_conv
  152. try:
  153. mem.vector_store.collection.delete(ids=[memory_id])
  154. except Exception as exc:
  155. return SafeJSONResponse(content={"error": f"chroma delete failed: {exc}"}, status_code=500)
  156. sqlite_delete_ids([memory_id])
  157. print(f"[delete single] id={memory_id} collection={collection}")
  158. return SafeJSONResponse(content={"deleted": memory_id})
  159. @router.post("/search", summary="Search both collections", tags=["search"])
  160. async def search_all(req: Request):
  161. """Search conversational + knowledge collections, then rerank merged results."""
  162. data = await req.json()
  163. query = (data.get("query") or "").strip()
  164. user_id = extract_user_id(data)
  165. limit = int(data.get("limit", 5))
  166. if not query:
  167. return SafeJSONResponse(content={"results": []})
  168. fetch_k = max(limit * 3, 15)
  169. def fetch(mem: Memory, tag: str):
  170. try:
  171. response = mem.search(query, user_id=user_id, limit=fetch_k)
  172. items = response.get("results", [])
  173. except Exception:
  174. items = []
  175. for item in items:
  176. item["_source"] = tag
  177. return items
  178. conv_items = fetch(memory_conv, "conversational")
  179. know_items = fetch(memory_know, "knowledge")
  180. merged = rerank_results(query, conv_items + know_items, top_k=limit)
  181. print(
  182. f"[search/all] user={user_id} query={query!r} "
  183. f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
  184. )
  185. return SafeJSONResponse(content={"results": merged})
  186. return router