Selaa lähdekoodia

Init memory compactor

Lukas Goldschmidt 1 kuukausi sitten
commit
e7e680afa5
4 muutettua tiedostoa jossa 324 lisäystä ja 0 poistoa
  1. 2 0
      .gitignore
  2. 29 0
      PROJECT.md
  3. 40 0
      README.md
  4. 253 0
      compactor.py

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+__pycache__/
+*.pyc

+ 29 - 0
PROJECT.md

@@ -0,0 +1,29 @@
+# PROJECT: memory-compactor
+
+## Aim
+Compact noisy conversational memory stretches (especially debug sessions) into durable resolved summaries while preserving recall quality.
+
+## Current implementation
+- HTTP client for custom mem0 server endpoints:
+  - `POST /memories/all`
+  - `POST /memories`
+  - `DELETE /memory/{id}`
+- temporal clustering (`gap-minutes`)
+- debug-arc heuristic and compact summary writer
+- dry-run reporting
+
+## Operating mode
+- default dry-run
+- `--apply` to mutate memory state
+- bounded deletions with `--max-deletes`
+
+## Design notes
+- keep newest item in each compacted cluster as guardrail
+- write compacted summary before deleting old entries
+- annotate summaries with provenance metadata (`compacted_from_ids`)
+
+## Next steps
+1. Add rollback snapshot file before apply
+2. Add explicit per-user allowlist
+3. Add cron integration helper (OpenClaw cron payload template)
+4. Add quality checks pre/post compaction

+ 40 - 0
README.md

@@ -0,0 +1,40 @@
+# memory-compactor
+
+Temporal conversational memory compactor for your custom mem0-python-server.
+
+## What it does
+- pulls conversational memories for one `user_id`
+- clusters by time windows
+- detects likely debug-session arcs
+- creates a compact summary memory for resolved arcs
+- preserves extracted facts (names/phones/emails) in summary + metadata
+- deletes older intermediate entries (safe budget + dry-run first)
+
+## Safety defaults
+- **dry-run by default** (no writes/deletes)
+- keeps recent memories (`--min-age-days`)
+- keeps the latest entry in each compacted cluster
+- delete cap per run (`--max-deletes`)
+
+## Run
+```bash
+cd /home/lucky/.openclaw/workspace/memory-compactor
+python3 compactor.py --user-id main
+```
+
+Apply changes:
+```bash
+python3 compactor.py --user-id main --apply
+```
+
+## Useful options
+- `--base-url http://192.168.0.200:8420`
+- `--gap-minutes 45`
+- `--min-age-days 2`
+- `--max-deletes 50`
+
+## Next improvements
+- semantic clustering (embeddings) beyond time windows
+- better resolution detection (state machine)
+- reversible snapshots before apply
+- cron wrapper + run history logging

+ 253 - 0
compactor.py

@@ -0,0 +1,253 @@
+#!/usr/bin/env python3
+"""
+Conversational memory compactor for custom mem0-python-server.
+
+Why:
+- keep long-term conversational memory useful
+- collapse noisy debug sessions into resolved summaries
+- preserve safety with dry-run-first workflow
+"""
+
+from __future__ import annotations
+
+import argparse
+import dataclasses
+import datetime as dt
+import json
+import re
+from typing import Any, Dict, List, Tuple
+
+import requests
+
# Keyword hints that mark a memory text as part of a debug/troubleshooting arc.
DEBUG_HINTS = {
    "error", "bug", "traceback", "exception", "fix", "retry", "failed",
    "works", "resolved", "done", "restart", "timeout", "stack", "issue"
}

# Loose phone matcher: optional "+", then digits with embedded dashes/spaces.
PHONE_RE = re.compile(r"\b\+?[0-9][0-9\-\s]{4,}[0-9]\b")
# Conventional ASCII email matcher.
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
# Captures one first name after a "name is" phrase.
# NOTE(review): re.IGNORECASE also relaxes the [A-Z][a-z]+ classes, so
# all-lowercase names match too; extract_facts() re-capitalizes with
# .title() — confirm the case-insensitive match is intended.
NAME_RE = re.compile(r"\b(?:name is|his name is|her name is)\s+([A-Z][a-z]+)\b", re.IGNORECASE)
+
+
@dataclasses.dataclass
class MemoryItem:
    """One conversational memory row as returned by the mem0 server."""

    id: str
    text: str
    created_at: str | None
    metadata: Dict[str, Any]

    @property
    def created_dt(self) -> dt.datetime:
        """Creation time as an aware datetime; Unix epoch when missing or unparseable."""
        fallback = dt.datetime.fromtimestamp(0, tz=dt.timezone.utc)
        if not self.created_at:
            return fallback
        iso_text = self.created_at.replace("Z", "+00:00")
        try:
            return dt.datetime.fromisoformat(iso_text)
        except Exception:
            return fallback
