
Route compactor summaries to /memories/raw

Lukas Goldschmidt, 1 month ago
commit 19402cf907
3 changed files with 206 additions and 107 deletions
  1. PROJECT.md (+4 -4)
  2. README.md (+14 -5)
  3. compactor.py (+188 -98)

PROJECT.md (+4 -4)

@@ -6,10 +6,10 @@ Compact noisy conversational memory stretches (especially debug sessions) into d
 ## Current implementation
 - HTTP client for custom mem0 server endpoints:
   - `POST /memories/all`
-  - `POST /memories`
+  - `POST /memories/raw`
   - `DELETE /memory/{id}`
 - temporal clustering (`gap-minutes`)
-- debug-arc heuristic and compact summary writer
+- LLM segment extraction + summary writer
 - dry-run reporting
 
 ## Operating mode
@@ -18,9 +18,9 @@ Compact noisy conversational memory stretches (especially debug sessions) into d
 - bounded deletions with `--max-deletes`
 
 ## Design notes
-- keep newest item in each compacted cluster as guardrail
 - write compacted summary before deleting old entries
-- annotate summaries with provenance metadata (`compacted_from_ids`)
+- annotate summaries with provenance metadata (`segment_source_ids`, `extraction`)
+- set `metadata.created_at` to the segment end timestamp so summary ordering stays intact
 
 ## Next steps
 1. Add rollback snapshot file before apply
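The `metadata.created_at` design note above is what keeps summaries in timeline order. A minimal sketch of the intended consumer-side ordering, assuming items carry the metadata `compactor.py` writes further down in this commit:

```python
# raw items sort by their own created_at; summaries sort by the backdated
# metadata.created_at that the compactor sets to the segment end timestamp
memories = [
    {"created_at": "2025-02-01T11:00:00+00:00", "metadata": {}},
    {
        "created_at": "2025-02-10T08:00:00+00:00",  # written later...
        "metadata": {
            "kind": "segment_summary",
            "created_at": "2025-01-05T09:00:00+00:00",  # ...but backdated
        },
    },
]
memories.sort(key=lambda m: (m.get("metadata") or {}).get("created_at") or m["created_at"])
```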

README.md (+14 -5)

@@ -5,15 +5,15 @@ Temporal conversational memory compactor for your custom mem0-python-server.
 ## What it does
 - pulls conversational memories for one `user_id`
 - clusters by time windows
-- detects likely debug-session arcs
-- creates a compact summary memory for resolved arcs
-- preserves extracted facts (names/phones/emails) in summary + metadata
-- deletes older intermediate entries (safe budget + dry-run first)
+- extracts a compact summary + structured facts per segment (Groq)
+- writes summaries **verbatim** to `POST /memories/raw`
+- preserves extracted facts in metadata alongside the summary
+- injects `metadata.created_at` so summaries sort correctly without date prefixes
+- optionally deletes older intermediate entries (safe budget + dry-run first)
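The verbatim write uses the same request shape as `Mem0Client.write_memory` in `compactor.py`; a sketch with placeholder text and metadata values:

```python
import requests

requests.post(
    "http://192.168.0.200:8420/memories/raw",
    json={
        "text": "Compact summary of the segment.",  # stored verbatim
        "userId": "main",
        "metadata": {
            "kind": "segment_summary",
            "created_at": "2025-01-05T09:00:00+00:00",
        },
    },
    timeout=20,
).raise_for_status()
```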
 
 ## Safety defaults
 - **dry-run by default** (no writes/deletes)
 - keeps recent memories (`--min-age-days`)
-- keeps the latest entry in each compacted cluster
 - delete cap per run (`--max-deletes`)
 
 ## Run
@@ -27,11 +27,20 @@ Apply changes:
 python3 compactor.py --user-id main --apply
 ```
 
+## Environment
+The compactor auto-loads `.env` from this directory (useful for cron). At minimum set:
+```env
+GROQ_API_KEY=your_key_here
+```
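Because the key is auto-loaded from `.env`, a scheduled dry-run needs no extra environment setup. A hypothetical crontab entry (install path and schedule are placeholders; without `--apply` this only prints the report):

```
15 3 * * * cd /opt/memory-compactor && python3 compactor.py --user-id main >> compactor.log 2>&1
```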
+
 ## Useful options
 - `--base-url http://192.168.0.200:8420`
 - `--gap-minutes 45`
 - `--min-age-days 2`
 - `--max-deletes 50`
+- `--segment-max-items 15`
+- `--max-summaries 1` (limit created summaries for testing)
+- `--skip-ephemeral` (skip obvious weather-like chatter)
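For example, a cautious first apply run might combine the caps above:

```
python3 compactor.py --user-id main --apply --max-summaries 1 --skip-ephemeral
```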
 
 ## Next improvements
 - semantic clustering (embeddings) beyond time windows

compactor.py (+188 -98)

@@ -4,7 +4,7 @@ Conversational memory compactor for custom mem0-python-server.
 
 Why:
 - keep long-term conversational memory useful
-- collapse noisy debug sessions into resolved summaries
+- compact clusters into LLM summaries stored verbatim
 - preserve safety with dry-run-first workflow
 """
 
@@ -14,19 +14,38 @@ import argparse
 import dataclasses
 import datetime as dt
 import json
+import os
 import re
-from typing import Any, Dict, List, Tuple
+from typing import Any, Dict, List
 
 import requests
 
-DEBUG_HINTS = {
-    "error", "bug", "traceback", "exception", "fix", "retry", "failed",
-    "works", "resolved", "done", "restart", "timeout", "stack", "issue"
+
+def load_env_file(path: str) -> None:
+    """Load simple KEY=VALUE pairs from a .env file into os.environ.
+
+    This keeps cron usage predictable without adding a dependency on python-dotenv.
+    """
+    if not os.path.exists(path):
+        return
+    with open(path, "r", encoding="utf-8") as handle:
+        for line in handle:
+            line = line.strip()
+            if not line or line.startswith("#") or "=" not in line:
+                continue
+            key, value = line.split("=", 1)
+            key = key.strip()
+            value = value.strip().strip('"').strip("'")
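+            # never override a variable that is already set in the environment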
+            if key and key not in os.environ:
+                os.environ[key] = value
+
+EPHEMERAL_HINTS = {
+    "weather", "forecast", "temperature", "rain", "raining", "expected to stop",
+    "wind", "humidity", "uv index", "clouds", "sunrise", "sunset",
 }
 
-PHONE_RE = re.compile(r"\b\+?[0-9][0-9\-\s]{4,}[0-9]\b")
-EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
-NAME_RE = re.compile(r"\b(?:name is|his name is|her name is)\s+([A-Z][a-z]+)\b", re.IGNORECASE)
+DEFAULT_ENV_PATH = os.path.join(os.path.dirname(__file__), ".env")
 
 
 @dataclasses.dataclass
@@ -73,7 +92,7 @@ class Mem0Client:
 
     def write_memory(self, user_id: str, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
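+        # summaries go to the raw endpoint so the text is stored verbatim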
         r = requests.post(
-            f"{self.base_url}/memories",
+            f"{self.base_url}/memories/raw",
             json={"text": text, "userId": user_id, "metadata": metadata},
             timeout=self.timeout,
         )
@@ -95,10 +114,9 @@ def normalize(text: str) -> str:
     text = re.sub(r"\s+", " ", text)
     return text
 
-
-def is_debug_like(text: str) -> bool:
-    t = normalize(text)
-    return any(k in t for k in DEBUG_HINTS)
+
+def is_ephemeral_cluster(texts: List[str]) -> bool:
+    joined = normalize("\n".join(texts))
+    return any(hint in joined for hint in EPHEMERAL_HINTS)
 
 
 def cluster_by_time(memories: List[MemoryItem], gap_minutes: int) -> List[List[MemoryItem]]:
@@ -117,74 +135,86 @@ def cluster_by_time(memories: List[MemoryItem], gap_minutes: int) -> List[List[MemoryItem]]:
     return clusters
 
 
-def extract_facts(texts: List[str]) -> Dict[str, Any]:
-    phones, emails, names = set(), set(), set()
-    for t in texts:
-        for m in PHONE_RE.findall(t):
-            phones.add(re.sub(r"\s+", "", m))
-        for m in EMAIL_RE.findall(t):
-            emails.add(m)
-        for m in NAME_RE.findall(t):
-            names.add(m.strip().title())
-    return {
-        "phones": sorted(phones),
-        "emails": sorted(emails),
-        "names": sorted(names),
-    }
-
-
-def cluster_has_fact_signals(texts: List[str]) -> bool:
-    joined = "\n".join(texts)
-    return bool(PHONE_RE.search(joined) or EMAIL_RE.search(joined) or NAME_RE.search(joined))
-
-
-def summarize_cluster(cluster: List[MemoryItem]) -> Tuple[str, bool, Dict[str, Any]]:
-    texts = [c.text.strip() for c in cluster if c.text.strip()]
-    if not texts:
-        return "", False, {}
-
-    debug_ratio = sum(1 for t in texts if is_debug_like(t)) / max(len(texts), 1)
-    has_resolution = any(any(w in normalize(t) for w in ["resolved", "fixed", "works", "done"]) for t in texts)
-
-    if len(texts) < 4 or debug_ratio < 0.35 or not has_resolution:
-        return "", False, {}
+def format_segment(cluster: List[MemoryItem]) -> str:
+    lines = []
+    for item in cluster:
+        ts = item.created_at or "unknown"
+        text = item.text.strip().replace("\n", " ")
+        if text:
+            lines.append(f"[{ts}] {text}")
+    return "\n".join(lines)
+
+
+def split_cluster(cluster: List[MemoryItem], max_items: int) -> List[List[MemoryItem]]:
+    if max_items <= 0 or len(cluster) <= max_items:
+        return [cluster]
+    chunks = []
+    for i in range(0, len(cluster), max_items):
+        chunks.append(cluster[i:i + max_items])
+    return chunks
+
+
+def call_groq_extract(segment_text: str, model: str, timeout: int, base_url: str) -> Dict[str, Any]:
+    api_key = os.getenv("GROQ_API_KEY")
+    if not api_key:
+        raise RuntimeError("GROQ_API_KEY is not set in the environment.")
+
+    prompt = (
+        "You extract structured facts and a concise summary from a chat segment. "
+        "Return ONLY raw JSON (no code fences, no markdown) with keys: "
+        "facts, summary, segment_kind, resolution. "
+        "facts must include: people (list of {name, phone, email}), "
+        "projects (list of {name, url}), urls, paths, phones, emails, names. "
+        "Only include facts explicitly present in the segment. Do NOT infer or invent. "
+        "Never include generic 'user' as a person. Use null for unknown phone/email. "
+        "If no facts exist, return empty lists. "
+        "summary should be one or two sentences. "
+        "segment_kind: implementation|debug_arc|planning|deployment|misc. "
+        "resolution: resolved|open|unknown."
+    )
 
-    facts = extract_facts(texts)
-    requires_fact_preservation = cluster_has_fact_signals(texts)
-    has_extracted_facts = any(len(v) > 0 for v in facts.values())
+    payload = {
+        "model": model,
+        "messages": [
+            {"role": "system", "content": prompt},
+            {"role": "user", "content": segment_text},
+        ],
+        "temperature": 0.2,
+        "max_tokens": 600,
+    }
 
-    # guardrail: if cluster appears to contain facts but we couldn't preserve them, skip compaction
-    if requires_fact_preservation and not has_extracted_facts:
-        return "", False, {}
+    url = f"{base_url.rstrip('/')}/chat/completions"
+    r = requests.post(
+        url,
+        headers={"Authorization": f"Bearer {api_key}"},
+        json=payload,
+        timeout=timeout,
+    )
+    if r.status_code >= 400:
+        raise RuntimeError(f"Groq API error {r.status_code}: {r.text}")
 
-    head = texts[0][:280]
-    tail = texts[-1][:280]
+    data = r.json()
+    content = data["choices"][0]["message"]["content"].strip()
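+    # some models wrap JSON in markdown fences despite the prompt; strip them before parsing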
+    if content.startswith("```"):
+        content = re.sub(r"^```[a-zA-Z]*\n", "", content)
+        content = re.sub(r"```$", "", content).strip()
+    try:
+        return json.loads(content)
+    except json.JSONDecodeError:
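+        # return the raw output instead of raising; the run loop reports it as a parse_error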
+        return {"parse_error": True, "raw": content}
 
-    facts_line = ""
-    if has_extracted_facts:
-        facts_line = (
-            " Preserved facts: "
-            f"names={facts['names'] or []}, "
-            f"phones={facts['phones'] or []}, "
-            f"emails={facts['emails'] or []}."
-        )
 
-    summary = (
-        "[COMPACTED DEBUG ARC] "
-        f"Started with: {head} | Final state: {tail}. "
-        "Intermediate trial/error messages were compacted."
-        + facts_line
-    )
-    return summary, True, facts
+def is_compacted_memory(item: MemoryItem) -> bool:
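+    """True for summaries written by a previous run, so they are never re-compacted."""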
+    kind = (item.metadata or {}).get("kind")
+    return kind in {"segment_summary", "debug_arc_summary"}
 
 
 def run(args: argparse.Namespace) -> None:
+    load_env_file(DEFAULT_ENV_PATH)
     client = Mem0Client(args.base_url, timeout=args.timeout)
     memories = client.all_memories(args.user_id)
 
     # keep very recent entries untouched
     cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=args.min_age_days)
-    candidates = [m for m in memories if m.created_dt < cutoff]
+    candidates = [m for m in memories if m.created_dt < cutoff and not is_compacted_memory(m)]
 
     clusters = cluster_by_time(candidates, args.gap_minutes)
 
@@ -193,59 +223,119 @@ def run(args: argparse.Namespace) -> None:
         "total_memories": len(memories),
         "candidates": len(candidates),
         "clusters": len(clusters),
+        "max_summaries": args.max_summaries,
         "actions": [],
     }
 
     delete_budget = args.max_deletes
+    created_count = 0
 
     for cluster in clusters:
-        summary, should_compact, facts = summarize_cluster(cluster)
-        if not should_compact:
+        texts = [c.text.strip() for c in cluster if c.text.strip()]
+        if not texts:
             continue
 
-        ids = [m.id for m in cluster if m.id]
-        if len(ids) < 2:
+        if len(texts) < args.segment_min_items:
             continue
-
-        to_delete = ids[:-1]  # keep latest raw entry as an extra guardrail
-        if len(to_delete) > delete_budget:
+        if args.skip_ephemeral and is_ephemeral_cluster(texts):
             continue
 
-        action = {
-            "type": "compact_debug_arc",
-            "cluster_size": len(cluster),
-            "delete_ids": to_delete,
-            "keep_id": ids[-1],
-            "summary_preview": summary[:240],
-            "preserved_facts": facts,
-        }
-        report["actions"].append(action)
-
-        if args.apply:
-            metadata = {
-                "compacted_at": dt.datetime.now(dt.timezone.utc).isoformat(),
-                "compactor_version": "0.2",
-                "compacted_from_ids": ids,
-                "kind": "debug_arc_summary",
-                "preserved_facts": facts,
+        for subcluster in split_cluster(cluster, args.segment_max_items):
+            if args.max_summaries and created_count >= args.max_summaries:
+                break
+
+            segment_text = format_segment(subcluster)
+            extraction = call_groq_extract(segment_text, args.model, args.timeout, args.groq_base_url)
+            facts = extraction.get("facts") if isinstance(extraction, dict) else None
+            summary = extraction.get("summary") if isinstance(extraction, dict) else ""
+            parse_error = bool(extraction.get("parse_error")) if isinstance(extraction, dict) else True
+
+            has_facts = bool(facts) and any(
+                facts.get(k) for k in ["people", "projects", "urls", "paths", "phones", "emails", "names"]
+            )
+            if not args.llm_report_all and not parse_error and not summary and not has_facts:
+                continue
+
+            ids = [m.id for m in subcluster if m.id]
+            segment_start = subcluster[0].created_at if subcluster else None
+            segment_end = subcluster[-1].created_at if subcluster else None
+            action = {
+                "type": "segment_extract",
+                "cluster_size": len(subcluster),
+                "segment_preview": segment_text[:240],
+                "extraction": extraction,
+                "source_ids": ids,
+                "segment_start": segment_start,
+                "segment_end": segment_end,
             }
-            client.write_memory(args.user_id, summary, metadata)
-            for mid in to_delete:
-                client.delete_memory(mid)
-            delete_budget -= len(to_delete)
+            report["actions"].append(action)
+
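+            # count summaries that would be written; in apply mode, respect the delete budget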
+            can_create = bool(summary)
+            if args.apply and not args.dry_run and summary and args.purge_source and len(ids) > delete_budget:
+                can_create = False
+            if can_create:
+                created_count += 1
+
+            if args.apply and not args.dry_run and summary:
+                if args.purge_source and len(ids) > delete_budget:
+                    continue
+
+                metadata = {
+                    "compacted_at": dt.datetime.now(dt.timezone.utc).isoformat(),
+                    "compactor_version": "0.4",
+                    "kind": "segment_summary",
+                    "segment_source_ids": ids,
+                    "segment_start": segment_start,
+                    "segment_end": segment_end,
+                    "created_at": segment_end or segment_start,
+                    "extraction": extraction,
+                    "model": args.model,
+                }
+                client.write_memory(args.user_id, summary, metadata)
+
+                if args.purge_source:
+                    for mid in ids:
+                        client.delete_memory(mid)
+                    delete_budget -= len(ids)
+
+        if args.max_summaries and created_count >= args.max_summaries:
+            break
 
     print(json.dumps(report, indent=2, ensure_ascii=False))
 
 
 def parse_args() -> argparse.Namespace:
-    p = argparse.ArgumentParser(description="Compacts conversational memories with temporal clustering.")
+    examples = """
+Examples:
+  python3 compactor.py --user-id main
+  python3 compactor.py --user-id main --apply
+  python3 compactor.py --user-id main --apply --max-summaries 1
+  python3 compactor.py --user-id main --segment-max-items 15 --skip-ephemeral
+"""
+    class HelpFormatter(argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
+        pass
+
+    p = argparse.ArgumentParser(
+        description="Compacts conversational memories with temporal clustering.",
+        formatter_class=HelpFormatter,
+        epilog=examples.strip(),
+    )
     p.add_argument("--base-url", default="http://192.168.0.200:8420")
     p.add_argument("--user-id", required=True)
     p.add_argument("--apply", action="store_true", help="Apply changes. Default is dry-run.")
+    p.add_argument("--dry-run", action="store_true", help="Force dry-run even with --apply.")
     p.add_argument("--gap-minutes", type=int, default=45)
-    p.add_argument("--min-age-days", type=int, default=2)
+    p.add_argument("--min-age-days", type=int, default=7)
     p.add_argument("--max-deletes", type=int, default=50)
+    p.add_argument("--max-summaries", type=int, default=0, help="Limit the number of summaries created (0 = no limit).")
     p.add_argument("--timeout", type=int, default=20)
+    p.add_argument("--model", default="meta-llama/llama-4-scout-17b-16e-instruct")
+    p.add_argument("--segment-min-items", type=int, default=4)
+    p.add_argument("--segment-max-items", type=int, default=15, help="Split large clusters to reduce topic drift.")
+    p.add_argument("--skip-ephemeral", action="store_true", help="Skip obvious ephemeral weather-like segments.")
+    p.add_argument("--llm-report-all", action="store_true", help="Report all LLM extractions even if empty.")
+    p.add_argument("--purge-source", action="store_true", help="Delete source memories after writing a summary.")
+    p.add_argument("--groq-base-url", default="https://api.groq.com/openai/v1")
     return p.parse_args()