| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- #!/usr/bin/env python3
- """
- Conversational memory compactor for custom mem0-python-server.
- Why:
- - keep long-term conversational memory useful
- - collapse noisy debug sessions into resolved summaries
- - preserve safety with dry-run-first workflow
- """
- from __future__ import annotations
- import argparse
- import dataclasses
- import datetime as dt
- import json
- import re
- from typing import Any, Dict, List, Tuple
- import requests
# Keyword hints that mark a memory text as debugging chatter (substring match
# against the normalized text, not whole-word match).
DEBUG_HINTS = {
    "error", "bug", "traceback", "exception", "fix", "retry", "failed",
    "works", "resolved", "done", "restart", "timeout", "stack", "issue"
}
# Loose phone matcher: optional "+", a digit, 5+ digits/dashes/spaces, a digit.
PHONE_RE = re.compile(r"\b\+?[0-9][0-9\-\s]{4,}[0-9]\b")
# Permissive RFC-ish email matcher.
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
# Captures the word after "name is" phrases; IGNORECASE means the [A-Z][a-z]+
# capture effectively matches any-case single words, which are title-cased later.
NAME_RE = re.compile(r"\b(?:name is|his name is|her name is)\s+([A-Z][a-z]+)\b", re.IGNORECASE)
@dataclasses.dataclass
class MemoryItem:
    """One conversational memory row as returned by the mem0 server."""

    id: str
    text: str
    created_at: str | None
    metadata: Dict[str, Any]

    @property
    def created_dt(self) -> dt.datetime:
        """Creation time as an aware datetime; Unix epoch (UTC) on missing/bad input."""
        epoch = dt.datetime.fromtimestamp(0, tz=dt.timezone.utc)
        if not self.created_at:
            return epoch
        # Server emits "Z" suffixes that fromisoformat (pre-3.11) rejects.
        normalized = self.created_at.replace("Z", "+00:00")
        try:
            parsed = dt.datetime.fromisoformat(normalized)
        except Exception:
            return epoch
        return parsed