+
+
class Mem0Client:
    """Thin HTTP client for the custom mem0-python-server endpoints."""

    def __init__(self, base_url: str, timeout: int = 15):
        # Strip trailing slash so endpoint paths can always be appended with "/".
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def all_memories(self, user_id: str) -> List[MemoryItem]:
        """Fetch every memory for *user_id* via ``POST /memories/all``.

        Missing fields in a row default to empty string / ``{}`` so callers
        never see ``None`` for id/text/metadata.
        """
        r = requests.post(
            f"{self.base_url}/memories/all",
            json={"user_id": user_id},
            timeout=self.timeout,
        )
        r.raise_for_status()
        data = r.json().get("results", [])
        out = []
        for row in data:
            out.append(
                MemoryItem(
                    id=row.get("id", ""),
                    text=row.get("memory", ""),
                    created_at=row.get("created_at"),
                    metadata=row.get("metadata") or {},
                )
            )
        return out

    def write_memory(self, user_id: str, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Create one memory via ``POST /memories``; returns the server's JSON.

        NOTE(review): this payload uses camelCase "userId" while
        /memories/all uses snake_case "user_id" — presumably matching the
        custom server's actual API; confirm against the server code.
        """
        r = requests.post(
            f"{self.base_url}/memories",
            json={"text": text, "userId": user_id, "metadata": metadata},
            timeout=self.timeout,
        )
        r.raise_for_status()
        return r.json()

    def delete_memory(self, memory_id: str) -> Dict[str, Any]:
        """Delete one memory via ``DELETE /memory/{id}``.

        The server may answer with an empty body; treat that as success.
        """
        r = requests.delete(
            f"{self.base_url}/memory/{memory_id}",
            json={"collection": "conversational"},
            timeout=self.timeout,
        )
        r.raise_for_status()
        return r.json() if r.content else {"ok": True}
+
+
def normalize(text: str) -> str:
    """Lowercase *text*, trim it, and collapse whitespace runs to single spaces."""
    lowered = text.lower().strip()
    return re.sub(r"\s+", " ", lowered)
+
+
def is_debug_like(text: str) -> bool:
    """Return True when *text* contains any debug-session keyword from DEBUG_HINTS."""
    lowered = normalize(text)
    for hint in DEBUG_HINTS:
        if hint in lowered:
            return True
    return False
+
+
def cluster_by_time(memories: List[MemoryItem], gap_minutes: int) -> List[List[MemoryItem]]:
    """Group memories into temporal clusters.

    Items are sorted by creation time first; consecutive items at most
    *gap_minutes* apart land in the same cluster.
    """
    if not memories:
        return []

    ordered = sorted(memories, key=lambda m: m.created_dt)
    clusters: List[List[MemoryItem]] = []
    current: List[MemoryItem] = [ordered[0]]

    for item in ordered[1:]:
        minutes_apart = (item.created_dt - current[-1].created_dt).total_seconds() / 60
        if minutes_apart <= gap_minutes:
            current.append(item)
        else:
            clusters.append(current)
            current = [item]

    clusters.append(current)
    return clusters
+
+
def extract_facts(texts: List[str]) -> Dict[str, Any]:
    """Collect phone numbers, emails and first names found across *texts*.

    Returns de-duplicated, sorted lists under "phones", "emails" and "names".
    Phone matches have internal whitespace stripped; names are title-cased.
    """
    found: Dict[str, set] = {"phones": set(), "emails": set(), "names": set()}
    for text in texts:
        found["phones"].update(re.sub(r"\s+", "", hit) for hit in PHONE_RE.findall(text))
        found["emails"].update(EMAIL_RE.findall(text))
        found["names"].update(hit.strip().title() for hit in NAME_RE.findall(text))
    return {key: sorted(values) for key, values in found.items()}
+
+
def cluster_has_fact_signals(texts: List[str]) -> bool:
    """Return True when any text in the cluster looks like it holds a phone/email/name."""
    blob = "\n".join(texts)
    for pattern in (PHONE_RE, EMAIL_RE, NAME_RE):
        if pattern.search(blob):
            return True
    return False
+
+
def summarize_cluster(cluster: List[MemoryItem]) -> Tuple[str, bool, Dict[str, Any]]:
    """Decide whether *cluster* is a resolved debug arc and build its summary.

    Returns ``(summary_text, should_compact, preserved_facts)``; a cluster
    that does not qualify yields ``("", False, {})``.
    """
    texts = [c.text.strip() for c in cluster if c.text.strip()]
    if not texts:
        return "", False, {}

    # Heuristic gates: at least 4 messages, >=35% debug-flavored, and at
    # least one message signalling the problem was resolved.
    debug_ratio = sum(1 for t in texts if is_debug_like(t)) / max(len(texts), 1)
    has_resolution = any(any(w in normalize(t) for w in ["resolved", "fixed", "works", "done"]) for t in texts)

    if len(texts) < 4 or debug_ratio < 0.35 or not has_resolution:
        return "", False, {}

    facts = extract_facts(texts)
    requires_fact_preservation = cluster_has_fact_signals(texts)
    has_extracted_facts = any(len(v) > 0 for v in facts.values())

    # guardrail: if cluster appears to contain facts but we couldn't preserve them, skip compaction
    if requires_fact_preservation and not has_extracted_facts:
        return "", False, {}

    # Keep the opening problem statement and the final state, truncated so
    # the compacted summary memory stays small.
    head = texts[0][:280]
    tail = texts[-1][:280]

    facts_line = ""
    if has_extracted_facts:
        facts_line = (
            " Preserved facts: "
            f"names={facts['names'] or []}, "
            f"phones={facts['phones'] or []}, "
            f"emails={facts['emails'] or []}."
        )

    summary = (
        "[COMPACTED DEBUG ARC] "
        f"Started with: {head} | Final state: {tail}. "
        "Intermediate trial/error messages were compacted."
        + facts_line
    )
    return summary, True, facts
+
+
def run(args: argparse.Namespace) -> None:
    """Fetch, cluster and (optionally) compact one user's memories.

    Dry-run by default: only prints a JSON action report. With ``--apply``,
    each cluster's summary is written first, then the compacted originals
    are deleted, so a mid-run failure never loses the summarized content.
    """
    client = Mem0Client(args.base_url, timeout=args.timeout)
    memories = client.all_memories(args.user_id)

    # keep very recent entries untouched
    cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=args.min_age_days)
    candidates = [m for m in memories if m.created_dt < cutoff]

    clusters = cluster_by_time(candidates, args.gap_minutes)

    report = {
        "user_id": args.user_id,
        "total_memories": len(memories),
        "candidates": len(candidates),
        "clusters": len(clusters),
        "actions": [],
    }

    delete_budget = args.max_deletes

    for cluster in clusters:
        summary, should_compact, facts = summarize_cluster(cluster)
        if not should_compact:
            continue

        ids = [m.id for m in cluster if m.id]
        if len(ids) < 2:
            continue

        to_delete = ids[:-1]  # keep latest raw entry as an extra guardrail
        if len(to_delete) > delete_budget:
            continue
        # BUGFIX: consume the budget when the action is *recorded*, not only
        # when applied — otherwise a dry-run report could list more total
        # deletions than --max-deletes and diverge from what --apply does.
        delete_budget -= len(to_delete)

        action = {
            "type": "compact_debug_arc",
            "cluster_size": len(cluster),
            "delete_ids": to_delete,
            "keep_id": ids[-1],
            "summary_preview": summary[:240],
            "preserved_facts": facts,
        }
        report["actions"].append(action)

        if args.apply:
            metadata = {
                "compacted_at": dt.datetime.now(dt.timezone.utc).isoformat(),
                "compactor_version": "0.2",
                "compacted_from_ids": ids,
                "kind": "debug_arc_summary",
                "preserved_facts": facts,
            }
            # Write the replacement summary before deleting originals.
            client.write_memory(args.user_id, summary, metadata)
            for mid in to_delete:
                client.delete_memory(mid)

    print(json.dumps(report, indent=2, ensure_ascii=False))
+
+
def parse_args() -> argparse.Namespace:
    """Build and parse the CLI options (dry-run unless --apply is passed)."""
    parser = argparse.ArgumentParser(description="Compacts conversational memories with temporal clustering.")
    parser.add_argument("--base-url", default="http://192.168.0.200:8420")
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--apply", action="store_true", help="Apply changes. Default is dry-run.")
    # Integer tuning knobs share the same shape; declare them table-driven.
    for flag, default in (
        ("--gap-minutes", 45),
        ("--min-age-days", 2),
        ("--max-deletes", 50),
        ("--timeout", 20),
    ):
        parser.add_argument(flag, type=int, default=default)
    return parser.parse_args()
+
+
+if __name__ == "__main__":
+    run(parse_args())