#!/usr/bin/env python3 """ Conversational memory compactor for custom mem0-python-server. Why: - keep long-term conversational memory useful - compact clusters into LLM summaries stored verbatim - preserve safety with dry-run-first workflow """ from __future__ import annotations import argparse import dataclasses import datetime as dt import json import os import re from typing import Any, Dict, List import requests def load_env_file(path: str) -> None: """Load simple KEY=VALUE pairs from a .env file into os.environ. This keeps cron usage predictable without adding a dependency on python-dotenv. """ if not os.path.exists(path): return with open(path, "r", encoding="utf-8") as handle: for line in handle: line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") if key and key not in os.environ: os.environ[key] = value EPHEMERAL_HINTS = { "weather", "forecast", "temperature", "rain", "raining", "expected to stop", "wind", "humidity", "uv index", "clouds", "sunrise", "sunset", } DEFAULT_ENV_PATH = os.path.join(os.path.dirname(__file__), ".env") @dataclasses.dataclass class MemoryItem: id: str text: str created_at: str | None metadata: Dict[str, Any] @property def created_dt(self) -> dt.datetime: if not self.created_at: return dt.datetime.fromtimestamp(0, tz=dt.timezone.utc) try: return dt.datetime.fromisoformat(self.created_at.replace("Z", "+00:00")) except Exception: return dt.datetime.fromtimestamp(0, tz=dt.timezone.utc) class Mem0Client: def __init__(self, base_url: str, timeout: int = 15): self.base_url = base_url.rstrip("/") self.timeout = timeout def all_memories(self, user_id: str) -> List[MemoryItem]: r = requests.post( f"{self.base_url}/memories/all", json={"user_id": user_id}, timeout=self.timeout, ) r.raise_for_status() data = r.json().get("results", []) out = [] for row in data: out.append( MemoryItem( id=row.get("id", ""), text=row.get("memory", ""), created_at=row.get("created_at"), metadata=row.get("metadata") or {}, ) ) return out def write_memory(self, user_id: str, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]: r = requests.post( f"{self.base_url}/memories/raw", json={"text": text, "userId": user_id, "metadata": metadata}, timeout=self.timeout, ) r.raise_for_status() return r.json() def delete_memory(self, memory_id: str) -> Dict[str, Any]: r = requests.delete( f"{self.base_url}/memory/{memory_id}", json={"collection": "conversational"}, timeout=self.timeout, ) r.raise_for_status() return r.json() if r.content else {"ok": True} def normalize(text: str) -> str: text = text.lower().strip() text = re.sub(r"\s+", " ", text) return text def is_ephemeral_cluster(texts: List[str]) -> bool: joined = normalize("\n".join(texts)) return any(hint in joined for hint in EPHEMERAL_HINTS) def cluster_by_time(memories: List[MemoryItem], gap_minutes: int) -> List[List[MemoryItem]]: if not memories: return [] items = sorted(memories, key=lambda m: m.created_dt) clusters: List[List[MemoryItem]] = [[items[0]]] for item in items[1:]: prev = clusters[-1][-1] delta = (item.created_dt - prev.created_dt).total_seconds() / 60 if delta <= gap_minutes: clusters[-1].append(item) else: clusters.append([item]) return clusters def format_segment(cluster: List[MemoryItem]) -> str: lines = [] for item in cluster: ts = item.created_at or "unknown" text = item.text.strip().replace("\n", " ") if text: lines.append(f"[{ts}] {text}") return "\n".join(lines) def split_cluster(cluster: List[MemoryItem], max_items: int) -> List[List[MemoryItem]]: if max_items <= 0 or len(cluster) <= max_items: return [cluster] chunks = [] for i in range(0, len(cluster), max_items): chunks.append(cluster[i:i + max_items]) return chunks def call_groq_extract(segment_text: str, model: str, timeout: int, base_url: str) -> Dict[str, Any]: api_key = os.getenv("GROQ_API_KEY") if not api_key: raise RuntimeError("GROQ_API_KEY is not set in the environment.") prompt = ( "You extract structured facts and a concise summary from a chat segment. " "Return ONLY raw JSON (no code fences, no markdown) with keys: " "facts, summary, segment_kind, resolution. " "facts must include: people (list of {name, phone, email}), " "projects (list of {name, url}), urls, paths, phones, emails, names. " "Only include facts explicitly present in the segment. Do NOT infer or invent. " "Never include generic 'user' as a person. Use null for unknown phone/email. " "If no facts exist, return empty lists. " "summary should be one or two sentences. " "segment_kind: implementation|debug_arc|planning|deployment|misc. " "resolution: resolved|open|unknown." ) payload = { "model": model, "messages": [ {"role": "system", "content": prompt}, {"role": "user", "content": segment_text}, ], "temperature": 0.2, "max_tokens": 600, } url = f"{base_url.rstrip('/')}/chat/completions" r = requests.post( url, headers={"Authorization": f"Bearer {api_key}"}, json=payload, timeout=timeout, ) if r.status_code >= 400: raise RuntimeError(f"Groq API error {r.status_code}: {r.text}") data = r.json() content = data["choices"][0]["message"]["content"].strip() if content.startswith("```"): content = re.sub(r"^```[a-zA-Z]*\n", "", content) content = re.sub(r"```$", "", content).strip() try: return json.loads(content) except json.JSONDecodeError: return {"parse_error": True, "raw": content} def is_compacted_memory(item: MemoryItem) -> bool: kind = (item.metadata or {}).get("kind") return kind in {"segment_summary", "debug_arc_summary"} def run(args: argparse.Namespace) -> None: load_env_file(DEFAULT_ENV_PATH) client = Mem0Client(args.base_url, timeout=args.timeout) memories = client.all_memories(args.user_id) # keep very recent entries untouched cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=args.min_age_days) candidates = [m for m in memories if m.created_dt < cutoff and not is_compacted_memory(m)] clusters = cluster_by_time(candidates, args.gap_minutes) report = { "user_id": args.user_id, "total_memories": len(memories), "candidates": len(candidates), "clusters": len(clusters), "max_summaries": args.max_summaries, "actions": [], } delete_budget = args.max_deletes created_count = 0 for cluster in clusters: texts = [c.text.strip() for c in cluster if c.text.strip()] if not texts: continue if len(texts) < args.segment_min_items: continue if args.skip_ephemeral and is_ephemeral_cluster(texts): continue for subcluster in split_cluster(cluster, args.segment_max_items): if args.max_summaries and created_count >= args.max_summaries: break segment_text = format_segment(subcluster) extraction = call_groq_extract(segment_text, args.model, args.timeout, args.groq_base_url) facts = extraction.get("facts") if isinstance(extraction, dict) else None summary = extraction.get("summary") if isinstance(extraction, dict) else "" parse_error = bool(extraction.get("parse_error")) if isinstance(extraction, dict) else True has_facts = bool(facts) and any( facts.get(k) for k in ["people", "projects", "urls", "paths", "phones", "emails", "names"] ) if not args.llm_report_all and not parse_error and not summary and not has_facts: continue ids = [m.id for m in subcluster if m.id] segment_start = subcluster[0].created_at if subcluster else None segment_end = subcluster[-1].created_at if subcluster else None action = { "type": "segment_extract", "cluster_size": len(subcluster), "segment_preview": segment_text[:240], "extraction": extraction, "source_ids": ids, "segment_start": segment_start, "segment_end": segment_end, } report["actions"].append(action) can_create = bool(summary) if args.apply and not args.dry_run and summary and args.purge_source and len(ids) > delete_budget: can_create = False if can_create: created_count += 1 if args.apply and not args.dry_run and summary: if args.purge_source and len(ids) > delete_budget: continue metadata = { "compacted_at": dt.datetime.now(dt.timezone.utc).isoformat(), "compactor_version": "0.4", "kind": "segment_summary", "segment_source_ids": ids, "segment_start": segment_start, "segment_end": segment_end, "created_at": segment_end or segment_start, "extraction": extraction, "model": args.model, } client.write_memory(args.user_id, summary, metadata) if args.purge_source: for mid in ids: client.delete_memory(mid) delete_budget -= len(ids) if args.max_summaries and created_count >= args.max_summaries: break print(json.dumps(report, indent=2, ensure_ascii=False)) def parse_args() -> argparse.Namespace: examples = """ Examples: python3 compactor.py --user-id main python3 compactor.py --user-id main --apply python3 compactor.py --user-id main --apply --max-summaries 1 python3 compactor.py --user-id main --segment-max-items 15 --skip-ephemeral """ class HelpFormatter(argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): pass p = argparse.ArgumentParser( description="Compacts conversational memories with temporal clustering.", formatter_class=HelpFormatter, epilog=examples.strip(), ) p.add_argument("--base-url", default="http://192.168.0.200:8420") p.add_argument("--user-id", required=True) p.add_argument("--apply", action="store_true", help="Apply changes. Default is dry-run.") p.add_argument("--dry-run", action="store_true", help="Force dry-run even with --apply.") p.add_argument("--gap-minutes", type=int, default=45) p.add_argument("--min-age-days", type=int, default=7) p.add_argument("--max-deletes", type=int, default=50) p.add_argument("--max-summaries", type=int, default=0, help="Limit the number of summaries created (0 = no limit).") p.add_argument("--timeout", type=int, default=20) p.add_argument("--model", default="meta-llama/llama-4-scout-17b-16e-instruct") p.add_argument("--segment-min-items", type=int, default=4) p.add_argument("--segment-max-items", type=int, default=15, help="Split large clusters to reduce topic drift.") p.add_argument("--skip-ephemeral", action="store_true", help="Skip obvious ephemeral weather-like segments.") p.add_argument("--llm-report-all", action="store_true", help="Report all LLM extractions even if empty.") p.add_argument("--purge-source", action="store_true", help="Delete source memories after writing a summary.") p.add_argument("--groq-base-url", default="https://api.groq.com/openai/v1") return p.parse_args() if __name__ == "__main__": run(parse_args())