| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- from fastapi import APIRouter, Request
- from fastapi.responses import HTMLResponse
- from mem0 import Memory
- from .config import COLLECTION_CONVERSATIONAL, COLLECTION_KNOWLEDGE, RERANKER_URL
- from .handlers import extract_user_id, handle_add, handle_recent, handle_search
- from .prompts import PROMPTS
- from .reranker import rerank_results
- from .responses import SafeJSONResponse
- from .storage import chroma_get_all, sqlite_delete_ids
def build_router(memory_conv: Memory, memory_know: Memory) -> APIRouter:
    """Build and return all API routes bound to the provided memory instances.

    Args:
        memory_conv: mem0 ``Memory`` backing the conversational collection
            (``/memories*`` routes and the conversational half of ``/search``).
        memory_know: mem0 ``Memory`` backing the knowledge collection
            (``/knowledge*`` routes and the knowledge half of ``/search``).

    Returns:
        An ``APIRouter`` with every route registered; the caller mounts it.

    Raises:
        OSError: if ``dashboard.html`` cannot be opened. It is read once, here,
            relative to the process working directory (not this module).
    """
    from collections import Counter

    router = APIRouter()

    # Read once at build time; every /dashboard request serves this cached copy.
    with open("dashboard.html", "r", encoding="utf-8") as handle:
        dashboard_html = handle.read()

    def _preview(text: str, width: int = 80) -> str:
        """Return the first ``width`` chars of ``text``, with "…" only if truncated."""
        return text if len(text) <= width else text[:width] + "…"

    # ------------------------------------------------------------------ misc

    @router.get("/dashboard", response_class=HTMLResponse, summary="Render dashboard")
    async def dashboard():
        """Serve the local dashboard HTML used for memory inspection and admin actions."""
        return HTMLResponse(content=dashboard_html)

    @router.get("/health", summary="Service health and prompt preview")
    async def health():
        """Return runtime health plus prompt snippets for quick configuration verification.

        Each prompt in ``PROMPTS`` is previewed as its first 80 characters.
        """
        return SafeJSONResponse(
            content={
                "status": "ok",
                "reranker_url": RERANKER_URL,
                "collections": {
                    "conversational": COLLECTION_CONVERSATIONAL,
                    "knowledge": COLLECTION_KNOWLEDGE,
                },
                "prompts": {
                    key: {p_key: _preview(text) for p_key, text in prompt_set.items()}
                    for key, prompt_set in PROMPTS.items()
                },
            }
        )

    # -------------------------------------------------------- conversational

    @router.post("/memories", summary="Add conversational memory")
    async def add_memory(req: Request):
        """Store conversational memory with LLM extraction and deduplication enabled."""
        return await handle_add(req, memory_conv, verbatim_allowed=False)

    @router.post("/memories/search", summary="Search conversational memory")
    async def search_memories(req: Request):
        """Search conversational memory and rerank candidates by relevance."""
        return await handle_search(req, memory_conv)

    @router.post("/memories/recent", summary="Recent conversational memory")
    async def recent_memories(req: Request):
        """Return newest conversational memories ordered by creation time."""
        return await handle_recent(req, memory_conv)

    @router.delete("/memories", summary="Delete conversational memory by filter")
    async def delete_memory(req: Request):
        """Delete conversational memories using a mem0 filter object from the request body.

        The body's ``filter`` value (default ``{}``) is passed straight to
        ``memory_conv.delete`` and its return value is echoed back.
        """
        # NOTE(review): in the mem0 releases I know, ``Memory.delete`` takes a
        # single memory ID and bulk deletion is ``delete_all(user_id=...)`` —
        # confirm against the pinned mem0 version that a filter dict works here.
        data = await req.json()
        return SafeJSONResponse(content=memory_conv.delete(data.get("filter", {})))

    @router.post("/memories/all", summary="List all conversational memories")
    async def memories_all(req: Request):
        """Fetch full conversational history for a user by paging Chroma directly.

        ``user_id`` defaults to ``"main"``. Results are sorted newest first by
        the ``created_at`` metadata string; rows without it sort last.
        """
        data = await req.json()
        user_id = extract_user_id(data) or "main"
        rows = chroma_get_all(
            memory_conv.vector_store.collection,
            user_id,
            include=["metadatas", "documents"],
        )
        items = []
        for row in rows:
            meta = row.get("metadata") or {}
            items.append(
                {
                    "id": row["id"],
                    # Prefer the stored document text; fall back to metadata "data".
                    "memory": row.get("document") or meta.get("data", ""),
                    "created_at": meta.get("created_at"),
                    "metadata": meta,
                    "user_id": user_id,
                }
            )
        items.sort(key=lambda r: r.get("created_at") or "", reverse=True)
        print(f"[memories/all] user={user_id} total={len(items)}")
        return SafeJSONResponse(content={"results": items})

    # ------------------------------------------------------------- knowledge

    @router.post("/knowledge", summary="Add knowledge chunk")
    async def add_knowledge(req: Request):
        """Store knowledge verbatim without LLM extraction (ingestor already summarizes)."""
        return await handle_add(req, memory_know, verbatim_allowed=True)

    @router.post("/knowledge/search", summary="Search knowledge base")
    async def search_knowledge(req: Request):
        """Search knowledge memories and rerank candidates."""
        return await handle_search(req, memory_know)

    @router.post("/knowledge/recent", summary="Recent knowledge entries")
    async def recent_knowledge(req: Request):
        """Return newest knowledge entries for a user."""
        return await handle_recent(req, memory_know)

    @router.delete("/knowledge", summary="Delete knowledge by filter")
    async def delete_knowledge(req: Request):
        """Delete knowledge entries using a mem0 filter object from the request body.

        The body's ``filter`` value (default ``{}``) is passed straight to
        ``memory_know.delete`` and its return value is echoed back.
        """
        # NOTE(review): same mem0 ``delete`` signature concern as DELETE /memories.
        data = await req.json()
        return SafeJSONResponse(content=memory_know.delete(data.get("filter", {})))

    @router.post("/knowledge/sources", summary="Knowledge source counts")
    async def knowledge_sources(req: Request):
        """List distinct source_file values with counts, bypassing mem0 get_all caps.

        ``user_id`` defaults to ``"knowledge_base"``. Sources are ordered by
        descending count; ties keep first-seen order. Rows lacking a
        ``source_file`` are grouped under ``"(no source)"``.
        """
        data = await req.json()
        user_id = extract_user_id(data) or "knowledge_base"
        rows = chroma_get_all(memory_know.vector_store.collection, user_id)
        counts = Counter(
            (row.get("metadata") or {}).get("source_file", "(no source)") for row in rows
        )
        # most_common() sorts stably by descending count, matching first-seen tie order.
        sources = [{"source_file": src, "count": n} for src, n in counts.most_common()]
        print(f"[sources] user={user_id} total={len(rows)} books={len(sources)}")
        return SafeJSONResponse(content={"sources": sources, "total": len(rows)})

    @router.delete("/knowledge/by-source", summary="Delete all entries for one source file")
    async def delete_knowledge_by_source(req: Request):
        """Delete all knowledge entries for a source_file from Chroma and SQLite.

        Returns 400 when ``source_file`` is missing and 500 when the Chroma
        delete raises. SQLite cleanup runs only after Chroma succeeds.
        """
        data = await req.json()
        source_file = data.get("source_file")
        user_id = extract_user_id(data) or "knowledge_base"
        if not source_file:
            return SafeJSONResponse(content={"error": "Missing source_file"}, status_code=400)
        rows = chroma_get_all(memory_know.vector_store.collection, user_id)
        to_delete = [
            row["id"]
            for row in rows
            if (row.get("metadata") or {}).get("source_file") == source_file
        ]
        if not to_delete:
            return SafeJSONResponse(content={"deleted": 0, "message": "no entries found for that source"})
        try:
            memory_know.vector_store.collection.delete(ids=to_delete)
        except Exception as exc:
            return SafeJSONResponse(content={"error": f"chroma delete failed: {exc}"}, status_code=500)
        sqlite_deleted = sqlite_delete_ids(to_delete)
        print(f"[delete by-source] source={source_file} chroma={len(to_delete)} sqlite={sqlite_deleted}")
        return SafeJSONResponse(
            content={
                "deleted": len(to_delete),
                "sqlite_deleted": sqlite_deleted,
                "source_file": source_file,
            }
        )

    # ---------------------------------------------------------------- shared

    @router.delete("/memory/{memory_id}", summary="Delete one memory by ID")
    async def delete_single_memory(memory_id: str, req: Request):
        """Delete a single memory from selected collection and mirror deletion in SQLite.

        The optional JSON body may carry ``collection``. ``"knowledge"`` (the
        default) targets the knowledge store; any other value targets the
        conversational store.
        """
        # DELETE requests often carry no body at all; treat that as "use the
        # defaults" instead of letting req.json() fail on empty input.
        raw = await req.body()
        data = await req.json() if raw.strip() else {}
        collection = data.get("collection", "knowledge")
        mem = memory_know if collection == "knowledge" else memory_conv
        try:
            mem.vector_store.collection.delete(ids=[memory_id])
        except Exception as exc:
            return SafeJSONResponse(content={"error": f"chroma delete failed: {exc}"}, status_code=500)
        sqlite_delete_ids([memory_id])
        print(f"[delete single] id={memory_id} collection={collection}")
        return SafeJSONResponse(content={"deleted": memory_id})

    @router.post("/search", summary="Search both collections")
    async def search_all(req: Request):
        """Search conversational + knowledge collections, then rerank merged results.

        Body fields: ``query`` (required; blank yields no results), ``limit``
        (default 5; must be an integer, 400 otherwise; non-positive yields no
        results), plus whatever ``extract_user_id`` reads.
        """
        data = await req.json()
        query = (data.get("query") or "").strip()
        # NOTE(review): the same user_id filters both collections, whereas the
        # knowledge routes default to "knowledge_base" — confirm this is intended.
        user_id = extract_user_id(data)
        try:
            limit = int(data.get("limit", 5))
        except (TypeError, ValueError):
            return SafeJSONResponse(content={"error": "limit must be an integer"}, status_code=400)
        if not query or limit < 1:
            return SafeJSONResponse(content={"results": []})
        # Over-fetch per collection so the reranker chooses from a wider pool.
        fetch_k = max(limit * 3, 15)

        def fetch(mem: Memory, tag: str) -> list:
            """Search one collection and tag each hit with ``_source=tag``.

            Best-effort: a failing collection is logged and contributes no hits,
            so the other collection's results are still returned.
            """
            try:
                response = mem.search(query, user_id=user_id, limit=fetch_k)
            except Exception as exc:
                print(f"[search/all] {tag} search failed: {exc!r}")
                return []
            # Expected shape is {"results": [...]}; presumably older mem0 output
            # formats return a bare list, so accept that too.
            if isinstance(response, dict):
                items = response.get("results", [])
            else:
                items = list(response or [])
            for item in items:
                item["_source"] = tag
            return items

        conv_items = fetch(memory_conv, "conversational")
        know_items = fetch(memory_know, "knowledge")
        merged = rerank_results(query, conv_items + know_items, top_k=limit)
        print(
            f"[search/all] user={user_id} query={query!r} "
            f"conv={len(conv_items)} know={len(know_items)} merged={len(merged)}"
        )
        return SafeJSONResponse(content={"results": merged})

    return router
|