compactor.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. #!/usr/bin/env python3
  2. """
  3. Conversational memory compactor for custom mem0-python-server.
  4. Why:
  5. - keep long-term conversational memory useful
  6. - collapse noisy debug sessions into resolved summaries
  7. - preserve safety with dry-run-first workflow
  8. """
  9. from __future__ import annotations
  10. import argparse
  11. import dataclasses
  12. import datetime as dt
  13. import json
  14. import re
  15. from typing import Any, Dict, List, Tuple
  16. import requests
  # Keywords whose presence (as plain substrings of the normalized text, see
  # is_debug_like) marks a message as part of a debugging session.
  DEBUG_HINTS = {
      "error", "bug", "traceback", "exception", "fix", "retry", "failed",
      "works", "resolved", "done", "restart", "timeout", "stack", "issue",
  }
  # Phone-like runs: an optional leading "+", a digit, at least four digits /
  # hyphens / whitespace characters, and a closing digit.  The "+" branch must
  # not require ``\b``: a word boundary can never sit between a space (or the
  # start of the string) and "+", so the former ``\b\+?`` prefix silently
  # dropped the "+" of international numbers such as "call +49 151 1234567".
  # NOTE: ``\s`` also matches newlines, so a run may span line breaks.
  PHONE_RE = re.compile(r"(?:\+|\b)[0-9][0-9\-\s]{4,}[0-9]\b")
  EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
  # Captures the word after "name is".  re.IGNORECASE applies to the whole
  # pattern, so ``[A-Z][a-z]+`` accepts words in any case ("name is anna" ->
  # "anna").  This favors recall, which matters because compaction deletes the
  # raw messages these facts are extracted from.
  NAME_RE = re.compile(r"\b(?:name is|his name is|her name is)\s+([A-Z][a-z]+)\b", re.IGNORECASE)
  @dataclasses.dataclass
  class MemoryItem:
      """One memory row as returned by the server.

      ``created_at`` keeps the raw timestamp string; use :attr:`created_dt` for a
      comparable, timezone-aware datetime.
      """

      id: str
      text: str
      created_at: str | None
      metadata: Dict[str, Any]

      @property
      def created_dt(self) -> dt.datetime:
          """Parse ``created_at`` into a timezone-aware datetime.

          Returns the Unix epoch (UTC) when the timestamp is missing or cannot be
          parsed.  Timestamps without a UTC offset are treated as UTC so they can
          be compared with aware datetimes such as the cutoff computed in run();
          comparing naive and aware datetimes raises ``TypeError``.
          """
          epoch = dt.datetime.fromtimestamp(0, tz=dt.timezone.utc)
          if not self.created_at:
              return epoch
          try:
              # Older fromisoformat() versions do not accept a trailing "Z".
              parsed = dt.datetime.fromisoformat(self.created_at.replace("Z", "+00:00"))
          except (AttributeError, TypeError, ValueError):
              # Non-string or malformed timestamp: fall back to the epoch.
              return epoch
          if parsed.tzinfo is None:
              # NOTE(review): assumes offset-less server timestamps are UTC — confirm.
              parsed = parsed.replace(tzinfo=dt.timezone.utc)
          return parsed
  class Mem0Client:
      """Small HTTP client for the custom mem0-python-server REST endpoints.

      Every call goes through ``requests`` with ``self.timeout`` as the
      per-request timeout and raises ``requests.HTTPError`` (via
      ``raise_for_status``) on an error status.
      """

      def __init__(self, base_url: str, timeout: int = 15):
          # Trailing slashes are dropped so f"{base_url}/..." never contains "//".
          self.base_url = base_url.rstrip("/")
          self.timeout = timeout

      def all_memories(self, user_id: str) -> List[MemoryItem]:
          """Return every memory stored for ``user_id`` as ``MemoryItem`` objects.

          The response may be ``{"results": [...]}`` (a null ``results`` counts as
          empty) or a bare JSON list.  Null ``id`` / ``memory`` / ``metadata``
          fields become ``""`` / ``""`` / ``{}`` so later string handling (e.g.
          ``.strip()`` in summarize_cluster) cannot fail on ``None``.
          """
          r = requests.post(
              f"{self.base_url}/memories/all",
              # NOTE(review): this call sends "user_id" while write_memory sends
              # "userId" — confirm which key each server endpoint expects.
              json={"user_id": user_id},
              timeout=self.timeout,
          )
          r.raise_for_status()
          payload = r.json()
          rows = payload if isinstance(payload, list) else (payload.get("results") or [])
          items: List[MemoryItem] = []
          for row in rows:
              items.append(
                  MemoryItem(
                      id=row.get("id") or "",
                      text=row.get("memory") or "",
                      created_at=row.get("created_at"),
                      metadata=row.get("metadata") or {},
                  )
              )
          return items

      def write_memory(self, user_id: str, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
          """Store ``text`` with ``metadata`` for ``user_id``; return the server's JSON reply."""
          r = requests.post(
              f"{self.base_url}/memories",
              json={"text": text, "userId": user_id, "metadata": metadata},
              timeout=self.timeout,
          )
          r.raise_for_status()
          return r.json()

      def delete_memory(self, memory_id: str) -> Dict[str, Any]:
          """Delete the memory ``memory_id``.

          Returns the decoded JSON body, or ``{"ok": True}`` when the server
          replies with an empty body.
          """
          from urllib.parse import quote

          # Percent-encode the id (safe="" also encodes "/") so an unexpected id
          # such as "../memories" cannot change the request path.
          path_id = quote(str(memory_id), safe="")
          r = requests.delete(
              f"{self.base_url}/memory/{path_id}",
              # NOTE(review): the body presumably selects the server-side
              # collection — confirm against the server.
              json={"collection": "conversational"},
              timeout=self.timeout,
          )
          r.raise_for_status()
          return r.json() if r.content else {"ok": True}
  def normalize(text: str) -> str:
      """Lower-case ``text``, trim both ends and collapse whitespace runs to one space."""
      trimmed = text.lower().strip()
      return re.sub(r"\s+", " ", trimmed)
  def is_debug_like(text: str) -> bool:
      """Return True when any DEBUG_HINTS keyword occurs in the normalized text.

      Matching is by plain substring, so "debugging" hits "bug" and "prefix"
      hits "fix"; the heuristic is loose.
      """
      haystack = normalize(text)
      for hint in DEBUG_HINTS:
          if hint in haystack:
              return True
      return False
  def cluster_by_time(memories: List[MemoryItem], gap_minutes: int) -> List[List[MemoryItem]]:
      """Split memories into chronological runs.

      Memories are sorted by ``created_dt`` (a stable sort, so ties keep input
      order).  A memory joins the current run when it is at most ``gap_minutes``
      after the previous memory; otherwise it starts a new run.  The gap is
      measured between neighbours, so one run may span much more than
      ``gap_minutes``.  Returns ``[]`` for empty input.
      """
      runs: List[List[MemoryItem]] = []
      for current in sorted(memories, key=lambda mem: mem.created_dt):
          if runs:
              previous = runs[-1][-1]
              minutes_since_prev = (current.created_dt - previous.created_dt).total_seconds() / 60
              if minutes_since_prev <= gap_minutes:
                  runs[-1].append(current)
                  continue
          runs.append([current])
      return runs
  def extract_facts(texts: List[str]) -> Dict[str, Any]:
      """Collect phone numbers, e-mail addresses and names mentioned in ``texts``.

      Phone matches have all whitespace removed and names are title-cased.
      Returns a dict with keys "phones", "emails" and "names", each mapping to
      a sorted list without duplicates.
      """
      found: Dict[str, set] = {"phones": set(), "emails": set(), "names": set()}
      for text in texts:
          found["phones"].update(re.sub(r"\s+", "", hit) for hit in PHONE_RE.findall(text))
          found["emails"].update(EMAIL_RE.findall(text))
          found["names"].update(hit.strip().title() for hit in NAME_RE.findall(text))
      return {key: sorted(values) for key, values in found.items()}
  def cluster_has_fact_signals(texts: List[str]) -> bool:
      """Return True if any fact regex (phone, e-mail, name) matches the joined texts.

      The texts are joined with newlines; because PHONE_RE's character class
      includes whitespace, a phone-like match can span two adjacent texts.
      """
      combined = "\n".join(texts)
      return any(pattern.search(combined) is not None for pattern in (PHONE_RE, EMAIL_RE, NAME_RE))
  def summarize_cluster(cluster: List[MemoryItem]) -> Tuple[str, bool, Dict[str, Any]]:
      """Summarize ``cluster`` if it looks like a resolved debugging session.

      Returns ``(summary, should_compact, facts)``.  ``("", False, {})`` means the
      cluster must be left alone, which happens when:

      * it has fewer than 4 non-blank messages;
      * under 35% of those messages are debug-like (see ``is_debug_like``);
      * no message contains "resolved", "fixed", "works" or "done"
        (substring match on the normalized text); or
      * the joined text triggers a fact regex but no fact could be extracted
        from the individual messages (guardrail against losing facts).

      Otherwise the summary quotes the first and last messages (each cut to
      280 characters) and lists any extracted names, phones and emails.
      """
      messages = [body for body in (item.text.strip() for item in cluster) if body]
      if len(messages) < 4:
          return "", False, {}

      debug_share = sum(map(is_debug_like, messages)) / len(messages)
      if debug_share < 0.35:
          return "", False, {}

      resolution_markers = ("resolved", "fixed", "works", "done")
      if not any(marker in normalize(message) for message in messages for marker in resolution_markers):
          return "", False, {}

      facts = extract_facts(messages)
      found_any_fact = any(facts.values())
      # Guardrail: fact-like content we could not capture means we must not delete it.
      if not found_any_fact and cluster_has_fact_signals(messages):
          return "", False, {}

      opening = messages[0][:280]
      closing = messages[-1][:280]
      facts_line = (
          " Preserved facts: "
          f"names={facts['names'] or []}, "
          f"phones={facts['phones'] or []}, "
          f"emails={facts['emails'] or []}."
          if found_any_fact
          else ""
      )
      summary = (
          "[COMPACTED DEBUG ARC] "
          f"Started with: {opening} | Final state: {closing}. "
          "Intermediate trial/error messages were compacted."
          + facts_line
      )
      return summary, True, facts
  def run(args: argparse.Namespace) -> None:
      """Plan, and with ``--apply`` execute, debug-arc compaction for one user.

      Fetches every memory of ``args.user_id`` and keeps those whose timestamp is
      known and older than ``args.min_age_days`` days.  It groups them with
      ``cluster_by_time`` and asks ``summarize_cluster`` which groups to compact.
      For each accepted group, one summary memory is written and every id except
      the chronologically last one is deleted.  A group is skipped when its
      deletions exceed the remaining ``args.max_deletes`` budget.  The budget is
      consumed in dry-run mode as well, so a dry run previews exactly the
      actions ``--apply`` would take.

      The JSON report is printed to stdout even if an HTTP call fails midway
      through ``--apply``; the exception then propagates.  Each action records
      ``"applied"``.  Processing stops at the first failure, so only the last
      action in the report can be partially applied.
      """
      client = Mem0Client(args.base_url, timeout=args.timeout)
      memories = client.all_memories(args.user_id)
      # keep very recent entries untouched
      cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=args.min_age_days)
      # MemoryItem.created_dt falls back to the Unix epoch when a timestamp is
      # missing or unparseable.  All such entries would share one instant and be
      # clustered together regardless of content, so they are never compacted.
      epoch = dt.datetime.fromtimestamp(0, tz=dt.timezone.utc)
      candidates = [m for m in memories if epoch < m.created_dt < cutoff]
      clusters = cluster_by_time(candidates, args.gap_minutes)
      report = {
          "user_id": args.user_id,
          "total_memories": len(memories),
          "candidates": len(candidates),
          "clusters": len(clusters),
          "actions": [],
      }
      delete_budget = args.max_deletes
      try:
          for cluster in clusters:
              summary, should_compact, facts = summarize_cluster(cluster)
              if not should_compact:
                  continue
              ids = [m.id for m in cluster if m.id]
              if len(ids) < 2:
                  continue
              to_delete = ids[:-1]  # keep latest raw entry as an extra guardrail
              if len(to_delete) > delete_budget:
                  continue
              action = {
                  "type": "compact_debug_arc",
                  "cluster_size": len(cluster),
                  "delete_ids": to_delete,
                  "keep_id": ids[-1],
                  "summary_preview": summary[:240],
                  "preserved_facts": facts,
                  "applied": False,
              }
              report["actions"].append(action)
              if args.apply:
                  metadata = {
                      "compacted_at": dt.datetime.now(dt.timezone.utc).isoformat(),
                      "compactor_version": "0.2",
                      "compacted_from_ids": ids,
                      "kind": "debug_arc_summary",
                      "preserved_facts": facts,
                  }
                  # Write the summary first so a failed delete never loses content.
                  client.write_memory(args.user_id, summary, metadata)
                  for mid in to_delete:
                      client.delete_memory(mid)
                  action["applied"] = True
              delete_budget -= len(to_delete)
      finally:
          print(json.dumps(report, indent=2, ensure_ascii=False))
  def parse_args(argv: List[str] | None = None) -> argparse.Namespace:
      """Parse command-line options for the compactor.

      ``argv`` defaults to ``None``, which makes argparse read ``sys.argv[1:]``;
      pass a list to parse explicit arguments (e.g. in tests).  ``--base-url``
      defaults to the ``MEM0_BASE_URL`` environment variable when it is set.
      Numeric options are range-checked.  A negative ``--min-age-days`` would
      move the safety cutoff into the future and expose brand-new memories to
      compaction.  ``--timeout`` must be at least 1 second.
      """
      import os

      def int_at_least(minimum: int):
          """Build an argparse ``type`` that parses an int and enforces ``>= minimum``."""

          def convert(raw: str) -> int:
              value = int(raw)
              if value < minimum:
                  raise argparse.ArgumentTypeError(f"must be >= {minimum}, got {value}")
              return value

          # argparse uses the type's __name__ in its "invalid <type> value" error.
          convert.__name__ = "int"
          return convert

      p = argparse.ArgumentParser(description="Compacts conversational memories with temporal clustering.")
      p.add_argument("--base-url", default=os.environ.get("MEM0_BASE_URL", "http://192.168.0.200:8420"))
      p.add_argument("--user-id", required=True)
      p.add_argument("--apply", action="store_true", help="Apply changes. Default is dry-run.")
      p.add_argument("--gap-minutes", type=int_at_least(0), default=45,
                     help="Max minutes between neighbouring memories in one cluster.")
      p.add_argument("--min-age-days", type=int_at_least(0), default=2,
                     help="Only memories older than this many days are considered.")
      p.add_argument("--max-deletes", type=int_at_least(0), default=50,
                     help="Upper bound on deletions per run (also counted in dry-run).")
      p.add_argument("--timeout", type=int_at_least(1), default=20,
                     help="HTTP timeout in seconds for each request.")
      return p.parse_args(argv)
  if __name__ == "__main__":
      # Script entry point: parse CLI options, then perform one compaction pass.
      cli_args = parse_args()
      run(cli_args)