#!/usr/bin/env python3 """ Conversational memory compactor for custom mem0-python-server. Why: - keep long-term conversational memory useful - collapse noisy debug sessions into resolved summaries - preserve safety with dry-run-first workflow """ from __future__ import annotations import argparse import dataclasses import datetime as dt import json import re from typing import Any, Dict, List, Tuple import requests DEBUG_HINTS = { "error", "bug", "traceback", "exception", "fix", "retry", "failed", "works", "resolved", "done", "restart", "timeout", "stack", "issue" } PHONE_RE = re.compile(r"\b\+?[0-9][0-9\-\s]{4,}[0-9]\b") EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b") NAME_RE = re.compile(r"\b(?:name is|his name is|her name is)\s+([A-Z][a-z]+)\b", re.IGNORECASE) @dataclasses.dataclass class MemoryItem: id: str text: str created_at: str | None metadata: Dict[str, Any] @property def created_dt(self) -> dt.datetime: if not self.created_at: return dt.datetime.fromtimestamp(0, tz=dt.timezone.utc) try: return dt.datetime.fromisoformat(self.created_at.replace("Z", "+00:00")) except Exception: return dt.datetime.fromtimestamp(0, tz=dt.timezone.utc) class Mem0Client: def __init__(self, base_url: str, timeout: int = 15): self.base_url = base_url.rstrip("/") self.timeout = timeout def all_memories(self, user_id: str) -> List[MemoryItem]: r = requests.post( f"{self.base_url}/memories/all", json={"user_id": user_id}, timeout=self.timeout, ) r.raise_for_status() data = r.json().get("results", []) out = [] for row in data: out.append( MemoryItem( id=row.get("id", ""), text=row.get("memory", ""), created_at=row.get("created_at"), metadata=row.get("metadata") or {}, ) ) return out def write_memory(self, user_id: str, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]: r = requests.post( f"{self.base_url}/memories", json={"text": text, "userId": user_id, "metadata": metadata}, timeout=self.timeout, ) r.raise_for_status() return r.json() def delete_memory(self, memory_id: str) -> Dict[str, Any]: r = requests.delete( f"{self.base_url}/memory/{memory_id}", json={"collection": "conversational"}, timeout=self.timeout, ) r.raise_for_status() return r.json() if r.content else {"ok": True} def normalize(text: str) -> str: text = text.lower().strip() text = re.sub(r"\s+", " ", text) return text def is_debug_like(text: str) -> bool: t = normalize(text) return any(k in t for k in DEBUG_HINTS) def cluster_by_time(memories: List[MemoryItem], gap_minutes: int) -> List[List[MemoryItem]]: if not memories: return [] items = sorted(memories, key=lambda m: m.created_dt) clusters: List[List[MemoryItem]] = [[items[0]]] for item in items[1:]: prev = clusters[-1][-1] delta = (item.created_dt - prev.created_dt).total_seconds() / 60 if delta <= gap_minutes: clusters[-1].append(item) else: clusters.append([item]) return clusters def extract_facts(texts: List[str]) -> Dict[str, Any]: phones, emails, names = set(), set(), set() for t in texts: for m in PHONE_RE.findall(t): phones.add(re.sub(r"\s+", "", m)) for m in EMAIL_RE.findall(t): emails.add(m) for m in NAME_RE.findall(t): names.add(m.strip().title()) return { "phones": sorted(phones), "emails": sorted(emails), "names": sorted(names), } def cluster_has_fact_signals(texts: List[str]) -> bool: joined = "\n".join(texts) return bool(PHONE_RE.search(joined) or EMAIL_RE.search(joined) or NAME_RE.search(joined)) def summarize_cluster(cluster: List[MemoryItem]) -> Tuple[str, bool, Dict[str, Any]]: texts = [c.text.strip() for c in cluster if c.text.strip()] if not texts: return "", False, {} debug_ratio = sum(1 for t in texts if is_debug_like(t)) / max(len(texts), 1) has_resolution = any(any(w in normalize(t) for w in ["resolved", "fixed", "works", "done"]) for t in texts) if len(texts) < 4 or debug_ratio < 0.35 or not has_resolution: return "", False, {} facts = extract_facts(texts) requires_fact_preservation = cluster_has_fact_signals(texts) has_extracted_facts = any(len(v) > 0 for v in facts.values()) # guardrail: if cluster appears to contain facts but we couldn't preserve them, skip compaction if requires_fact_preservation and not has_extracted_facts: return "", False, {} head = texts[0][:280] tail = texts[-1][:280] facts_line = "" if has_extracted_facts: facts_line = ( " Preserved facts: " f"names={facts['names'] or []}, " f"phones={facts['phones'] or []}, " f"emails={facts['emails'] or []}." ) summary = ( "[COMPACTED DEBUG ARC] " f"Started with: {head} | Final state: {tail}. " "Intermediate trial/error messages were compacted." + facts_line ) return summary, True, facts def run(args: argparse.Namespace) -> None: client = Mem0Client(args.base_url, timeout=args.timeout) memories = client.all_memories(args.user_id) # keep very recent entries untouched cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=args.min_age_days) candidates = [m for m in memories if m.created_dt < cutoff] clusters = cluster_by_time(candidates, args.gap_minutes) report = { "user_id": args.user_id, "total_memories": len(memories), "candidates": len(candidates), "clusters": len(clusters), "actions": [], } delete_budget = args.max_deletes for cluster in clusters: summary, should_compact, facts = summarize_cluster(cluster) if not should_compact: continue ids = [m.id for m in cluster if m.id] if len(ids) < 2: continue to_delete = ids[:-1] # keep latest raw entry as an extra guardrail if len(to_delete) > delete_budget: continue action = { "type": "compact_debug_arc", "cluster_size": len(cluster), "delete_ids": to_delete, "keep_id": ids[-1], "summary_preview": summary[:240], "preserved_facts": facts, } report["actions"].append(action) if args.apply: metadata = { "compacted_at": dt.datetime.now(dt.timezone.utc).isoformat(), "compactor_version": "0.2", "compacted_from_ids": ids, "kind": "debug_arc_summary", "preserved_facts": facts, } client.write_memory(args.user_id, summary, metadata) for mid in to_delete: client.delete_memory(mid) delete_budget -= len(to_delete) print(json.dumps(report, indent=2, ensure_ascii=False)) def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description="Compacts conversational memories with temporal clustering.") p.add_argument("--base-url", default="http://192.168.0.200:8420") p.add_argument("--user-id", required=True) p.add_argument("--apply", action="store_true", help="Apply changes. Default is dry-run.") p.add_argument("--gap-minutes", type=int, default=45) p.add_argument("--min-age-days", type=int, default=2) p.add_argument("--max-deletes", type=int, default=50) p.add_argument("--timeout", type=int, default=20) return p.parse_args() if __name__ == "__main__": run(parse_args())