from __future__ import annotations import json import sqlite3 from datetime import datetime, timezone from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[2] DATA_DIR = ROOT / "data" DB_PATH = DATA_DIR / "hermes_mcp.sqlite3" SCHEMA_STATEMENTS = [ """ create table if not exists concerns ( id text primary key, account_id text, market_symbol text, base_currency text, quote_currency text, strategy_id text, decision_profile_id text, source text not null, status text not null default 'active', notes text, created_at text not null, updated_at text not null ) """, """ create table if not exists decision_profiles ( id text primary key, name text not null, description text, config_json text not null, status text not null default 'active', created_at text not null, updated_at text not null ) """, """ create table if not exists strategy_groups ( id text primary key, concern_id text not null, name text not null, strategy_family text, decision_profile_id text, notes text, status text not null default 'active', created_at text not null, updated_at text not null, foreign key(concern_id) references concerns(id) on delete cascade, foreign key(decision_profile_id) references decision_profiles(id) ) """, """ create table if not exists strategy_assignments ( id text primary key, strategy_group_id text not null, strategy_id text not null, strategy_type text, role text, status text not null default 'active', notes text, created_at text not null, updated_at text not null, foreign key(strategy_group_id) references strategy_groups(id) on delete cascade ) """, """ create table if not exists cycles ( id text primary key, started_at text not null, finished_at text, status text not null default 'running', trigger text not null, notes text ) """, """ create table if not exists observations ( id text primary key, cycle_id text not null, concern_id text, source text not null, kind text not null, payload_json text not null, observed_at text not null, foreign key(cycle_id) references cycles(id), foreign key(concern_id) references concerns(id) ) """, """ create table if not exists states ( id text primary key, cycle_id text not null, concern_id text not null, market_regime text, volatility_state text, liquidity_state text, sentiment_pressure text, event_risk text, execution_quality text, confidence real, payload_json text not null, created_at text not null, foreign key(cycle_id) references cycles(id), foreign key(concern_id) references concerns(id) ) """, """ create table if not exists narratives ( id text primary key, cycle_id text not null, concern_id text not null, summary text not null, key_drivers_json text not null, risk_flags_json text not null, uncertainties_json text not null, confidence real, created_at text not null, foreign key(cycle_id) references cycles(id), foreign key(concern_id) references concerns(id) ) """, """ create table if not exists decisions ( id text primary key, cycle_id text not null, concern_id text not null, mode text not null, action text not null, target_strategy text, target_policy_json text, reason_summary text, confidence real, requires_action integer not null default 0, created_at text not null, foreign key(cycle_id) references cycles(id), foreign key(concern_id) references concerns(id) ) """, """ create table if not exists actions ( id text primary key, decision_id text not null, target text not null, command text not null, request_json text not null, response_json text, status text not null default 'pending', executed_at text, foreign key(decision_id) references decisions(id) ) """, """ create table if not exists coverage_gaps ( id text primary key, cycle_id text not null, concern_id text, gap_type text not null, summary text not null, recommendation_json text not null, status text not null default 'open', created_at text not null, foreign key(cycle_id) references cycles(id), foreign key(concern_id) references concerns(id) ) """, """ create table if not exists regime_samples ( id text primary key, cycle_id text not null, concern_id text not null, timeframe text not null, regime_json text not null, captured_at text not null, foreign key(cycle_id) references cycles(id), foreign key(concern_id) references concerns(id) ) """, "create index if not exists idx_observations_cycle on observations(cycle_id)", "create index if not exists idx_observations_concern on observations(concern_id)", "create index if not exists idx_concerns_profile on concerns(decision_profile_id)", "create index if not exists idx_decision_profiles_status on decision_profiles(status)", "create index if not exists idx_strategy_groups_concern on strategy_groups(concern_id)", "create index if not exists idx_strategy_groups_profile on strategy_groups(decision_profile_id)", "create index if not exists idx_strategy_assignments_group on strategy_assignments(strategy_group_id)", "create index if not exists idx_strategy_assignments_strategy on strategy_assignments(strategy_id)", "create index if not exists idx_states_cycle on states(cycle_id)", "create index if not exists idx_states_concern on states(concern_id)", "create index if not exists idx_narratives_cycle on narratives(cycle_id)", "create index if not exists idx_decisions_cycle on decisions(cycle_id)", "create index if not exists idx_actions_decision on actions(decision_id)", "create index if not exists idx_gaps_cycle on coverage_gaps(cycle_id)", "create index if not exists idx_regime_samples_concern on regime_samples(concern_id)", ] def _now() -> str: return datetime.now(timezone.utc).isoformat() def _connect() -> sqlite3.Connection: DATA_DIR.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("pragma foreign_keys = on") return conn def init_db() -> None: with _connect() as conn: for stmt in SCHEMA_STATEMENTS: if stmt.lstrip().lower().startswith("create index"): continue conn.execute(stmt) conn.execute( """ create table if not exists state ( key text primary key, value text not null, updated_at text not null default current_timestamp ) """ ) concern_columns = {row[1] for row in conn.execute("pragma table_info(concerns)").fetchall()} for column in ("base_currency", "quote_currency", "decision_profile_id"): if column not in concern_columns: conn.execute(f"alter table concerns add column {column} text") for stmt in SCHEMA_STATEMENTS: if not stmt.lstrip().lower().startswith("create index"): continue conn.execute(stmt) def get_state() -> dict[str, Any]: init_db() with _connect() as conn: row = conn.execute("select value from state where key = ?", ("snapshot",)).fetchone() if not row: return { "status": "stub", "thinking": "Hermes is scaffolded and waiting for integrations.", "layers": ["concerns", "decision_profiles", "strategy_groups", "strategy_assignments", "cycles", "observations", "states", "narratives", "decisions", "actions", "coverage_gaps"], } return json.loads(row["value"]) def put_state(payload: dict[str, Any]) -> None: init_db() with _connect() as conn: conn.execute( "insert into state(key, value, updated_at) values(?, ?, current_timestamp) on conflict(key) do update set value=excluded.value, updated_at=current_timestamp", ("snapshot", json.dumps(payload)), ) def upsert_concern(*, id: str, account_id: str | None, market_symbol: str | None, base_currency: str | None = None, quote_currency: str | None = None, strategy_id: str | None, decision_profile_id: str | None = None, source: str, status: str = "active", notes: str | None = None) -> None: init_db() now = _now() with _connect() as conn: conn.execute( """ insert into concerns(id, account_id, market_symbol, base_currency, quote_currency, strategy_id, decision_profile_id, source, status, notes, created_at, updated_at) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict(id) do update set account_id=excluded.account_id, market_symbol=excluded.market_symbol, base_currency=excluded.base_currency, quote_currency=excluded.quote_currency, strategy_id=excluded.strategy_id, decision_profile_id=excluded.decision_profile_id, source=excluded.source, status=excluded.status, notes=excluded.notes, updated_at=excluded.updated_at """, (id, account_id, market_symbol, base_currency, quote_currency, strategy_id, decision_profile_id, source, status, notes, now, now), ) def list_concerns() -> list[dict[str, Any]]: init_db() with _connect() as conn: rows = conn.execute("select * from concerns order by updated_at desc").fetchall() return [dict(r) for r in rows] def upsert_decision_profile(*, id: str, name: str, config: dict[str, Any], description: str | None = None, status: str = "active") -> None: init_db() now = _now() with _connect() as conn: conn.execute( """ insert into decision_profiles(id, name, description, config_json, status, created_at, updated_at) values(?, ?, ?, ?, ?, ?, ?) on conflict(id) do update set name=excluded.name, description=excluded.description, config_json=excluded.config_json, status=excluded.status, updated_at=excluded.updated_at """, (id, name, description, json.dumps(config, ensure_ascii=False), status, now, now), ) def list_decision_profiles() -> list[dict[str, Any]]: init_db() with _connect() as conn: rows = conn.execute("select * from decision_profiles order by updated_at desc").fetchall() return [dict(r) for r in rows] def get_decision_profile(*, profile_id: str) -> dict[str, Any] | None: init_db() profile_id = str(profile_id or "").strip() if not profile_id: return None with _connect() as conn: row = conn.execute("select * from decision_profiles where id = ?", (profile_id,)).fetchone() return dict(row) if row else None def upsert_strategy_group(*, id: str, concern_id: str, name: str, strategy_family: str | None = None, decision_profile_id: str | None = None, notes: str | None = None, status: str = "active") -> None: init_db() now = _now() with _connect() as conn: conn.execute( """ insert into strategy_groups(id, concern_id, name, strategy_family, decision_profile_id, notes, status, created_at, updated_at) values(?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict(id) do update set concern_id=excluded.concern_id, name=excluded.name, strategy_family=excluded.strategy_family, decision_profile_id=excluded.decision_profile_id, notes=excluded.notes, status=excluded.status, updated_at=excluded.updated_at """, (id, concern_id, name, strategy_family, decision_profile_id, notes, status, now, now), ) def list_strategy_groups(*, concern_id: str | None = None) -> list[dict[str, Any]]: init_db() with _connect() as conn: if concern_id: rows = conn.execute("select * from strategy_groups where concern_id = ? order by updated_at desc", (concern_id,)).fetchall() else: rows = conn.execute("select * from strategy_groups order by updated_at desc").fetchall() return [dict(r) for r in rows] def upsert_strategy_assignment(*, id: str, strategy_group_id: str, strategy_id: str, strategy_type: str | None = None, role: str | None = None, status: str = "active", notes: str | None = None) -> None: init_db() now = _now() with _connect() as conn: conn.execute( """ insert into strategy_assignments(id, strategy_group_id, strategy_id, strategy_type, role, status, notes, created_at, updated_at) values(?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict(id) do update set strategy_group_id=excluded.strategy_group_id, strategy_id=excluded.strategy_id, strategy_type=excluded.strategy_type, role=excluded.role, status=excluded.status, notes=excluded.notes, updated_at=excluded.updated_at """, (id, strategy_group_id, strategy_id, strategy_type, role, status, notes, now, now), ) def list_strategy_assignments(*, strategy_group_id: str | None = None) -> list[dict[str, Any]]: init_db() with _connect() as conn: if strategy_group_id: rows = conn.execute("select * from strategy_assignments where strategy_group_id = ? order by updated_at desc", (strategy_group_id,)).fetchall() else: rows = conn.execute("select * from strategy_assignments order by updated_at desc").fetchall() return [dict(r) for r in rows] def delete_concern(*, concern_id: str) -> dict[str, int]: init_db() concern_id = str(concern_id or "").strip() if not concern_id: return {"concerns": 0, "strategy_groups": 0, "strategy_assignments": 0, "observations": 0, "states": 0, "narratives": 0, "decisions": 0, "actions": 0, "coverage_gaps": 0, "regime_samples": 0} deleted = {"concerns": 0, "strategy_groups": 0, "strategy_assignments": 0, "observations": 0, "states": 0, "narratives": 0, "decisions": 0, "actions": 0, "coverage_gaps": 0, "regime_samples": 0} with _connect() as conn: decision_ids = [row[0] for row in conn.execute("select id from decisions where concern_id = ?", (concern_id,)).fetchall()] group_ids = [row[0] for row in conn.execute("select id from strategy_groups where concern_id = ?", (concern_id,)).fetchall()] deleted["actions"] = conn.execute( f"delete from actions where decision_id in ({','.join('?' for _ in decision_ids)})", decision_ids, ).rowcount if decision_ids else 0 deleted["strategy_assignments"] = conn.execute( f"delete from strategy_assignments where strategy_group_id in ({','.join('?' for _ in group_ids)})", group_ids, ).rowcount if group_ids else 0 deleted["strategy_groups"] = conn.execute("delete from strategy_groups where concern_id = ?", (concern_id,)).rowcount or 0 deleted["observations"] = conn.execute("delete from observations where concern_id = ?", (concern_id,)).rowcount or 0 deleted["states"] = conn.execute("delete from states where concern_id = ?", (concern_id,)).rowcount or 0 deleted["narratives"] = conn.execute("delete from narratives where concern_id = ?", (concern_id,)).rowcount or 0 deleted["coverage_gaps"] = conn.execute("delete from coverage_gaps where concern_id = ?", (concern_id,)).rowcount or 0 deleted["regime_samples"] = conn.execute("delete from regime_samples where concern_id = ?", (concern_id,)).rowcount or 0 deleted["decisions"] = conn.execute("delete from decisions where concern_id = ?", (concern_id,)).rowcount or 0 deleted["concerns"] = conn.execute("delete from concerns where id = ?", (concern_id,)).rowcount or 0 return deleted def prune_older_than(days: int) -> dict[str, int]: init_db() cutoff = datetime.now(timezone.utc).timestamp() - (days * 86400) cutoff_iso = datetime.fromtimestamp(cutoff, tz=timezone.utc).isoformat() with _connect() as conn: deleted = {} for table in ("actions", "decisions", "narratives", "states", "observations", "coverage_gaps", "cycles"): if table == "actions": where = "executed_at is not null and executed_at < ?" elif table in {"decisions", "narratives", "states", "observations", "coverage_gaps", "cycles"}: where = "created_at < ?" if table != "cycles" else "started_at < ?" else: continue cur = conn.execute(f"delete from {table} where {where}", (cutoff_iso,)) deleted[table] = cur.rowcount if cur.rowcount is not None else 0 return deleted def sync_concerns_from_strategies(strategies: list[dict[str, Any]]) -> list[dict[str, Any]]: seen: set[str] = set() synced: list[dict[str, Any]] = [] existing = {str(c.get("id") or ""): c for c in list_concerns()} for s in strategies: account_id = str(s.get("account_id") or "").strip() or None market_symbol = str(s.get("market_symbol") or "").strip() or None base_currency = str(s.get("base_currency") or "").strip() or None quote_currency = str(s.get("counter_currency") or s.get("quote_currency") or "").strip() or None strategy_id = str(s.get("id") or s.get("name") or s.get("strategy_type") or "").strip() or None if not account_id or not market_symbol: continue concern_id = f"{account_id}:{market_symbol}" if concern_id in seen: continue seen.add(concern_id) current = existing.get(concern_id, {}) upsert_concern( id=concern_id, account_id=account_id, market_symbol=market_symbol, base_currency=base_currency, quote_currency=quote_currency, strategy_id=strategy_id, decision_profile_id=str(current.get("decision_profile_id") or "").strip() or None, source=str(current.get("source") or "trader_inventory"), status=str(current.get("status") or "active"), notes=str(current.get("notes") or "mirrored from trader strategy inventory").strip() or None, ) synced.append({"id": concern_id, "account_id": account_id, "market_symbol": market_symbol, "base_currency": base_currency, "quote_currency": quote_currency, "strategy_id": strategy_id}) return synced def table_counts() -> dict[str, int]: init_db() tables = ["concerns", "decision_profiles", "strategy_groups", "strategy_assignments", "cycles", "observations", "states", "narratives", "decisions", "actions", "coverage_gaps", "regime_samples"] out: dict[str, int] = {} with _connect() as conn: for table in tables: out[table] = int(conn.execute(f"select count(*) as n from {table}").fetchone()["n"]) return out def latest_cycle() -> dict[str, Any] | None: init_db() with _connect() as conn: row = conn.execute("select * from cycles order by started_at desc limit 1").fetchone() return dict(row) if row else None def latest_cycles(limit: int = 20) -> list[dict[str, Any]]: init_db() with _connect() as conn: rows = conn.execute("select * from cycles order by started_at desc limit ?", (limit,)).fetchall() return [dict(r) for r in rows] def upsert_cycle(*, id: str, started_at: str, finished_at: str | None, status: str, trigger: str, notes: str | None = None) -> None: init_db() with _connect() as conn: conn.execute( """ insert into cycles(id, started_at, finished_at, status, trigger, notes) values(?, ?, ?, ?, ?, ?) on conflict(id) do update set started_at=excluded.started_at, finished_at=excluded.finished_at, status=excluded.status, trigger=excluded.trigger, notes=excluded.notes """, (id, started_at, finished_at, status, trigger, notes), ) def upsert_regime_sample(*, id: str, cycle_id: str, concern_id: str, timeframe: str, regime_json: str, captured_at: str) -> None: init_db() with _connect() as conn: conn.execute( """ insert into regime_samples(id, cycle_id, concern_id, timeframe, regime_json, captured_at) values(?, ?, ?, ?, ?, ?) on conflict(id) do update set cycle_id=excluded.cycle_id, concern_id=excluded.concern_id, timeframe=excluded.timeframe, regime_json=excluded.regime_json, captured_at=excluded.captured_at """, (id, cycle_id, concern_id, timeframe, regime_json, captured_at), ) def upsert_observation(*, id: str, cycle_id: str, concern_id: str | None, source: str, kind: str, payload_json: str, observed_at: str | None = None) -> None: init_db() observed_at = observed_at or _now() with _connect() as conn: conn.execute( """ insert into observations(id, cycle_id, concern_id, source, kind, payload_json, observed_at) values(?, ?, ?, ?, ?, ?, ?) on conflict(id) do update set cycle_id=excluded.cycle_id, concern_id=excluded.concern_id, source=excluded.source, kind=excluded.kind, payload_json=excluded.payload_json, observed_at=excluded.observed_at """, (id, cycle_id, concern_id, source, kind, payload_json, observed_at), ) def upsert_state(*, id: str, cycle_id: str, concern_id: str, market_regime: str | None, volatility_state: str | None, liquidity_state: str | None, sentiment_pressure: str | None, event_risk: str | None, execution_quality: str | None, confidence: float | None, payload_json: str, created_at: str | None = None) -> None: init_db() created_at = created_at or _now() with _connect() as conn: conn.execute( """ insert into states(id, cycle_id, concern_id, market_regime, volatility_state, liquidity_state, sentiment_pressure, event_risk, execution_quality, confidence, payload_json, created_at) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict(id) do update set cycle_id=excluded.cycle_id, concern_id=excluded.concern_id, market_regime=excluded.market_regime, volatility_state=excluded.volatility_state, liquidity_state=excluded.liquidity_state, sentiment_pressure=excluded.sentiment_pressure, event_risk=excluded.event_risk, execution_quality=excluded.execution_quality, confidence=excluded.confidence, payload_json=excluded.payload_json, created_at=excluded.created_at """, (id, cycle_id, concern_id, market_regime, volatility_state, liquidity_state, sentiment_pressure, event_risk, execution_quality, confidence, payload_json, created_at), ) def upsert_narrative(*, id: str, cycle_id: str, concern_id: str, summary: str, key_drivers_json: str, risk_flags_json: str, uncertainties_json: str, confidence: float | None, created_at: str | None = None) -> None: init_db() created_at = created_at or _now() with _connect() as conn: conn.execute( """ insert into narratives(id, cycle_id, concern_id, summary, key_drivers_json, risk_flags_json, uncertainties_json, confidence, created_at) values(?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict(id) do update set cycle_id=excluded.cycle_id, concern_id=excluded.concern_id, summary=excluded.summary, key_drivers_json=excluded.key_drivers_json, risk_flags_json=excluded.risk_flags_json, uncertainties_json=excluded.uncertainties_json, confidence=excluded.confidence, created_at=excluded.created_at """, (id, cycle_id, concern_id, summary, key_drivers_json, risk_flags_json, uncertainties_json, confidence, created_at), ) def upsert_decision(*, id: str, cycle_id: str, concern_id: str, mode: str, action: str, target_strategy: str | None, target_policy_json: str | None, reason_summary: str | None, confidence: float | None, requires_action: bool, created_at: str | None = None) -> None: init_db() created_at = created_at or _now() with _connect() as conn: conn.execute( """ insert into decisions(id, cycle_id, concern_id, mode, action, target_strategy, target_policy_json, reason_summary, confidence, requires_action, created_at) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict(id) do update set cycle_id=excluded.cycle_id, concern_id=excluded.concern_id, mode=excluded.mode, action=excluded.action, target_strategy=excluded.target_strategy, target_policy_json=excluded.target_policy_json, reason_summary=excluded.reason_summary, confidence=excluded.confidence, requires_action=excluded.requires_action, created_at=excluded.created_at """, (id, cycle_id, concern_id, mode, action, target_strategy, target_policy_json, reason_summary, confidence, 1 if requires_action else 0, created_at), ) def latest_states(limit: int = 20) -> list[dict[str, Any]]: init_db() with _connect() as conn: rows = conn.execute("select * from states order by created_at desc limit ?", (limit,)).fetchall() return [dict(r) for r in rows] def recent_states_for_concern(*, concern_id: str, since_seconds: int, limit: int = 24) -> list[dict[str, Any]]: init_db() cutoff = datetime.now(timezone.utc).timestamp() - max(1, since_seconds) cutoff_iso = datetime.fromtimestamp(cutoff, tz=timezone.utc).isoformat() with _connect() as conn: rows = conn.execute( "select * from states where concern_id = ? and created_at >= ? order by created_at asc limit ?", (concern_id, cutoff_iso, limit), ).fetchall() return [dict(r) for r in rows] def latest_decisions(limit: int = 20) -> list[dict[str, Any]]: init_db() with _connect() as conn: rows = conn.execute("select * from decisions order by created_at desc limit ?", (limit,)).fetchall() return [dict(r) for r in rows] def latest_narratives(limit: int = 20) -> list[dict[str, Any]]: init_db() with _connect() as conn: rows = conn.execute("select * from narratives order by created_at desc limit ?", (limit,)).fetchall() return [dict(r) for r in rows] def latest_regime_samples(limit: int = 20) -> list[dict[str, Any]]: init_db() with _connect() as conn: rows = conn.execute("select * from regime_samples order by captured_at desc limit ?", (limit,)).fetchall() return [dict(r) for r in rows] def latest_observations(limit: int = 20, source: str | None = None) -> list[dict[str, Any]]: init_db() with _connect() as conn: if source: rows = conn.execute( "select * from observations where source = ? order by observed_at desc limit ?", (source, limit), ).fetchall() else: rows = conn.execute("select * from observations order by observed_at desc limit ?", (limit,)).fetchall() return [dict(r) for r in rows] def recent_regime_samples(limit: int = 200) -> list[dict[str, Any]]: init_db() with _connect() as conn: rows = conn.execute("select * from regime_samples order by captured_at desc limit ?", (limit,)).fetchall() return [dict(r) for r in rows]