# routes.py
  1. from fastapi import APIRouter, Request
  2. from fastapi.responses import HTMLResponse
  3. from mem0 import Memory
  4. from .config import COLLECTION_CONVERSATIONAL, COLLECTION_KNOWLEDGE, RERANKER_URL
  5. from .handlers import extract_user_id, handle_add, handle_recent, handle_search
  6. from .prompts import PROMPTS
  7. from .reranker import rerank_results
  8. from .responses import SafeJSONResponse
  9. from .storage import chroma_get_all, sqlite_delete_ids
  10. def build_router(memory_conv: Memory, memory_know: Memory) -> APIRouter:
  11. """Build and return all API routes bound to the provided memory instances."""
  12. router = APIRouter()
  13. with open("dashboard.html", "r", encoding="utf-8") as handle:
  14. dashboard_html = handle.read()
  15. @router.get("/dashboard", response_class=HTMLResponse, summary="Render dashboard", tags=["dashboard"])
  16. async def dashboard():
  17. """Serve the local dashboard HTML used for memory inspection and admin actions."""
  18. return HTMLResponse(content=dashboard_html)
  19. @router.get("/health", summary="Service health and prompt preview", tags=["health"])
  20. async def health():
  21. """Return runtime health plus prompt snippets for quick configuration verification."""
  22. return SafeJSONResponse(
  23. content={
  24. "status": "ok",
  25. "reranker_url": RERANKER_URL,
  26. "collections": {
  27. "conversational": COLLECTION_CONVERSATIONAL,
  28. "knowledge": COLLECTION_KNOWLEDGE,
  29. },
  30. "prompts": {
  31. key: {p_key: text[:80] + "…" for p_key, text in prompt_set.items()}
  32. for key, prompt_set in PROMPTS.items()
  33. },
  34. }
  35. )
  36. @router.post("/memories", summary="Add conversational memory", tags=["memories"])
  37. async def add_memory(req: Request):
  38. """Store conversational memory with LLM extraction and deduplication enabled."""
  39. return await handle_add(req, memory_conv, verbatim_allowed=False)
  40. @router.post("/memories/raw", summary="Store raw conversational memory", tags=["memories"])
  41. async def add_raw_memory(req: Request):
  42. """Store processed or pre-summarized conversational memory without rerunning the mem0 LLM."""
  43. return await handle_add(req, memory_conv, verbatim_allowed=True)
  44. @router.post("/memories/search", summary="Search conversational memory", tags=["memories"])
  45. async def search_memories(req: Request):
  46. """Search conversational memory and rerank candidates by relevance."""
  47. return await handle_search(req, memory_conv)
  48. @router.post("/memories/recent", summary="Recent conversational memory", tags=["memories"])
  49. async def recent_memories(req: Request):
  50. """Return newest conversational memories ordered by creation time."""
  51. return await handle_recent(req, memory_conv)
  52. @router.delete("/memories", summary="Delete conversational memory by filter", tags=["memories"])
  53. async def delete_memory(req: Request):
  54. """Delete conversational memories using a mem0 filter object from the request body."""
  55. data = await req.json()
  56. return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))
  57. @router.post("/memories/all", summary="List all conversational memories", tags=["memories"])
  58. async def memories_all(req: Request):
  59. """Fetch full conversational history for a user by paging Chroma directly."""
  60. data = await req.json()
  61. user_id = extract_user_id(data) or "main"
  62. rows = chroma_get_all(
  63. memory_conv.vector_store.collection,
  64. user_id,
  65. include=["metadatas", "documents"],
  66. )
  67. items = []
  68. for row in rows:
  69. meta = row.get("metadata") or {}
  70. items.append(
  71. {
  72. "id": row["id"],
  73. "memory": row.get("document") or meta.get("data", ""),
  74. "created_at": meta.get("created_at"),
  75. "metadata": meta,
  76. "user_id": user_id,
  77. }
  78. )
  79. items.sort(key=lambda r: r.get("created_at") or "", reverse=True)
  80. print(f"[memories/all] user={user_id} total={len(items)}")
  81. return SafeJSONResponse(content={"results": items})
  82. @router.post("/knowledge", summary="Add knowledge chunk", tags=["knowledge"])
  83. async def add_knowledge(req: Request):
  84. """Store knowledge verbatim without LLM extraction (ingestor already summarizes)."""
  85. return await handle_add(req, memory_know, verbatim_allowed=True)
  86. @router.post("/knowledge/search", summary="Search knowledge base", tags=["knowledge"])
  87. async def search_knowledge(req: Request):
  88. """Search knowledge memories and rerank candidates."""
  89. return await handle_search(req, memory_know)
  90. @router.post("/knowledge/recent", summary="Recent knowledge entries", tags=["knowledge"])
  91. async def recent_knowledge(req: Request):
  92. """Return newest knowledge entries for a user."""
  93. return await handle_recent(req, memory_know)
  94. @router.delete("/knowledge", summary="Delete knowledge by filter", tags=["knowledge"])
  95. async def delete_knowledge(req: Request):
  96. """Delete knowledge entries using a mem0 filter object from the request body."""
  97. data = await req.json()
  98. return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))
  99. @router.post("/knowledge/sources", summary="Knowledge source counts", tags=["knowledge"])
  100. async def knowledge_sources(req: Request):
  101. """List distinct source_file values with counts, bypassing mem0 get_all caps."""
  102. data = await req.json()
  103. user_id = extract_user_id(data) or "knowledge_base"
  104. rows = chroma_get_all(memory_know.vector_store.collection, user_id)
  105. counts = {}
  106. for row in rows:
  107. src = (row.get("metadata") or {}).get("source_file", "(no source)")
  108. counts[src] = counts.get(src, 0) + 1
  109. sources = [{"source_file": k, "count": v} for k, v in sorted(counts.items(), key=lambda x: -x[1])]
  110. print(f"[sources] user={user_id} total={len(rows)} books={len(sources)}")
  111. return SafeJSONResponse(content={"sources": sources, "total": len(rows)})
  112. @router.delete("/knowledge/by-source", summary="Delete all entries for one source file", tags=["knowledge"])
  113. async def delete_knowledge_by_source(req: Request):
  114. """Delete all knowledge entries for a source_file from Chroma and SQLite."""
  115. data = await req.json()
  116. source_file = data.get("source_file")
  117. user_id = extract_user_id(data) or "knowledge_base"
  118. if not source_file:
  119. return SafeJSONResponse(content={"error": "Missing source_file"}, status_code=400)
  120. rows = chroma_get_all(memory_know.vector_store.collection, user_id)
  121. to_delete = [
  122. row["id"]
  123. for row in rows
  124. if (row.get("metadata") or {}).get("source_file") == source_file
  125. ]
  126. if not to_delete:
  127. return SafeJSONResponse(content={"deleted": 0, "message": "no entries found for that source"})
  128. try:
  129. memory_know.vector_store.collection.delete(ids=to_delete)
  130. except Exception as exc:
  131. return SafeJSONResponse(content={"error": f"chroma delete failed: {exc}"}, status_code=500)
  132. sqlite_deleted = sqlite_delete_ids(to_delete)
  133. print(f"[delete by-source] source={source_file} chroma={len(to_delete)} sqlite={sqlite_deleted}")
  134. return SafeJSONResponse(
  135. content={
  136. "deleted": len(to_delete),
  137. "sqlite_deleted": sqlite_deleted,
  138. "source_file": source_file,
  139. }
  140. )
  141. @router.delete("/memory/{memory_id}", summary="Delete one memory by ID", tags=["memories", "knowledge"])
  142. async def delete_single_memory(memory_id: str, req: Request):
  143. """Delete a single memory from selected collection and mirror deletion in SQLite."""
  144. data = await req.json()
  145. collection = data.get("collection", "knowledge")
  146. mem = memory_know if collection == "knowledge" else memory_conv
  147. try:
  148. mem.vector_store.collection.delete(ids=[memory_id])
  149. except Exception as exc:
  150. return SafeJSONResponse(content={"error": f"chroma delete failed: {exc}"}, status_code=500)
  151. sqlite_delete_ids([memory_id])
  152. print(f"[delete single] id={memory_id} collection={collection}")
  153. return SafeJSONResponse(content={"deleted": memory_id})
  154. @router.post("/search", summary="Search both collections", tags=["search"])
  155. async def search_all(req: Request):
  156. """Search conversational + knowledge collections, then rerank merged results."""
  157. data = await req.json()
  158. query = (data.get("query") or "").strip()
  159. user_id = extract_user_id(data)
  160. limit = int(data.get("limit", 5))
  161. if not query:
  162. return SafeJSONResponse(content={"results": []})
  163. fetch_k = max(limit * 3, 15)
  164. def fetch(mem: Memory, tag: str):
  165. try:
  166. response = mem.search(query, user_id=user_id, limit=fetch_k)
  167. items = response.get("results", [])
  168. except Exception:
  169. items = []
  170. for item in items:
  171. item["_source"] = tag
  172. return items
  173. conv_items = fetch(memory_conv, "conversational")
  174. know_items = fetch(memory_know, "knowledge")
  175. merged = rerank_results(query, conv_items + know_items, top_k=limit)
  176. print(
  177. f"[search/all] user={user_id} query={query!r} "
  178. f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
  179. )
  180. return SafeJSONResponse(content={"results": merged})
  181. return router