浏览代码

prune fix

Lukas Goldschmidt 2 周之前
父节点
当前提交
ceef0fd2de
共有 6 个文件被更改,包括 215 次插入18 次删除
  1. 1 1
      .env.example
  2. 25 2
      src/hermes_mcp/config.py
  3. 2 2
      src/hermes_mcp/server.py
  4. 48 11
      src/hermes_mcp/store.py
  5. 119 0
      tests/test_concern_cleanup.py
  6. 20 2
      tests/test_config.py

+ 1 - 1
.env.example

@@ -5,7 +5,7 @@ HERMES_NEWS_URL=http://192.168.0.200:8506/mcp/sse
 HERMES_ATLAS_URL=http://192.168.0.249:8550/mcp/sse
 HERMES_EXEC_URL=http://192.168.0.249:8560/mcp/sse
 HERMES_CRYPTO_TIMEFRAMES=1m,5m,15m,1h,4h,1d
-HERMES_RETENTION_DAYS=7
+HERMES_RETENTION_HOURS=24
 HERMES_PRUNE_INTERVAL_HOURS=6
 HERMES_CYCLE_SECONDS=60
 HERMES_BREAKOUT_MEMORY_WINDOW_SECONDS=900

+ 25 - 2
src/hermes_mcp/config.py

@@ -49,6 +49,29 @@ def _env_int(name: str, default: int) -> int:
         return default
 
 
+def _env_retention_hours(*, default_hours: int = 24) -> int:
+    file_values = _load_env_file()
+    raw_hours = os.getenv("HERMES_RETENTION_HOURS")
+    if raw_hours is None:
+        raw_hours = file_values.get("HERMES_RETENTION_HOURS")
+    if raw_hours is not None:
+        try:
+            return int(raw_hours)
+        except Exception:
+            return default_hours
+
+    raw_days = os.getenv("HERMES_RETENTION_DAYS")
+    if raw_days is None:
+        raw_days = file_values.get("HERMES_RETENTION_DAYS")
+    if raw_days is not None:
+        try:
+            return int(raw_days) * 24
+        except Exception:
+            return default_hours
+
+    return default_hours
+
+
 @dataclass(frozen=True)
 class HermesConfig:
     trader_url: str
@@ -59,7 +82,7 @@ class HermesConfig:
     atlas_url: str
     exec_url: str
     crypto_timeframes: tuple[str, ...]
-    retention_days: int
+    retention_hours: int
     prune_interval_hours: int
     cycle_seconds: int
     breakout_memory_window_seconds: int
@@ -81,7 +104,7 @@ def load_config() -> HermesConfig:
         atlas_url=_env("HERMES_ATLAS_URL", "http://127.0.0.1:8550"),
         exec_url=_env("HERMES_EXEC_URL", "http://127.0.0.1:8560"),
         crypto_timeframes=timeframes,
-        retention_days=_env_int("HERMES_RETENTION_DAYS", 7),
+        retention_hours=_env_retention_hours(default_hours=24),
         prune_interval_hours=_env_int("HERMES_PRUNE_INTERVAL_HOURS", 6),
         cycle_seconds=_env_int("HERMES_CYCLE_SECONDS", 60),
         breakout_memory_window_seconds=_env_int("HERMES_BREAKOUT_MEMORY_WINDOW_SECONDS", 900),

+ 2 - 2
src/hermes_mcp/server.py

@@ -25,7 +25,7 @@ from .narrative_engine import build_narrative
 from .playbooks import default_playbook_id_for_strategies, get_playbook_definition, playbook_parameter_definitions, resolve_playbook_parameters, supported_playbook_parameter_ids
 from .replay import build_replay_input
 from .state_engine import synthesize_state
