| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- #!/usr/bin/env python3
- """
- Conversational memory compactor for custom mem0-python-server.
- Why:
- - keep long-term conversational memory useful
- - compact clusters into LLM summaries stored verbatim
- - preserve safety with dry-run-first workflow
- """
- from __future__ import annotations
- import argparse
- import dataclasses
- import datetime as dt
- import json
- import os
- import re
- from typing import Any, Dict, List
- import requests
def load_env_file(path: str) -> None:
    """Populate os.environ from simple KEY=VALUE pairs in a .env file.

    Existing environment variables win; blank lines, comment lines, and
    lines without '=' are skipped. Keeps cron usage predictable without
    adding a dependency on python-dotenv.
    """
    if not os.path.exists(path):
        return
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            entry = raw.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            name, _, rest = entry.partition("=")
            name = name.strip()
            if not name or name in os.environ:
                continue
            os.environ[name] = rest.strip().strip('"').strip("'")
# Substrings that mark a cluster as ephemeral weather-style chatter; clusters
# matching any of these can be skipped via --skip-ephemeral.
EPHEMERAL_HINTS = {
    "weather", "forecast", "temperature", "rain", "raining", "expected to stop",
    "wind", "humidity", "uv index", "clouds", "sunrise", "sunset",
}

# The .env file is looked up next to this script so cron jobs need no extra setup.
DEFAULT_ENV_PATH = os.path.join(os.path.dirname(__file__), ".env")
@dataclasses.dataclass
class MemoryItem:
    """One conversational memory row as returned by the mem0 server."""

    id: str
    text: str
    created_at: str | None
    metadata: Dict[str, Any]

    @property
    def created_dt(self) -> dt.datetime:
        """Parse created_at as an aware datetime; fall back to the UTC epoch."""
        epoch = dt.datetime.fromtimestamp(0, tz=dt.timezone.utc)
        if not self.created_at:
            return epoch
        try:
            stamp = self.created_at.replace("Z", "+00:00")
            return dt.datetime.fromisoformat(stamp)
        except Exception:
            return epoch
