# routes.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
from collections import Counter

from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from mem0 import Memory

from .config import COLLECTION_CONVERSATIONAL, COLLECTION_KNOWLEDGE, RERANKER_URL
from .handlers import extract_user_id, handle_add, handle_recent, handle_search
from .prompts import PROMPTS
from .reranker import rerank_results
from .responses import SafeJSONResponse
from .storage import chroma_get_all, sqlite_delete_ids
  10. def build_router(memory_conv: Memory, memory_know: Memory) -> APIRouter:
  11. """Build and return all API routes bound to the provided memory instances."""
  12. router = APIRouter()
  13. with open("dashboard.html", "r", encoding="utf-8") as handle:
  14. dashboard_html = handle.read()
  15. @router.get("/dashboard", response_class=HTMLResponse, summary="Render dashboard", tags=["dashboard"])
  16. async def dashboard():
  17. """Serve the local dashboard HTML used for memory inspection and admin actions."""
  18. return HTMLResponse(content=dashboard_html)
  19. @router.get("/health", summary="Service health and prompt preview", tags=["health"])
  20. async def health():
  21. """Return runtime health plus prompt snippets for quick configuration verification."""
  22. return SafeJSONResponse(
  23. content={
  24. "status": "ok",
  25. "reranker_url": RERANKER_URL,
  26. "collections": {
  27. "conversational": COLLECTION_CONVERSATIONAL,
  28. "knowledge": COLLECTION_KNOWLEDGE,
  29. },
  30. "prompts": {
  31. key: {p_key: text[:80] + "…" for p_key, text in prompt_set.items()}
  32. for key, prompt_set in PROMPTS.items()
  33. },
  34. }
  35. )
  36. @router.post("/memories", summary="Add conversational memory", tags=["memories"])
  37. async def add_memory(req: Request):
  38. """Store conversational memory with LLM extraction and deduplication enabled."""
  39. return await handle_add(req, memory_conv, verbatim_allowed=False)
  40. @router.post("/memories/search", summary="Search conversational memory", tags=["memories"])
  41. async def search_memories(req: Request):
  42. """Search conversational memory and rerank candidates by relevance."""
  43. return await handle_search(req, memory_conv)
  44. @router.post("/memories/recent", summary="Recent conversational memory", tags=["memories"])
  45. async def recent_memories(req: Request):
  46. """Return newest conversational memories ordered by creation time."""
  47. return await handle_recent(req, memory_conv)
  48. @router.delete("/memories", summary="Delete conversational memory by filter", tags=["memories"])
  49. async def delete_memory(req: Request):
  50. """Delete conversational memories using a mem0 filter object from the request body."""
  51. data = await req.json()
  52. return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))
  53. @router.post("/memories/all", summary="List all conversational memories", tags=["memories"])
  54. async def memories_all(req: Request):
  55. """Fetch full conversational history for a user by paging Chroma directly."""
  56. data = await req.json()
  57. user_id = extract_user_id(data) or "main"
  58. rows = chroma_get_all(
  59. memory_conv.vector_store.collection,
  60. user_id,
  61. include=["metadatas", "documents"],
  62. )
  63. items = []
  64. for row in rows:
  65. meta = row.get("metadata") or {}
  66. items.append(
  67. {
  68. "id": row["id"],
  69. "memory": row.get("document") or meta.get("data", ""),
  70. "created_at": meta.get("created_at"),
  71. "metadata": meta,
  72. "user_id": user_id,
  73. }
  74. )
  75. items.sort(key=lambda r: r.get("created_at") or "", reverse=True)
  76. print(f"[memories/all] user={user_id} total={len(items)}")
  77. return SafeJSONResponse(content={"results": items})
  78. @router.post("/knowledge", summary="Add knowledge chunk", tags=["knowledge"])
  79. async def add_knowledge(req: Request):
  80. """Store knowledge verbatim without LLM extraction (ingestor already summarizes)."""
  81. return await handle_add(req, memory_know, verbatim_allowed=True)
  82. @router.post("/knowledge/search", summary="Search knowledge base", tags=["knowledge"])
  83. async def search_knowledge(req: Request):
  84. """Search knowledge memories and rerank candidates."""
  85. return await handle_search(req, memory_know)
  86. @router.post("/knowledge/recent", summary="Recent knowledge entries", tags=["knowledge"])
  87. async def recent_knowledge(req: Request):
  88. """Return newest knowledge entries for a user."""
  89. return await handle_recent(req, memory_know)
  90. @router.delete("/knowledge", summary="Delete knowledge by filter", tags=["knowledge"])
  91. async def delete_knowledge(req: Request):
  92. """Delete knowledge entries using a mem0 filter object from the request body."""
  93. data = await req.json()
  94. return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))
  95. @router.post("/knowledge/sources", summary="Knowledge source counts", tags=["knowledge"])
  96. async def knowledge_sources(req: Request):
  97. """List distinct source_file values with counts, bypassing mem0 get_all caps."""
  98. data = await req.json()
  99. user_id = extract_user_id(data) or "knowledge_base"
  100. rows = chroma_get_all(memory_know.vector_store.collection, user_id)
  101. counts = {}
  102. for row in rows:
  103. src = (row.get("metadata") or {}).get("source_file", "(no source)")
  104. counts[src] = counts.get(src, 0) + 1
  105. sources = [{"source_file": k, "count": v} for k, v in sorted(counts.items(), key=lambda x: -x[1])]
  106. print(f"[sources] user={user_id} total={len(rows)} books={len(sources)}")
  107. return SafeJSONResponse(content={"sources": sources, "total": len(rows)})
  108. @router.delete("/knowledge/by-source", summary="Delete all entries for one source file", tags=["knowledge"])
  109. async def delete_knowledge_by_source(req: Request):
  110. """Delete all knowledge entries for a source_file from Chroma and SQLite."""
  111. data = await req.json()
  112. source_file = data.get("source_file")
  113. user_id = extract_user_id(data) or "knowledge_base"
  114. if not source_file:
  115. return SafeJSONResponse(content={"error": "Missing source_file"}, status_code=400)
  116. rows = chroma_get_all(memory_know.vector_store.collection, user_id)
  117. to_delete = [
  118. row["id"]
  119. for row in rows
  120. if (row.get("metadata") or {}).get("source_file") == source_file
  121. ]
  122. if not to_delete:
  123. return SafeJSONResponse(content={"deleted": 0, "message": "no entries found for that source"})
  124. try:
  125. memory_know.vector_store.collection.delete(ids=to_delete)
  126. except Exception as exc:
  127. return SafeJSONResponse(content={"error": f"chroma delete failed: {exc}"}, status_code=500)
  128. sqlite_deleted = sqlite_delete_ids(to_delete)
  129. print(f"[delete by-source] source={source_file} chroma={len(to_delete)} sqlite={sqlite_deleted}")
  130. return SafeJSONResponse(
  131. content={
  132. "deleted": len(to_delete),
  133. "sqlite_deleted": sqlite_deleted,
  134. "source_file": source_file,
  135. }
  136. )
  137. @router.delete("/memory/{memory_id}", summary="Delete one memory by ID", tags=["memories", "knowledge"])
  138. async def delete_single_memory(memory_id: str, req: Request):
  139. """Delete a single memory from selected collection and mirror deletion in SQLite."""
  140. data = await req.json()
  141. collection = data.get("collection", "knowledge")
  142. mem = memory_know if collection == "knowledge" else memory_conv
  143. try:
  144. mem.vector_store.collection.delete(ids=[memory_id])
  145. except Exception as exc:
  146. return SafeJSONResponse(content={"error": f"chroma delete failed: {exc}"}, status_code=500)
  147. sqlite_delete_ids([memory_id])
  148. print(f"[delete single] id={memory_id} collection={collection}")
  149. return SafeJSONResponse(content={"deleted": memory_id})
  150. @router.post("/search", summary="Search both collections", tags=["search"])
  151. async def search_all(req: Request):
  152. """Search conversational + knowledge collections, then rerank merged results."""
  153. data = await req.json()
  154. query = (data.get("query") or "").strip()
  155. user_id = extract_user_id(data)
  156. limit = int(data.get("limit", 5))
  157. if not query:
  158. return SafeJSONResponse(content={"results": []})
  159. fetch_k = max(limit * 3, 15)
  160. def fetch(mem: Memory, tag: str):
  161. try:
  162. response = mem.search(query, user_id=user_id, limit=fetch_k)
  163. items = response.get("results", [])
  164. except Exception:
  165. items = []
  166. for item in items:
  167. item["_source"] = tag
  168. return items
  169. conv_items = fetch(memory_conv, "conversational")
  170. know_items = fetch(memory_know, "knowledge")
  171. merged = rerank_results(query, conv_items + know_items, top_k=limit)
  172. print(
  173. f"[search/all] user={user_id} query={query!r} "
  174. f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
  175. )
  176. return SafeJSONResponse(content={"results": merged})
  177. return router