class Mem0Client:
    """Thin HTTP wrapper around the custom mem0 server's memory endpoints."""

    def __init__(self, base_url: str, timeout: int = 15):
        # Strip the trailing slash so endpoint paths join with exactly one "/".
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def all_memories(self, user_id: str) -> List[MemoryItem]:
        """Fetch every memory for *user_id* and wrap each row in a MemoryItem.

        Raises requests.HTTPError on non-2xx responses. Missing fields are
        defaulted so malformed rows still produce a usable item.
        """
        r = requests.post(
            f"{self.base_url}/memories/all",
            json={"user_id": user_id},
            timeout=self.timeout,
        )
        r.raise_for_status()
        data = r.json().get("results", [])
        out = []
        for row in data:
            out.append(
                MemoryItem(
                    id=row.get("id", ""),
                    text=row.get("memory", ""),
                    created_at=row.get("created_at"),
                    # "or {}" also normalizes an explicit null metadata field.
                    metadata=row.get("metadata") or {},
                )
            )
        return out

    def write_memory(self, user_id: str, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new memory for *user_id*; returns the server's JSON response.

        NOTE(review): this payload uses camelCase "userId" while /memories/all
        uses snake_case "user_id" — presumably mirroring the server API; confirm.
        """
        r = requests.post(
            f"{self.base_url}/memories",
            json={"text": text, "userId": user_id, "metadata": metadata},
            timeout=self.timeout,
        )
        r.raise_for_status()
        return r.json()

    def delete_memory(self, memory_id: str) -> Dict[str, Any]:
        """Delete one memory by id; returns server JSON or {"ok": True} on empty body.

        NOTE(review): DELETE with a JSON body is unusual but valid; the server
        apparently uses it to select the "conversational" collection — verify.
        """
        r = requests.delete(
            f"{self.base_url}/memory/{memory_id}",
            json={"collection": "conversational"},
            timeout=self.timeout,
        )
        r.raise_for_status()
        # Some servers reply 204/empty to DELETE; avoid r.json() blowing up.
        return r.json() if r.content else {"ok": True}
def normalize(text: str) -> str:
    """Lowercase, trim, and collapse internal whitespace runs to single spaces."""
    collapsed = re.sub(r"\s+", " ", text.lower().strip())
    return collapsed
def is_debug_like(text: str) -> bool:
    """Return True when the normalized text contains any debug-session keyword."""
    lowered = normalize(text)
    for hint in DEBUG_HINTS:
        if hint in lowered:
            return True
    return False
def cluster_by_time(memories: List[MemoryItem], gap_minutes: int) -> List[List[MemoryItem]]:
    """Group memories into temporal clusters.

    Memories are sorted by creation time, then split wherever the gap to the
    previous item exceeds *gap_minutes*. Returns [] for empty input.
    """
    ordered = sorted(memories, key=lambda m: m.created_dt)
    grouped: List[List[MemoryItem]] = []
    for mem in ordered:
        if grouped:
            gap_seconds = (mem.created_dt - grouped[-1][-1].created_dt).total_seconds()
            if gap_seconds <= gap_minutes * 60:
                grouped[-1].append(mem)
                continue
        grouped.append([mem])
    return grouped
def extract_facts(texts: List[str]) -> Dict[str, Any]:
    """Pull contact facts (phones, emails, names) out of raw memory texts.

    Phone matches have whitespace stripped; names are title-cased. Each value
    is a sorted, de-duplicated list.
    """
    found: Dict[str, set] = {"phones": set(), "emails": set(), "names": set()}
    for text in texts:
        found["phones"].update(re.sub(r"\s+", "", hit) for hit in PHONE_RE.findall(text))
        found["emails"].update(EMAIL_RE.findall(text))
        found["names"].update(hit.strip().title() for hit in NAME_RE.findall(text))
    return {key: sorted(vals) for key, vals in found.items()}
def cluster_has_fact_signals(texts: List[str]) -> bool:
    """Return True when any text in the cluster looks like it carries a contact fact."""
    blob = "\n".join(texts)
    for pattern in (PHONE_RE, EMAIL_RE, NAME_RE):
        if pattern.search(blob):
            return True
    return False
def summarize_cluster(cluster: List[MemoryItem]) -> Tuple[str, bool, Dict[str, Any]]:
    """Decide whether a temporal cluster is a resolved debug arc; summarize it.

    Returns (summary, should_compact, preserved_facts). should_compact is False
    when the cluster is too small (<4 texts), not debug-flavored enough
    (<35% debug-like), lacks a resolution signal, or carries fact signals
    that extraction failed to capture.
    """
    snippets = [item.text.strip() for item in cluster if item.text.strip()]
    if not snippets:
        return "", False, {}
    # Share of messages that read as debugging chatter.
    noisy_ratio = sum(1 for s in snippets if is_debug_like(s)) / max(len(snippets), 1)
    closing_words = ("resolved", "fixed", "works", "done")
    reached_resolution = any(
        word in normalize(s) for s in snippets for word in closing_words
    )
    if len(snippets) < 4 or noisy_ratio < 0.35 or not reached_resolution:
        return "", False, {}
    facts = extract_facts(snippets)
    any_facts = any(facts[key] for key in facts)
    # Guardrail: raw texts hint at contact facts but extraction came up empty —
    # refuse to compact rather than risk destroying them.
    if cluster_has_fact_signals(snippets) and not any_facts:
        return "", False, {}
    facts_line = ""
    if any_facts:
        facts_line = (
            " Preserved facts: "
            f"names={facts['names'] or []}, "
            f"phones={facts['phones'] or []}, "
            f"emails={facts['emails'] or []}."
        )
    summary = (
        "[COMPACTED DEBUG ARC] "
        f"Started with: {snippets[0][:280]} | Final state: {snippets[-1][:280]}. "
        "Intermediate trial/error messages were compacted."
        + facts_line
    )
    return summary, True, facts
def run(args: argparse.Namespace) -> None:
    """Plan — and with --apply, execute — compaction of resolved debug arcs.

    Fetches all memories for the user, clusters the sufficiently old ones by
    time gap, summarizes compactable clusters, and prints a JSON report of the
    planned actions. Only with args.apply set does it write summaries and
    delete the superseded raw entries.
    """
    client = Mem0Client(args.base_url, timeout=args.timeout)
    memories = client.all_memories(args.user_id)
    # keep very recent entries untouched
    cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=args.min_age_days)
    candidates = [m for m in memories if m.created_dt < cutoff]
    clusters = cluster_by_time(candidates, args.gap_minutes)
    report = {
        "user_id": args.user_id,
        "total_memories": len(memories),
        "candidates": len(candidates),
        "clusters": len(clusters),
        "actions": [],
    }
    delete_budget = args.max_deletes
    for cluster in clusters:
        summary, should_compact, facts = summarize_cluster(cluster)
        if not should_compact:
            continue
        ids = [m.id for m in cluster if m.id]
        if len(ids) < 2:
            continue
        to_delete = ids[:-1]  # keep latest raw entry as an extra guardrail
        # Skip (rather than truncate) clusters that would blow the budget.
        if len(to_delete) > delete_budget:
            continue
        action = {
            "type": "compact_debug_arc",
            "cluster_size": len(cluster),
            "delete_ids": to_delete,
            "keep_id": ids[-1],
            "summary_preview": summary[:240],
            "preserved_facts": facts,
        }
        report["actions"].append(action)
        if args.apply:
            # Write the summary before deleting, so a mid-run failure can only
            # leave duplicates, never data loss.
            metadata = {
                "compacted_at": dt.datetime.now(dt.timezone.utc).isoformat(),
                "compactor_version": "0.2",
                "compacted_from_ids": ids,
                "kind": "debug_arc_summary",
                "preserved_facts": facts,
            }
            client.write_memory(args.user_id, summary, metadata)
            for mid in to_delete:
                client.delete_memory(mid)
        # NOTE(review): budget is consumed per planned action even in dry-run,
        # so the report mirrors what --apply would actually do — confirm intended.
        delete_budget -= len(to_delete)
    print(json.dumps(report, indent=2, ensure_ascii=False))
def parse_args() -> argparse.Namespace:
    """Build and parse the compactor CLI (dry-run unless --apply is given)."""
    parser = argparse.ArgumentParser(
        description="Compacts conversational memories with temporal clustering."
    )
    parser.add_argument("--base-url", default="http://192.168.0.200:8420")
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--apply", action="store_true", help="Apply changes. Default is dry-run.")
    # Integer tuning knobs share type/default handling.
    for flag, default in (
        ("--gap-minutes", 45),
        ("--min-age-days", 2),
        ("--max-deletes", 50),
        ("--timeout", 20),
    ):
        parser.add_argument(flag, type=int, default=default)
    return parser.parse_args()
- if __name__ == "__main__":
- run(parse_args())
|