-from .store import delete_concern, get_decision_profile, get_state, init_db, list_concerns, list_strategy_assignments, list_strategy_groups, latest_cycle, latest_cycles, latest_decisions, latest_narratives, latest_observations, latest_regime_samples, prune_older_than, recent_regime_samples, recent_states_for_concern, sync_concerns_from_strategies, upsert_concern, upsert_cycle, upsert_decision, upsert_decision_profile, upsert_narrative, upsert_observation, upsert_regime_sample, upsert_state, latest_states, upsert_strategy_assignment, upsert_strategy_group
+from .store import delete_concern, get_decision_profile, get_state, init_db, list_concerns, list_strategy_assignments, list_strategy_groups, latest_cycle, latest_cycles, latest_decisions, latest_narratives, latest_observations, latest_regime_samples, prune_older_than_hours, recent_regime_samples, recent_states_for_concern, sync_concerns_from_strategies, upsert_concern, upsert_cycle, upsert_decision, upsert_decision_profile, upsert_narrative, upsert_observation, upsert_regime_sample, upsert_state, latest_states, upsert_strategy_assignment, upsert_strategy_group
 from .trader_client import apply_control_decision as trader_apply_control_decision, cancel_all_orders as trader_cancel_all_orders, control_strategy as trader_control_strategy, get_strategy as trader_get_strategy, list_strategies
 
 TESTING = "pytest" in sys.modules
@@ -237,7 +237,7 @@ async def lifespan(_: FastAPI):
     except Exception:
         pass
     try:
-        prune_older_than(cfg.retention_days)
+        prune_older_than_hours(cfg.retention_hours)
     except Exception:
         pass
 

+ 48 - 11
src/hermes_mcp/store.py

@@ -477,24 +477,61 @@ def delete_concern(*, concern_id: str) -> dict[str, int]:
     return deleted
 
 
-def prune_older_than(days: int) -> dict[str, int]:
+def prune_older_than_hours(hours: int) -> dict[str, int]:
     init_db()
-    cutoff = datetime.now(timezone.utc).timestamp() - (days * 86400)
+    cutoff = datetime.now(timezone.utc).timestamp() - (max(0, hours) * 3600)
     cutoff_iso = datetime.fromtimestamp(cutoff, tz=timezone.utc).isoformat()
     with _connect() as conn:
-        deleted = {}
-        for table in ("actions", "decisions", "narratives", "states", "observations", "coverage_gaps", "cycles"):
-            if table == "actions":
-                where = "executed_at is not null and executed_at < ?"
-            elif table in {"decisions", "narratives", "states", "observations", "coverage_gaps", "cycles"}:
-                where = "created_at < ?" if table != "cycles" else "started_at < ?"
-            else:
-                continue
-            cur = conn.execute(f"delete from {table} where {where}", (cutoff_iso,))
+        deleted = {table: 0 for table in ("actions", "decisions", "narratives", "states", "observations", "coverage_gaps", "regime_samples", "cycles")}
+
+        old_cycle_ids = [row[0] for row in conn.execute("select id from cycles where started_at < ?", (cutoff_iso,)).fetchall()]
+        old_cycle_decision_ids: list[str] = []
+        if old_cycle_ids:
+            placeholders = ",".join("?" for _ in old_cycle_ids)
+            old_cycle_decision_ids = [row[0] for row in conn.execute(f"select id from decisions where cycle_id in ({placeholders})", old_cycle_ids).fetchall()]
+
+        old_decision_ids = [row[0] for row in conn.execute("select id from decisions where created_at < ?", (cutoff_iso,)).fetchall()]
+        action_decision_ids = list({*old_cycle_decision_ids, *old_decision_ids})
+
+        if action_decision_ids:
+            placeholders = ",".join("?" for _ in action_decision_ids)
+            cur = conn.execute(f"delete from actions where decision_id in ({placeholders})", action_decision_ids)
+            deleted["actions"] += cur.rowcount if cur.rowcount is not None else 0
+        cur = conn.execute("delete from actions where executed_at is not null and executed_at < ?", (cutoff_iso,))
+        deleted["actions"] += cur.rowcount if cur.rowcount is not None else 0
+
+        for table, column in (
+            ("decisions", "created_at"),
+            ("narratives", "created_at"),
+            ("states", "created_at"),
+            ("observations", "observed_at"),
+            ("coverage_gaps", "created_at"),
+            ("regime_samples", "captured_at"),
+        ):
+            where_parts = [f"{column} < ?"]
+            params: list[Any] = [cutoff_iso]
+            if old_cycle_ids:
+                placeholders = ",".join("?" for _ in old_cycle_ids)
+                where_parts.append(f"cycle_id in ({placeholders})")
+                params.extend(old_cycle_ids)
+            cur = conn.execute(f"delete from {table} where {' or '.join(where_parts)}", params)
             deleted[table] = cur.rowcount if cur.rowcount is not None else 0