class Mem0Client:
    """Thin HTTP wrapper around the custom mem0-python-server endpoints."""

    def __init__(self, base_url: str, timeout: int = 15):
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def all_memories(self, user_id: str) -> List[MemoryItem]:
        """Fetch every stored memory for *user_id* as MemoryItem objects."""
        response = requests.post(
            f"{self.base_url}/memories/all",
            json={"user_id": user_id},
            timeout=self.timeout,
        )
        response.raise_for_status()
        rows = response.json().get("results", [])
        return [
            MemoryItem(
                id=row.get("id", ""),
                text=row.get("memory", ""),
                created_at=row.get("created_at"),
                metadata=row.get("metadata") or {},
            )
            for row in rows
        ]

    def write_memory(self, user_id: str, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Store *text* verbatim for *user_id* and return the server response."""
        response = requests.post(
            f"{self.base_url}/memories/raw",
            json={"text": text, "userId": user_id, "metadata": metadata},
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def delete_memory(self, memory_id: str) -> Dict[str, Any]:
        """Delete one memory from the conversational collection."""
        response = requests.delete(
            f"{self.base_url}/memory/{memory_id}",
            json={"collection": "conversational"},
            timeout=self.timeout,
        )
        response.raise_for_status()
        # Some servers reply with an empty body on delete.
        return response.json() if response.content else {"ok": True}
def normalize(text: str) -> str:
    """Lowercase *text*, trim it, and collapse whitespace runs to single spaces."""
    return re.sub(r"\s+", " ", text.lower().strip())
def is_ephemeral_cluster(texts: List[str]) -> bool:
    """Return True when a cluster looks like ephemeral weather-style chatter.

    Fix: hints are matched as whole words/phrases instead of raw substrings,
    so e.g. "rain" no longer fires inside unrelated words like "training" or
    "brainstorm", which previously caused real segments to be skipped.
    """
    joined = normalize("\n".join(texts))
    return any(
        re.search(rf"\b{re.escape(hint)}\b", joined) is not None
        for hint in EPHEMERAL_HINTS
    )
def cluster_by_time(memories: List[MemoryItem], gap_minutes: int) -> List[List[MemoryItem]]:
    """Group memories into clusters separated by gaps longer than *gap_minutes*.

    Items are sorted chronologically first; each item either extends the
    current cluster (gap <= gap_minutes) or starts a new one.
    """
    ordered = sorted(memories, key=lambda m: m.created_dt)
    clusters: List[List[MemoryItem]] = []
    for item in ordered:
        if clusters:
            gap = (item.created_dt - clusters[-1][-1].created_dt).total_seconds() / 60
            if gap <= gap_minutes:
                clusters[-1].append(item)
                continue
        clusters.append([item])
    return clusters
def format_segment(cluster: List[MemoryItem]) -> str:
    """Render a cluster as newline-joined '[timestamp] text' lines.

    Entries with empty text are dropped; embedded newlines are flattened
    so each memory occupies exactly one line.
    """
    rendered = []
    for entry in cluster:
        body = entry.text.strip().replace("\n", " ")
        if body:
            stamp = entry.created_at or "unknown"
            rendered.append(f"[{stamp}] {body}")
    return "\n".join(rendered)
def split_cluster(cluster: List[MemoryItem], max_items: int) -> List[List[MemoryItem]]:
    """Break an oversized cluster into consecutive chunks of at most *max_items*.

    A non-positive *max_items* disables splitting and returns the cluster whole.
    """
    if max_items <= 0 or len(cluster) <= max_items:
        return [cluster]
    return [cluster[start:start + max_items] for start in range(0, len(cluster), max_items)]
def call_groq_extract(segment_text: str, model: str, timeout: int, base_url: str) -> Dict[str, Any]:
    """Ask the Groq chat API to extract facts and a summary from one segment.

    Returns the parsed JSON dict from the model, or
    {"parse_error": True, "raw": <content>} when the response is not valid JSON.

    Raises:
        RuntimeError: when GROQ_API_KEY is missing or the API answers >= 400.
    """
    api_key = os.getenv("GROQ_API_KEY")
    if not api_key:
        raise RuntimeError("GROQ_API_KEY is not set in the environment.")
    prompt = (
        "You extract structured facts and a concise summary from a chat segment. "
        "Return ONLY raw JSON (no code fences, no markdown) with keys: "
        "facts, summary, segment_kind, resolution. "
        "facts must include: people (list of {name, phone, email}), "
        "projects (list of {name, url}), urls, paths, phones, emails, names. "
        "Only include facts explicitly present in the segment. Do NOT infer or invent. "
        "Never include generic 'user' as a person. Use null for unknown phone/email. "
        "If no facts exist, return empty lists. "
        "summary should be one or two sentences. "
        "segment_kind: implementation|debug_arc|planning|deployment|misc. "
        "resolution: resolved|open|unknown."
    )
    response = requests.post(
        f"{base_url.rstrip('/')}/chat/completions",
        headers={"Authorization": f"Bearer {api_key}"},
        json={
            "model": model,
            "messages": [
                {"role": "system", "content": prompt},
                {"role": "user", "content": segment_text},
            ],
            "temperature": 0.2,
            "max_tokens": 600,
        },
        timeout=timeout,
    )
    if response.status_code >= 400:
        raise RuntimeError(f"Groq API error {response.status_code}: {response.text}")
    content = response.json()["choices"][0]["message"]["content"].strip()
    # Models sometimes wrap the JSON in markdown fences despite instructions.
    if content.startswith("```"):
        content = re.sub(r"^```[a-zA-Z]*\n", "", content)
        content = re.sub(r"```$", "", content).strip()
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        return {"parse_error": True, "raw": content}
def is_compacted_memory(item: MemoryItem) -> bool:
    """True when *item* was itself produced by a previous compaction run."""
    metadata = item.metadata or {}
    return metadata.get("kind") in ("segment_summary", "debug_arc_summary")
def run(args: argparse.Namespace) -> None:
    """Main compaction pass: cluster old memories, summarize, optionally purge.

    Dry-run unless --apply is given (and --dry-run does not override it).
    Prints a JSON report of everything considered to stdout.
    """
    # Pull GROQ_API_KEY etc. from the sibling .env so cron needs no extra setup.
    load_env_file(DEFAULT_ENV_PATH)
    client = Mem0Client(args.base_url, timeout=args.timeout)
    memories = client.all_memories(args.user_id)
    # keep very recent entries untouched
    cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=args.min_age_days)
    # Never re-compact the output of a previous run.
    candidates = [m for m in memories if m.created_dt < cutoff and not is_compacted_memory(m)]
    clusters = cluster_by_time(candidates, args.gap_minutes)
    report = {
        "user_id": args.user_id,
        "total_memories": len(memories),
        "candidates": len(candidates),
        "clusters": len(clusters),
        "max_summaries": args.max_summaries,
        "actions": [],
    }
    # Hard cap on destructive deletes per run; decremented as sources are purged.
    delete_budget = args.max_deletes
    created_count = 0
    for cluster in clusters:
        texts = [c.text.strip() for c in cluster if c.text.strip()]
        if not texts:
            continue
        # Too small to be worth an LLM call.
        if len(texts) < args.segment_min_items:
            continue
        if args.skip_ephemeral and is_ephemeral_cluster(texts):
            continue
        for subcluster in split_cluster(cluster, args.segment_max_items):
            if args.max_summaries and created_count >= args.max_summaries:
                break
            segment_text = format_segment(subcluster)
            extraction = call_groq_extract(segment_text, args.model, args.timeout, args.groq_base_url)
            facts = extraction.get("facts") if isinstance(extraction, dict) else None
            summary = extraction.get("summary") if isinstance(extraction, dict) else ""
            parse_error = bool(extraction.get("parse_error")) if isinstance(extraction, dict) else True
            has_facts = bool(facts) and any(
                facts.get(k) for k in ["people", "projects", "urls", "paths", "phones", "emails", "names"]
            )
            # Skip empty extractions unless --llm-report-all; parse errors are
            # always reported so they can be inspected.
            if not args.llm_report_all and not parse_error and not summary and not has_facts:
                continue
            ids = [m.id for m in subcluster if m.id]
            segment_start = subcluster[0].created_at if subcluster else None
            segment_end = subcluster[-1].created_at if subcluster else None
            action = {
                "type": "segment_extract",
                "cluster_size": len(subcluster),
                "segment_preview": segment_text[:240],
                "extraction": extraction,
                "source_ids": ids,
                "segment_start": segment_start,
                "segment_end": segment_end,
            }
            report["actions"].append(action)
            # Count toward --max-summaries only when a write would actually
            # happen (i.e. not blocked by the delete budget while purging).
            can_create = bool(summary)
            if args.apply and not args.dry_run and summary and args.purge_source and len(ids) > delete_budget:
                can_create = False
            if can_create:
                created_count += 1
            if args.apply and not args.dry_run and summary:
                # Purging this segment would exceed the delete budget: report
                # it, but write/delete nothing this run.
                if args.purge_source and len(ids) > delete_budget:
                    continue
                metadata = {
                    "compacted_at": dt.datetime.now(dt.timezone.utc).isoformat(),
                    "compactor_version": "0.4",
                    "kind": "segment_summary",
                    "segment_source_ids": ids,
                    "segment_start": segment_start,
                    "segment_end": segment_end,
                    # Summary inherits the segment's end (or start) timestamp.
                    "created_at": segment_end or segment_start,
                    "extraction": extraction,
                    "model": args.model,
                }
                client.write_memory(args.user_id, summary, metadata)
                if args.purge_source:
                    for mid in ids:
                        client.delete_memory(mid)
                    delete_budget -= len(ids)
        if args.max_summaries and created_count >= args.max_summaries:
            break
    print(json.dumps(report, indent=2, ensure_ascii=False))
def parse_args() -> argparse.Namespace:
    """Build the CLI parser and parse sys.argv into a Namespace."""
    examples = """
Examples:
  python3 compactor.py --user-id main
  python3 compactor.py --user-id main --apply
  python3 compactor.py --user-id main --apply --max-summaries 1
  python3 compactor.py --user-id main --segment-max-items 15 --skip-ephemeral
"""

    class HelpFormatter(argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
        """Show argument defaults while keeping the epilog's manual layout."""

    parser = argparse.ArgumentParser(
        description="Compacts conversational memories with temporal clustering.",
        formatter_class=HelpFormatter,
        epilog=examples.strip(),
    )
    parser.add_argument("--base-url", default="http://192.168.0.200:8420")
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--apply", action="store_true", help="Apply changes. Default is dry-run.")
    parser.add_argument("--dry-run", action="store_true", help="Force dry-run even with --apply.")
    parser.add_argument("--gap-minutes", type=int, default=45)
    parser.add_argument("--min-age-days", type=int, default=7)
    parser.add_argument("--max-deletes", type=int, default=50)
    parser.add_argument("--max-summaries", type=int, default=0, help="Limit the number of summaries created (0 = no limit).")
    parser.add_argument("--timeout", type=int, default=20)
    parser.add_argument("--model", default="meta-llama/llama-4-scout-17b-16e-instruct")
    parser.add_argument("--segment-min-items", type=int, default=4)
    parser.add_argument("--segment-max-items", type=int, default=15, help="Split large clusters to reduce topic drift.")
    parser.add_argument("--skip-ephemeral", action="store_true", help="Skip obvious ephemeral weather-like segments.")
    parser.add_argument("--llm-report-all", action="store_true", help="Report all LLM extractions even if empty.")
    parser.add_argument("--purge-source", action="store_true", help="Delete source memories after writing a summary.")
    parser.add_argument("--groq-base-url", default="https://api.groq.com/openai/v1")
    return parser.parse_args()
- if __name__ == "__main__":
- run(parse_args())
|