#!/usr/bin/env python3 """ Conversational memory compactor for custom mem0-python-server. Why: - keep long-term conversational memory useful - compact clusters into LLM summaries stored verbatim - preserve safety with dry-run-first workflow """ from __future__ import annotations import argparse import dataclasses import datetime as dt import json import os import re from typing import Any, Dict, List import requests def load_env_file(path: str) -> None: """Load simple KEY=VALUE pairs from a .env file into os.environ. This keeps cron usage predictable without adding a dependency on python-dotenv. """ if not os.path.exists(path): return with open(path, "r", encoding="utf-8") as handle: for line in handle: line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") if key and key not in os.environ: os.environ[key] = value EPHEMERAL_HINTS = { "weather", "forecast", "temperature", "rain", "raining", "expected to stop", "wind", "humidity", "uv index", "clouds", "sunrise", "sunset", } DEFAULT_ENV_PATH = os.path.join(os.path.dirname(__file__), ".env") @dataclasses.dataclass class MemoryItem: id: str text: str created_at: str | None metadata: Dict[str, Any] @property def created_dt(self) -> dt.datetime: if not self.created_at: return dt.datetime.fromtimestamp(0, tz=dt.timezone.utc) try: return dt.datetime.fromisoformat(self.created_at.replace("Z", "+00:00")) except Exception: return dt.datetime.fromtimestamp(0, tz=dt.timezone.utc) class Mem0Client: def __init__(self, base_url: str, timeout: int = 15): self.base_url = base_url.rstrip("/") self.timeout = timeout def all_memories(self, user_id: str) -> List[MemoryItem]: r = requests.post( f"{self.base_url}/memories/all", json={"user_id": user_id}, timeout=self.timeout, ) r.raise_for_status() data = r.json().get("results", []) out = [] for row in data: out.append( MemoryItem( id=row.get("id", ""), text=row.get("memory", ""), created_at=row.get("created_at"), metadata=row.get("metadata") or {}, ) ) return out def write_memory(self, user_id: str, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]: r = requests.post( f"{self.base_url}/memories/raw", json={"text": text, "userId": user_id, "metadata": metadata}, timeout=self.timeout, ) r.raise_for_status() return r.json() def delete_memory(self, memory_id: str) -> Dict[str, Any]: r = requests.delete( f"{self.base_url}/memory/{memory_id}", json={"collection": "conversational"}, timeout=self.timeout, ) r.raise_for_status() return r.json() if r.content else {"ok": True} def normalize(text: str) -> str: text = text.lower().strip() text = re.sub(r"\s+", " ", text) return text def is_ephemeral_cluster(texts: List[str]) -> bool: joined = normalize("\n".join(texts)) return any(hint in joined for hint in EPHEMERAL_HINTS) def cluster_by_time(memories: List[MemoryItem], gap_minutes: int) -> List[List[MemoryItem]]: if not memories: return [] items = sorted(memories, key=lambda m: m.created_dt) clusters: List[List[MemoryItem]] = [[items[0]]] for item in items[1:]: prev = clusters[-1][-1] delta = (item.created_dt - prev.created_dt).total_seconds() / 60 if delta <= gap_minutes: clusters[-1].append(item) else: clusters.append([item]) return clusters def format_segment(cluster: List[MemoryItem]) -> str: lines = [] for item in cluster: ts = item.created_at or "unknown" text = item.text.strip().replace("\n", " ") if text: lines.append(f"[{ts}] {text}") return "\n".join(lines) def split_cluster(cluster: List[MemoryItem], max_items: int) -> List[List[MemoryItem]]: if max_items <= 0 or len(cluster) <= max_items: return [cluster] chunks = [] for i in range(0, len(cluster), max_items): chunks.append(cluster[i:i + max_items]) return chunks def call_groq_extract(segment_text: str, model: str, timeout: int, base_url: str) -> Dict[str, Any]: api_key = os.getenv("GROQ_API_KEY") if not api_key: raise RuntimeError("GROQ_API_KEY is not set in the environment.") prompt = ( "You extract structured facts and a detailed summary from a chat segment. " "Return ONLY raw JSON (no code fences, no markdown) with keys: " "facts, summary, segment_kind, resolution. " "facts must include: " "people (list of {name, phone, email}), projects (list of {name, url}), " "urls, paths, commands, packages, services, env_vars, ips, ports, hosts, " "phones, emails, names. " "Only include facts explicitly present in the segment. Do NOT infer or invent. " "Never include generic 'user' as a person. Use null for unknown phone/email. " "If no facts exist, return empty lists. " "summary should be concise (1–3 sentences max) and include all concrete details (IP addresses, ports, commands, files, URLs). " "Use 1 sentence for short segments, 2 for medium, 3 for long. Avoid redundancy. " "segment_kind: implementation|debug_arc|planning|deployment|misc. " "resolution: resolved|open|unknown." ) payload = { "model": model, "messages": [ {"role": "system", "content": prompt}, {"role": "user", "content": segment_text}, ], "temperature": 0.2, "max_tokens": 600, } url = f"{base_url.rstrip('/')}/chat/completions" r = requests.post( url, headers={"Authorization": f"Bearer {api_key}"}, json=payload, timeout=timeout, ) if r.status_code >= 400: raise RuntimeError(f"Groq API error {r.status_code}: {r.text}") data = r.json() content = data["choices"][0]["message"]["content"].strip() if content.startswith("```"): content = re.sub(r"^```[a-zA-Z]*\n", "", content) content = re.sub(r"```$", "", content).strip() try: return json.loads(content) except json.JSONDecodeError: return {"parse_error": True, "raw": content} def is_compacted_memory(item: MemoryItem) -> bool: kind = (item.metadata or {}).get("kind") return kind in {"segment_summary", "debug_arc_summary"} def _limit_items(items: List[Any], max_items: int = 5) -> List[Any]: if len(items) <= max_items: return items return items[:max_items] def _format_people(people: List[Dict[str, Any]]) -> List[str]: out = [] for person in people: name = (person or {}).get("name") phone = (person or {}).get("phone") email = (person or {}).get("email") bits = [b for b in [name, phone, email] if b] if bits: out.append("/".join(bits)) return out def _format_projects(projects: List[Dict[str, Any]]) -> List[str]: out = [] for proj in projects: name = (proj or {}).get("name") url = (proj or {}).get("url") bits = [b for b in [name, url] if b] if bits: out.append("/".join(bits)) return out def format_facts_inline(facts: Dict[str, Any]) -> str: if not isinstance(facts, dict): return "" parts = [] people = _format_people(facts.get("people") or []) projects = _format_projects(facts.get("projects") or []) fields = [ ("people", people), ("projects", projects), ("urls", facts.get("urls") or []), ("paths", facts.get("paths") or []), ("commands", facts.get("commands") or []), ("packages", facts.get("packages") or []), ("services", facts.get("services") or []), ("env_vars", facts.get("env_vars") or []), ("ips", facts.get("ips") or []), ("ports", facts.get("ports") or []), ("hosts", facts.get("hosts") or []), ("phones", facts.get("phones") or []), ("emails", facts.get("emails") or []), ("names", facts.get("names") or []), ] for key, value in fields: if not value: continue trimmed = _limit_items(value) parts.append(f"{key}={trimmed}") if not parts: return "" return "Facts: " + "; ".join(parts) def build_summary_metadata( *, segment_ids: List[str], segment_start: str | None, segment_end: str | None, extraction: Dict[str, Any], model: str, ) -> Dict[str, Any]: # Keep summaries sortable without embedding timestamps in the text itself. created_at = segment_end or segment_start return { "compacted_at": dt.datetime.now(dt.timezone.utc).isoformat(), "compactor_version": "0.6", "kind": "segment_summary", "segment_source_ids": segment_ids, "segment_start": segment_start, "segment_end": segment_end, "created_at": created_at, "extraction": extraction, "model": model, "source": "memory-compactor", "scope": "compacted", } def run(args: argparse.Namespace) -> None: load_env_file(DEFAULT_ENV_PATH) client = Mem0Client(args.base_url, timeout=args.timeout) memories = client.all_memories(args.user_id) # keep very recent entries untouched cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=args.min_age_days) candidates = [m for m in memories if m.created_dt < cutoff and not is_compacted_memory(m)] clusters = cluster_by_time(candidates, args.gap_minutes) report = { "user_id": args.user_id, "total_memories": len(memories), "candidates": len(candidates), "clusters": len(clusters), "max_summaries": args.max_summaries, "actions": [], } delete_budget = args.max_deletes created_count = 0 for cluster in clusters: texts = [c.text.strip() for c in cluster if c.text.strip()] if not texts: continue if len(texts) < args.segment_min_items: continue if args.skip_ephemeral and is_ephemeral_cluster(texts): continue for subcluster in split_cluster(cluster, args.segment_max_items): if args.max_summaries and created_count >= args.max_summaries: break segment_text = format_segment(subcluster) extraction = call_groq_extract(segment_text, args.model, args.timeout, args.groq_base_url) facts = extraction.get("facts") if isinstance(extraction, dict) else {} summary_raw = extraction.get("summary") if isinstance(extraction, dict) else "" parse_error = bool(extraction.get("parse_error")) if isinstance(extraction, dict) else True has_facts = bool(facts) and any( facts.get(k) for k in [ "people", "projects", "urls", "paths", "commands", "packages", "services", "env_vars", "ips", "ports", "hosts", "phones", "emails", "names", ] ) if not args.llm_report_all and not parse_error and not summary_raw and not has_facts: continue facts_inline = format_facts_inline(facts) summary_text = summary_raw if facts_inline: summary_text = f"{summary_raw} {facts_inline}".strip() if summary_raw else facts_inline ids = [m.id for m in subcluster if m.id] segment_start = subcluster[0].created_at if subcluster else None segment_end = subcluster[-1].created_at if subcluster else None action = { "type": "segment_extract", "cluster_size": len(subcluster), "segment_preview": segment_text[:240], "extraction": extraction, "source_ids": ids, "segment_start": segment_start, "segment_end": segment_end, } metadata = None if summary_text: metadata = build_summary_metadata( segment_ids=ids, segment_start=segment_start, segment_end=segment_end, extraction=extraction, model=args.model, ) action["summary_raw"] = summary_raw action["summary_text"] = summary_text action["summary_metadata"] = metadata action["write_payload"] = {"text": summary_text, "metadata": metadata} report["actions"].append(action) can_create = bool(summary_text) if args.apply and not args.dry_run and summary_text and args.purge_source and len(ids) > delete_budget: can_create = False if can_create: created_count += 1 if args.apply and not args.dry_run and summary_text: if args.purge_source and len(ids) > delete_budget: continue client.write_memory(args.user_id, summary_text, metadata or {}) if args.purge_source: for mid in ids: client.delete_memory(mid) delete_budget -= len(ids) if args.max_summaries and created_count >= args.max_summaries: break print(json.dumps(report, indent=2, ensure_ascii=False)) def parse_args() -> argparse.Namespace: examples = """ Examples: python3 compactor.py --user-id main python3 compactor.py --user-id main --apply python3 compactor.py --user-id main --apply --max-summaries 1 python3 compactor.py --user-id main --segment-max-items 15 --skip-ephemeral """ class HelpFormatter(argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): pass p = argparse.ArgumentParser( description="Compacts conversational memories with temporal clustering.", formatter_class=HelpFormatter, epilog=examples.strip(), ) p.add_argument("--base-url", default="http://192.168.0.200:8420") p.add_argument("--user-id", required=True) p.add_argument("--apply", action="store_true", help="Apply changes. Default is dry-run.") p.add_argument("--dry-run", action="store_true", help="Force dry-run even with --apply.") p.add_argument("--gap-minutes", type=int, default=45) p.add_argument("--min-age-days", type=int, default=7) p.add_argument("--max-deletes", type=int, default=50) p.add_argument("--max-summaries", type=int, default=0, help="Limit the number of summaries created (0 = no limit).") p.add_argument("--timeout", type=int, default=20) p.add_argument("--model", default="meta-llama/llama-4-scout-17b-16e-instruct") p.add_argument("--segment-min-items", type=int, default=4) p.add_argument("--segment-max-items", type=int, default=15, help="Split large clusters to reduce topic drift.") p.add_argument("--skip-ephemeral", action="store_true", help="Skip obvious ephemeral weather-like segments.") p.add_argument("--llm-report-all", action="store_true", help="Report all LLM extractions even if empty.") p.add_argument("--purge-source", action="store_true", help="Delete source memories after writing a summary.") p.add_argument("--groq-base-url", default="https://api.groq.com/openai/v1") return p.parse_args() if __name__ == "__main__": run(parse_args())