+
+        if old_cycle_ids:
+            placeholders = ",".join("?" for _ in old_cycle_ids)
+            cur = conn.execute(f"delete from cycles where id in ({placeholders})", old_cycle_ids)
+            deleted["cycles"] = cur.rowcount if cur.rowcount is not None else 0
     return deleted
 
 
+def prune_older_than(days: int) -> dict[str, int]:
+    """Backward-compatible wrapper for older callers.
+
+    The historical interface used days; the new config path is hour-based.
+    """
+    return prune_older_than_hours(days * 24)
+
+
 def sync_concerns_from_strategies(strategies: list[dict[str, Any]]) -> list[dict[str, Any]]:
     seen: set[str] = set()
     synced: list[dict[str, Any]] = []

+ 119 - 0
tests/test_concern_cleanup.py

@@ -7,9 +7,13 @@ from hermes_mcp.store import (
     DB_PATH,
     delete_concern,
     init_db,
+    prune_older_than_hours,
+    upsert_decision_profile,
     upsert_concern,
     upsert_cycle,
     upsert_decision,
+    upsert_strategy_assignment,
+    upsert_strategy_group,
     upsert_narrative,
     upsert_observation,
     upsert_regime_sample,
@@ -24,6 +28,13 @@ def _count(table: str, value: str, column: str = "concern_id") -> int:
         return int(row["n"] if row else 0)
 
 
+def _total_count(table: str) -> int:
+    with sqlite3.connect(DB_PATH) as conn:
+        conn.row_factory = sqlite3.Row
+        row = conn.execute(f"select count(*) as n from {table}").fetchone()
+        return int(row["n"] if row else 0)
+
+
 def test_delete_concern_purges_related_rows():
     init_db()
     concern_id = f"test:{uuid4().hex}"
@@ -99,3 +110,111 @@ def test_delete_concern_purges_related_rows():
     for table in ("observations", "states", "narratives", "decisions", "coverage_gaps", "regime_samples"):
         assert _count(table, concern_id) == 0
     assert _count("actions", decision_id, "decision_id") == 0
+
+
+def test_prune_older_than_hours_keeps_concerns_and_config_tables():
+    init_db()
+    concern_id = f"test:{uuid4().hex}"
+    cycle_id = f"cycle:{uuid4().hex}"
+    decision_id = f"decision:{uuid4().hex}"
+    profile_id = f"profile:{uuid4().hex}"
+    group_id = f"group:{uuid4().hex}"
+    assignment_id = f"assignment:{uuid4().hex}"
+    old_at = "2026-04-24T00:00:00+00:00"
+
+    upsert_concern(
+        id=concern_id,
+        account_id="acct-1",
+        market_symbol="xrpusd",
+        base_currency="XRP",
+        quote_currency="USD",
+        strategy_id="trend-1",
+        source="test",
+        status="active",
+        notes="preserve me",
+    )
+    upsert_decision_profile(id=profile_id, name="Config profile", config={"keep": True})
+    upsert_strategy_group(id=group_id, concern_id=concern_id, name="Config group", strategy_family="mixed", decision_profile_id=profile_id)
+    upsert_strategy_assignment(id=assignment_id, strategy_group_id=group_id, strategy_id="trend-1", strategy_type="trend_follower", role="primary")
+    upsert_cycle(id=cycle_id, started_at=old_at, finished_at=None, status="ok", trigger="test")
+    upsert_observation(id=f"obs:{uuid4().hex}", cycle_id=cycle_id, concern_id=concern_id, source="test", kind="snapshot", payload_json="{}", observed_at=old_at)
+    upsert_state(
+        id=f"state:{uuid4().hex}",
+        cycle_id=cycle_id,
+        concern_id=concern_id,
+        market_regime="bull",
+        volatility_state="normal",
+        liquidity_state="good",
+        sentiment_pressure="neutral",
+        event_risk="low",
+        execution_quality="good",
+        confidence=0.9,
+        payload_json="{}",
+        created_at=old_at,
+    )
+    upsert_narrative(
+        id=f"narr:{uuid4().hex}",
+        cycle_id=cycle_id,
+        concern_id=concern_id,
+        summary="old",
+        key_drivers_json="[]",
+        risk_flags_json="[]",
+        uncertainties_json="[]",
+        confidence=0.8,
+        created_at=old_at,
+    )
+    upsert_decision(
+        id=decision_id,
+        cycle_id=cycle_id,
+        concern_id=concern_id,
+        action="replace_with_grid",
+        target_strategy="grid-1",
+        target_policy_json="{}",
+        reason_summary="old",
+        confidence=0.7,
+        requires_action=True,
+        created_at=old_at,
+    )
+    upsert_regime_sample(id=f"regime:{uuid4().hex}", cycle_id=cycle_id, concern_id=concern_id, timeframe="1h", regime_json="{}", captured_at=old_at)
+    with sqlite3.connect(DB_PATH) as conn:
+        conn.execute(
+            "insert into actions(id, decision_id, target, command, request_json, response_json, status, executed_at) values(?, ?, ?, ?, ?, ?, ?, ?)",
+            (f"action:{uuid4().hex}", decision_id, "trader", "switch", "{}", None, "done", old_at),
+        )
+        conn.commit()
+
+    before_counts = {
+        "cycles": _total_count("cycles"),
+        "decisions": _total_count("decisions"),
+        "actions": _total_count("actions"),
+        "observations": _total_count("observations"),
+        "states": _total_count("states"),
+        "narratives": _total_count("narratives"),
+        "coverage_gaps": _total_count("coverage_gaps"),
+        "regime_samples": _total_count("regime_samples"),
+    }
+
+    deleted = prune_older_than_hours(24)
+
+    assert deleted["cycles"] == before_counts["cycles"] - _total_count("cycles")
+    assert deleted["decisions"] == before_counts["decisions"] - _total_count("decisions")
+    assert deleted["actions"] == before_counts["actions"] - _total_count("actions")
+    assert deleted["observations"] == before_counts["observations"] - _total_count("observations")
+    assert deleted["states"] == before_counts["states"] - _total_count("states")
+    assert deleted["narratives"] == before_counts["narratives"] - _total_count("narratives")
+    assert deleted["coverage_gaps"] == before_counts["coverage_gaps"] - _total_count("coverage_gaps")
+    assert deleted["regime_samples"] == before_counts["regime_samples"] - _total_count("regime_samples")
+    assert "concerns" not in deleted
+    assert "decision_profiles" not in deleted
+    assert "strategy_groups" not in deleted
+    assert "strategy_assignments" not in deleted
+
+    assert _count("concerns", concern_id, "id") == 1
+    assert _count("decision_profiles", profile_id, "id") == 1
+    assert _count("strategy_groups", group_id, "id") == 1
+    assert _count("strategy_assignments", assignment_id, "id") == 1
+    assert _count("decisions", concern_id) == 0
+    assert _count("observations", concern_id) == 0
+    assert _count("states", concern_id) == 0
+    assert _count("narratives", concern_id) == 0
+    assert _count("regime_samples", concern_id) == 0

+ 20 - 2
tests/test_config.py

@@ -1,9 +1,27 @@
 from pathlib import Path
 
+import hermes_mcp.config as config
 from hermes_mcp.config import _load_env_file
 
 
-def test_env_example_includes_breakout_memory_window_setting():
+def test_env_example_includes_retention_hours_setting():
     env_example = Path(__file__).resolve().parents[1] / ".env.example"
     values = _load_env_file(env_example)
-    assert values["HERMES_BREAKOUT_MEMORY_WINDOW_SECONDS"] == "900"
+    assert values["HERMES_RETENTION_HOURS"] == "24"
+
+
+def test_load_config_uses_hours_and_legacy_days_fallback(monkeypatch):
+    monkeypatch.setattr(config, "_load_env_file", lambda path=config.ENV_PATH: {})
+    monkeypatch.delenv("HERMES_RETENTION_HOURS", raising=False)
+    monkeypatch.delenv("HERMES_RETENTION_DAYS", raising=False)
+    cfg = config.load_config()
+    assert cfg.retention_hours == 24
+
+    monkeypatch.setenv("HERMES_RETENTION_HOURS", "12")
+    cfg = config.load_config()
+    assert cfg.retention_hours == 12
+
+    monkeypatch.delenv("HERMES_RETENTION_HOURS", raising=False)
+    monkeypatch.setenv("HERMES_RETENTION_DAYS", "2")
+    cfg = config.load_config()
+    assert cfg.retention_hours == 48