| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678 |
- from __future__ import annotations
- import json
- import sqlite3
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import Any
# Directory two levels above the folder that contains this file
# (presumably the project root -- confirm against the repository layout).
ROOT = Path(__file__).resolve().parents[2]
# Folder that holds the SQLite file; _connect() creates it when missing.
DATA_DIR = ROOT / "data"
# Path of the SQLite database file opened by _connect().
DB_PATH = DATA_DIR / "hermes_mcp.sqlite3"
# DDL applied by init_db(). Every statement is idempotent ("if not exists").
# Table definitions come first and index definitions last; init_db() tells
# them apart by the "create index" prefix and runs the indexes only after its
# column migration on ``concerns`` has completed.
SCHEMA_STATEMENTS = [
    """
    create table if not exists concerns (
        id text primary key,
        account_id text,
        market_symbol text,
        base_currency text,
        quote_currency text,
        strategy_id text,
        decision_profile_id text,
        source text not null,
        status text not null default 'active',
        notes text,
        created_at text not null,
        updated_at text not null
    )
    """,
    """
    create table if not exists decision_profiles (
        id text primary key,
        name text not null,
        description text,
        config_json text not null,
        status text not null default 'active',
        created_at text not null,
        updated_at text not null
    )
    """,
    """
    create table if not exists strategy_groups (
        id text primary key,
        concern_id text not null,
        name text not null,
        strategy_family text,
        decision_profile_id text,
        notes text,
        status text not null default 'active',
        created_at text not null,
        updated_at text not null,
        foreign key(concern_id) references concerns(id) on delete cascade,
        foreign key(decision_profile_id) references decision_profiles(id)
    )
    """,
    """
    create table if not exists strategy_assignments (
        id text primary key,
        strategy_group_id text not null,
        strategy_id text not null,
        strategy_type text,
        role text,
        status text not null default 'active',
        notes text,
        created_at text not null,
        updated_at text not null,
        foreign key(strategy_group_id) references strategy_groups(id) on delete cascade
    )
    """,
    """
    create table if not exists cycles (
        id text primary key,
        started_at text not null,
        finished_at text,
        status text not null default 'running',
        trigger text not null,
        notes text
    )
    """,
    """
    create table if not exists observations (
        id text primary key,
        cycle_id text not null,
        concern_id text,
        source text not null,
        kind text not null,
        payload_json text not null,
        observed_at text not null,
        foreign key(cycle_id) references cycles(id),
        foreign key(concern_id) references concerns(id)
    )
    """,
    """
    create table if not exists states (
        id text primary key,
        cycle_id text not null,
        concern_id text not null,
        market_regime text,
        volatility_state text,
        liquidity_state text,
        sentiment_pressure text,
        event_risk text,
        execution_quality text,
        confidence real,
        payload_json text not null,
        created_at text not null,
        foreign key(cycle_id) references cycles(id),
        foreign key(concern_id) references concerns(id)
    )
    """,
    """
    create table if not exists narratives (
        id text primary key,
        cycle_id text not null,
        concern_id text not null,
        summary text not null,
        key_drivers_json text not null,
        risk_flags_json text not null,
        uncertainties_json text not null,
        confidence real,
        created_at text not null,
        foreign key(cycle_id) references cycles(id),
        foreign key(concern_id) references concerns(id)
    )
    """,
    """
    create table if not exists decisions (
        id text primary key,
        cycle_id text not null,
        concern_id text not null,
        mode text not null,
        action text not null,
        target_strategy text,
        target_policy_json text,
        reason_summary text,
        confidence real,
        requires_action integer not null default 0,
        created_at text not null,
        foreign key(cycle_id) references cycles(id),
        foreign key(concern_id) references concerns(id)
    )
    """,
    """
    create table if not exists actions (
        id text primary key,
        decision_id text not null,
        target text not null,
        command text not null,
        request_json text not null,
        response_json text,
        status text not null default 'pending',
        executed_at text,
        foreign key(decision_id) references decisions(id)
    )
    """,
    """
    create table if not exists coverage_gaps (
        id text primary key,
        cycle_id text not null,
        concern_id text,
        gap_type text not null,
        summary text not null,
        recommendation_json text not null,
        status text not null default 'open',
        created_at text not null,
        foreign key(cycle_id) references cycles(id),
        foreign key(concern_id) references concerns(id)
    )
    """,
    """
    create table if not exists regime_samples (
        id text primary key,
        cycle_id text not null,
        concern_id text not null,
        timeframe text not null,
        regime_json text not null,
        captured_at text not null,
        foreign key(cycle_id) references cycles(id),
        foreign key(concern_id) references concerns(id)
    )
    """,
    "create index if not exists idx_observations_cycle on observations(cycle_id)",
    "create index if not exists idx_observations_concern on observations(concern_id)",
    "create index if not exists idx_concerns_profile on concerns(decision_profile_id)",
    "create index if not exists idx_decision_profiles_status on decision_profiles(status)",
    "create index if not exists idx_strategy_groups_concern on strategy_groups(concern_id)",
    "create index if not exists idx_strategy_groups_profile on strategy_groups(decision_profile_id)",
    "create index if not exists idx_strategy_assignments_group on strategy_assignments(strategy_group_id)",
    "create index if not exists idx_strategy_assignments_strategy on strategy_assignments(strategy_id)",
    "create index if not exists idx_states_cycle on states(cycle_id)",
    "create index if not exists idx_states_concern on states(concern_id)",
    "create index if not exists idx_narratives_cycle on narratives(cycle_id)",
    "create index if not exists idx_decisions_cycle on decisions(cycle_id)",
    "create index if not exists idx_actions_decision on actions(decision_id)",
    "create index if not exists idx_gaps_cycle on coverage_gaps(cycle_id)",
    "create index if not exists idx_regime_samples_concern on regime_samples(concern_id)",
    # Foreign-key child columns that previously had no index. With
    # "pragma foreign_keys = on", deleting a concerns/cycles row makes SQLite
    # look up matching child rows; without an index that is a full scan of the
    # child table per deleted parent row.
    "create index if not exists idx_narratives_concern on narratives(concern_id)",
    "create index if not exists idx_decisions_concern on decisions(concern_id)",
    "create index if not exists idx_gaps_concern on coverage_gaps(concern_id)",
    "create index if not exists idx_regime_samples_cycle on regime_samples(cycle_id)",
]
def _now() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
class _ClosingConnection(sqlite3.Connection):
    """SQLite connection that is also closed when its ``with`` block exits.

    The stock ``sqlite3.Connection`` context manager only commits (on success)
    or rolls back (on error); it leaves the connection open. Every caller in
    this module uses ``with _connect() as conn:`` exactly once per connection,
    so closing on exit releases the handle deterministically.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Base implementation performs the commit / rollback.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def _connect() -> sqlite3.Connection:
    """Open the database at ``DB_PATH`` with row access by name and FKs enforced.

    Creates ``DATA_DIR`` if needed. The returned connection yields
    ``sqlite3.Row`` rows and has ``pragma foreign_keys = on`` applied. Used as
    a context manager it commits or rolls back and then closes itself; used
    without ``with`` it behaves like a plain connection that the caller must
    close.

    Returns:
        An open ``sqlite3.Connection`` (a ``_ClosingConnection`` instance).
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, factory=_ClosingConnection)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("pragma foreign_keys = on")
    except Exception:
        # Do not leak the handle if setup fails half-way.
        conn.close()
        raise
    return conn
def init_db() -> None:
    """Create any missing tables, columns and indexes.

    Runs the table statements from ``SCHEMA_STATEMENTS``, creates the
    key/value ``state`` table, adds columns that may be absent from an
    existing ``concerns`` table, and only then runs the index statements
    (one index targets a column the migration step may have just added).
    """
    table_ddl: list[str] = []
    index_ddl: list[str] = []
    for statement in SCHEMA_STATEMENTS:
        is_index = statement.lstrip().lower().startswith("create index")
        (index_ddl if is_index else table_ddl).append(statement)

    with _connect() as conn:
        for ddl in table_ddl:
            conn.execute(ddl)

        conn.execute(
            """
            create table if not exists state (
                key text primary key,
                value text not null,
                updated_at text not null default current_timestamp
            )
            """
        )

        # Column name is the second field of each table_info row.
        present = {info[1] for info in conn.execute("pragma table_info(concerns)").fetchall()}
        wanted = ("base_currency", "quote_currency", "decision_profile_id")
        for name in (col for col in wanted if col not in present):
            conn.execute(f"alter table concerns add column {name} text")

        for ddl in index_ddl:
            conn.execute(ddl)
def get_state() -> dict[str, Any]:
    """Return the stored ``snapshot`` payload, or a stub when none exists.

    The payload is the JSON-decoded ``value`` of the ``state`` row whose key
    is ``"snapshot"``.
    """
    init_db()
    with _connect() as conn:
        cursor = conn.execute("select value from state where key = ?", ("snapshot",))
        snapshot = cursor.fetchone()
    if snapshot:
        return json.loads(snapshot["value"])
    # Nothing stored yet: report a placeholder description.
    return {
        "status": "stub",
        "thinking": "Hermes is scaffolded and waiting for integrations.",
        "layers": ["concerns", "decision_profiles", "strategy_groups", "strategy_assignments", "cycles", "observations", "states", "narratives", "decisions", "actions", "coverage_gaps"],
    }
def put_state(payload: dict[str, Any]) -> None:
    """Store ``payload`` as JSON under the ``snapshot`` key of ``state``.

    An existing snapshot row is overwritten and its ``updated_at`` refreshed.
    """
    init_db()
    statement = "insert into state(key, value, updated_at) values(?, ?, current_timestamp) on conflict(key) do update set value=excluded.value, updated_at=current_timestamp"
    with _connect() as conn:
        encoded = json.dumps(payload)
        conn.execute(statement, ("snapshot", encoded))
def upsert_concern(
    *,
    id: str,
    account_id: str | None,
    market_symbol: str | None,
    base_currency: str | None = None,
    quote_currency: str | None = None,
    strategy_id: str | None,
    decision_profile_id: str | None = None,
    source: str,
    status: str = "active",
    notes: str | None = None,
) -> None:
    """Insert a concern, or overwrite the existing row with the same ``id``.

    On conflict every column except ``id`` and ``created_at`` takes the
    supplied value, and ``updated_at`` is set to the current UTC time.
    """
    init_db()
    timestamp = _now()
    statement = """
        insert into concerns(id, account_id, market_symbol, base_currency, quote_currency, strategy_id, decision_profile_id, source, status, notes, created_at, updated_at)
        values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        on conflict(id) do update set
        account_id=excluded.account_id,
        market_symbol=excluded.market_symbol,
        base_currency=excluded.base_currency,
        quote_currency=excluded.quote_currency,
        strategy_id=excluded.strategy_id,
        decision_profile_id=excluded.decision_profile_id,
        source=excluded.source,
        status=excluded.status,
        notes=excluded.notes,
        updated_at=excluded.updated_at
        """
    values = (
        id,
        account_id,
        market_symbol,
        base_currency,
        quote_currency,
        strategy_id,
        decision_profile_id,
        source,
        status,
        notes,
        timestamp,  # created_at (kept from the first insert on conflict)
        timestamp,  # updated_at
    )
    with _connect() as conn:
        conn.execute(statement, values)
def list_concerns() -> list[dict[str, Any]]:
    """Return every concern as a dict, most recently updated first."""
    init_db()
    query = "select * from concerns order by updated_at desc"
    with _connect() as conn:
        records = conn.execute(query).fetchall()
    return list(map(dict, records))
def upsert_decision_profile(
    *,
    id: str,
    name: str,
    config: dict[str, Any],
    description: str | None = None,
    status: str = "active",
) -> None:
    """Insert a decision profile, or overwrite the row with the same ``id``.

    ``config`` is stored as JSON text (non-ASCII characters kept as-is). On
    conflict ``created_at`` is preserved and ``updated_at`` is refreshed.
    """
    init_db()
    timestamp = _now()
    statement = """
        insert into decision_profiles(id, name, description, config_json, status, created_at, updated_at)
        values(?, ?, ?, ?, ?, ?, ?)
        on conflict(id) do update set
        name=excluded.name,
        description=excluded.description,
        config_json=excluded.config_json,
        status=excluded.status,
        updated_at=excluded.updated_at
        """
    with _connect() as conn:
        config_text = json.dumps(config, ensure_ascii=False)
        conn.execute(statement, (id, name, description, config_text, status, timestamp, timestamp))
def list_decision_profiles() -> list[dict[str, Any]]:
    """Return every decision profile as a dict, most recently updated first."""
    init_db()
    query = "select * from decision_profiles order by updated_at desc"
    with _connect() as conn:
        records = conn.execute(query).fetchall()
    return list(map(dict, records))
def get_decision_profile(*, profile_id: str) -> dict[str, Any] | None:
    """Look up one decision profile by id.

    Returns:
        The row as a dict, or ``None`` when ``profile_id`` is blank after
        trimming or no such row exists.
    """
    init_db()
    key = str(profile_id or "").strip()
    if key == "":
        return None
    with _connect() as conn:
        match = conn.execute("select * from decision_profiles where id = ?", (key,)).fetchone()
    if match is None:
        return None
    return dict(match)
def upsert_strategy_group(
    *,
    id: str,
    concern_id: str,
    name: str,
    strategy_family: str | None = None,
    decision_profile_id: str | None = None,
    notes: str | None = None,
    status: str = "active",
) -> None:
    """Insert a strategy group, or overwrite the row with the same ``id``.

    On conflict ``created_at`` is preserved and ``updated_at`` is refreshed.
    """
    init_db()
    timestamp = _now()
    statement = """
        insert into strategy_groups(id, concern_id, name, strategy_family, decision_profile_id, notes, status, created_at, updated_at)
        values(?, ?, ?, ?, ?, ?, ?, ?, ?)
        on conflict(id) do update set
        concern_id=excluded.concern_id,
        name=excluded.name,
        strategy_family=excluded.strategy_family,
        decision_profile_id=excluded.decision_profile_id,
        notes=excluded.notes,
        status=excluded.status,
        updated_at=excluded.updated_at
        """
    values = (id, concern_id, name, strategy_family, decision_profile_id, notes, status, timestamp, timestamp)
    with _connect() as conn:
        conn.execute(statement, values)
def list_strategy_groups(*, concern_id: str | None = None) -> list[dict[str, Any]]:
    """Return strategy groups as dicts, most recently updated first.

    When ``concern_id`` is truthy only that concern's groups are returned;
    otherwise all groups are.
    """
    init_db()
    query = "select * from strategy_groups"
    args: tuple[Any, ...] = ()
    if concern_id:
        query += " where concern_id = ?"
        args = (concern_id,)
    query += " order by updated_at desc"
    with _connect() as conn:
        records = conn.execute(query, args).fetchall()
    return list(map(dict, records))
def upsert_strategy_assignment(
    *,
    id: str,
    strategy_group_id: str,
    strategy_id: str,
    strategy_type: str | None = None,
    role: str | None = None,
    status: str = "active",
    notes: str | None = None,
) -> None:
    """Insert a strategy assignment, or overwrite the row with the same ``id``.

    On conflict ``created_at`` is preserved and ``updated_at`` is refreshed.
    """
    init_db()
    timestamp = _now()
    statement = """
        insert into strategy_assignments(id, strategy_group_id, strategy_id, strategy_type, role, status, notes, created_at, updated_at)
        values(?, ?, ?, ?, ?, ?, ?, ?, ?)
        on conflict(id) do update set
        strategy_group_id=excluded.strategy_group_id,
        strategy_id=excluded.strategy_id,
        strategy_type=excluded.strategy_type,
        role=excluded.role,
        status=excluded.status,
        notes=excluded.notes,
        updated_at=excluded.updated_at
        """
    values = (id, strategy_group_id, strategy_id, strategy_type, role, status, notes, timestamp, timestamp)
    with _connect() as conn:
        conn.execute(statement, values)
def list_strategy_assignments(*, strategy_group_id: str | None = None) -> list[dict[str, Any]]:
    """Return strategy assignments as dicts, most recently updated first.

    When ``strategy_group_id`` is truthy only that group's assignments are
    returned; otherwise all assignments are.
    """
    init_db()
    query = "select * from strategy_assignments"
    args: tuple[Any, ...] = ()
    if strategy_group_id:
        query += " where strategy_group_id = ?"
        args = (strategy_group_id,)
    query += " order by updated_at desc"
    with _connect() as conn:
        records = conn.execute(query, args).fetchall()
    return list(map(dict, records))
def delete_concern(*, concern_id: str) -> dict[str, int]:
    """Delete a concern and every row that depends on it.

    Rows are removed children-first so the enforced foreign keys are never
    violated: actions of the concern's decisions, assignments of its strategy
    groups, then the rows keyed directly by ``concern_id``, and finally the
    concern itself. All deletes run in one transaction.

    Args:
        concern_id: Id of the concern; surrounding whitespace is ignored.

    Returns:
        Number of rows deleted per table. All counts are zero when
        ``concern_id`` is blank.
    """
    init_db()
    concern_id = str(concern_id or "").strip()
    deleted = dict.fromkeys(
        (
            "concerns",
            "strategy_groups",
            "strategy_assignments",
            "observations",
            "states",
            "narratives",
            "decisions",
            "actions",
            "coverage_gaps",
            "regime_samples",
        ),
        0,
    )
    if not concern_id:
        return deleted

    key = (concern_id,)
    with _connect() as conn:
        # Grandchildren are matched with subqueries instead of an
        # "in (?, ?, ...)" list of fetched ids: one bound parameter per id
        # fails with "too many SQL variables" once a concern has more
        # decisions or groups than SQLite's host-parameter limit.
        deleted["actions"] = conn.execute(
            "delete from actions where decision_id in (select id from decisions where concern_id = ?)",
            key,
        ).rowcount or 0
        deleted["strategy_assignments"] = conn.execute(
            "delete from strategy_assignments where strategy_group_id in (select id from strategy_groups where concern_id = ?)",
            key,
        ).rowcount or 0
        # Direct children; decisions go last because actions referenced them.
        for table in ("strategy_groups", "observations", "states", "narratives", "coverage_gaps", "regime_samples", "decisions"):
            deleted[table] = conn.execute(f"delete from {table} where concern_id = ?", key).rowcount or 0
        deleted["concerns"] = conn.execute("delete from concerns where id = ?", key).rowcount or 0
    return deleted
def prune_older_than(days: int) -> dict[str, int]:
    """Delete history rows older than ``days`` days.

    Tables are pruned children-first inside one transaction. A row is old
    when its timestamp column (``executed_at`` for actions, ``observed_at``
    for observations, ``started_at`` for cycles, ``created_at`` otherwise)
    sorts before the cutoff. Because foreign keys are enforced:

    * actions that were never executed are kept, and so is any old decision
      that still has an action pointing at it;
    * a cycle is kept while any child row (including ``regime_samples``,
      which this function does not prune) still references it.

    Args:
        days: Age threshold in days.

    Returns:
        Number of rows deleted per table, keyed by table name.
    """
    init_db()
    cutoff = datetime.now(timezone.utc).timestamp() - (days * 86400)
    cutoff_iso = datetime.fromtimestamp(cutoff, tz=timezone.utc).isoformat()

    # Every table with a foreign key to cycles(id).
    cycle_children = ("observations", "states", "narratives", "decisions", "coverage_gaps", "regime_samples")
    cycle_unreferenced = " and ".join(
        f"not exists (select 1 from {child} where {child}.cycle_id = cycles.id)"
        for child in cycle_children
    )

    # Insertion order is the deletion order (children before parents).
    filters = {
        "actions": "executed_at is not null and executed_at < ?",
        # Skip decisions still referenced by a surviving (e.g. pending) action;
        # deleting them would raise a foreign-key error and roll everything back.
        "decisions": "created_at < ? and not exists (select 1 from actions where actions.decision_id = decisions.id)",
        "narratives": "created_at < ?",
        "states": "created_at < ?",
        # observations has no created_at column; its timestamp is observed_at.
        "observations": "observed_at < ?",
        "coverage_gaps": "created_at < ?",
        "cycles": f"started_at < ? and {cycle_unreferenced}",
    }

    deleted: dict[str, int] = {}
    with _connect() as conn:
        for table, where in filters.items():
            cur = conn.execute(f"delete from {table} where {where}", (cutoff_iso,))
            deleted[table] = cur.rowcount if cur.rowcount is not None else 0
    return deleted
def sync_concerns_from_strategies(strategies: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Mirror strategies into the concerns table, one concern per account/market.

    The concern id is ``"<account_id>:<market_symbol>"``. Strategies lacking
    either part are skipped, and only the first strategy seen for a given id
    is used. For a concern that already exists, its ``decision_profile_id``,
    ``source``, ``status`` and ``notes`` are carried over.

    Returns:
        One summary dict per concern written, in input order.
    """

    def _text(value: Any) -> str | None:
        # Stringify and trim; blank or falsy input becomes None.
        return str(value or "").strip() or None

    known = {str(row.get("id") or ""): row for row in list_concerns()}
    handled: set[str] = set()
    mirrored: list[dict[str, Any]] = []

    for strategy in strategies:
        account_id = _text(strategy.get("account_id"))
        market_symbol = _text(strategy.get("market_symbol"))
        base_currency = _text(strategy.get("base_currency"))
        quote_currency = _text(strategy.get("counter_currency") or strategy.get("quote_currency"))
        strategy_id = _text(strategy.get("id") or strategy.get("name") or strategy.get("strategy_type"))

        if account_id is None or market_symbol is None:
            continue
        concern_id = f"{account_id}:{market_symbol}"
        if concern_id in handled:
            continue
        handled.add(concern_id)

        previous = known.get(concern_id, {})
        upsert_concern(
            id=concern_id,
            account_id=account_id,
            market_symbol=market_symbol,
            base_currency=base_currency,
            quote_currency=quote_currency,
            strategy_id=strategy_id,
            decision_profile_id=_text(previous.get("decision_profile_id")),
            source=str(previous.get("source") or "trader_inventory"),
            status=str(previous.get("status") or "active"),
            notes=_text(previous.get("notes") or "mirrored from trader strategy inventory"),
        )
        mirrored.append(
            {
                "id": concern_id,
                "account_id": account_id,
                "market_symbol": market_symbol,
                "base_currency": base_currency,
                "quote_currency": quote_currency,
                "strategy_id": strategy_id,
            }
        )
    return mirrored
def table_counts() -> dict[str, int]:
    """Return the row count of each main table, keyed by table name."""
    init_db()
    names = (
        "concerns",
        "decision_profiles",
        "strategy_groups",
        "strategy_assignments",
        "cycles",
        "observations",
        "states",
        "narratives",
        "decisions",
        "actions",
        "coverage_gaps",
        "regime_samples",
    )
    with _connect() as conn:
        return {
            name: int(conn.execute(f"select count(*) as n from {name}").fetchone()["n"])
            for name in names
        }
def latest_cycle() -> dict[str, Any] | None:
    """Return the cycle with the greatest ``started_at``, or ``None`` if none exist."""
    init_db()
    with _connect() as conn:
        newest = conn.execute("select * from cycles order by started_at desc limit 1").fetchone()
    if newest is None:
        return None
    return dict(newest)
def latest_cycles(limit: int = 20) -> list[dict[str, Any]]:
    """Return up to ``limit`` cycles as dicts, latest ``started_at`` first."""
    init_db()
    query = "select * from cycles order by started_at desc limit ?"
    with _connect() as conn:
        records = conn.execute(query, (limit,)).fetchall()
    return list(map(dict, records))
def upsert_cycle(
    *,
    id: str,
    started_at: str,
    finished_at: str | None,
    status: str,
    trigger: str,
    notes: str | None = None,
) -> None:
    """Insert a cycle, or overwrite every column of the row with the same ``id``."""
    init_db()
    statement = """
        insert into cycles(id, started_at, finished_at, status, trigger, notes)
        values(?, ?, ?, ?, ?, ?)
        on conflict(id) do update set
        started_at=excluded.started_at,
        finished_at=excluded.finished_at,
        status=excluded.status,
        trigger=excluded.trigger,
        notes=excluded.notes
        """
    values = (id, started_at, finished_at, status, trigger, notes)
    with _connect() as conn:
        conn.execute(statement, values)
def upsert_regime_sample(
    *,
    id: str,
    cycle_id: str,
    concern_id: str,
    timeframe: str,
    regime_json: str,
    captured_at: str,
) -> None:
    """Insert a regime sample, or overwrite every column of the row with the same ``id``."""
    init_db()
    statement = """
        insert into regime_samples(id, cycle_id, concern_id, timeframe, regime_json, captured_at)
        values(?, ?, ?, ?, ?, ?)
        on conflict(id) do update set
        cycle_id=excluded.cycle_id,
        concern_id=excluded.concern_id,
        timeframe=excluded.timeframe,
        regime_json=excluded.regime_json,
        captured_at=excluded.captured_at
        """
    values = (id, cycle_id, concern_id, timeframe, regime_json, captured_at)
    with _connect() as conn:
        conn.execute(statement, values)
def upsert_observation(
    *,
    id: str,
    cycle_id: str,
    concern_id: str | None,
    source: str,
    kind: str,
    payload_json: str,
    observed_at: str | None = None,
) -> None:
    """Insert an observation, or overwrite every column of the row with the same ``id``.

    A falsy ``observed_at`` is replaced by the current UTC time.
    """
    init_db()
    when = observed_at if observed_at else _now()
    statement = """
        insert into observations(id, cycle_id, concern_id, source, kind, payload_json, observed_at)
        values(?, ?, ?, ?, ?, ?, ?)
        on conflict(id) do update set
        cycle_id=excluded.cycle_id,
        concern_id=excluded.concern_id,
        source=excluded.source,
        kind=excluded.kind,
        payload_json=excluded.payload_json,
        observed_at=excluded.observed_at
        """
    values = (id, cycle_id, concern_id, source, kind, payload_json, when)
    with _connect() as conn:
        conn.execute(statement, values)
def upsert_state(
    *,
    id: str,
    cycle_id: str,
    concern_id: str,
    market_regime: str | None,
    volatility_state: str | None,
    liquidity_state: str | None,
    sentiment_pressure: str | None,
    event_risk: str | None,
    execution_quality: str | None,
    confidence: float | None,
    payload_json: str,
    created_at: str | None = None,
) -> None:
    """Insert a state row, or overwrite every column of the row with the same ``id``.

    A falsy ``created_at`` is replaced by the current UTC time.
    """
    init_db()
    stamp = created_at if created_at else _now()
    statement = """
        insert into states(id, cycle_id, concern_id, market_regime, volatility_state, liquidity_state, sentiment_pressure, event_risk, execution_quality, confidence, payload_json, created_at)
        values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        on conflict(id) do update set
        cycle_id=excluded.cycle_id,
        concern_id=excluded.concern_id,
        market_regime=excluded.market_regime,
        volatility_state=excluded.volatility_state,
        liquidity_state=excluded.liquidity_state,
        sentiment_pressure=excluded.sentiment_pressure,
        event_risk=excluded.event_risk,
        execution_quality=excluded.execution_quality,
        confidence=excluded.confidence,
        payload_json=excluded.payload_json,
        created_at=excluded.created_at
        """
    values = (
        id,
        cycle_id,
        concern_id,
        market_regime,
        volatility_state,
        liquidity_state,
        sentiment_pressure,
        event_risk,
        execution_quality,
        confidence,
        payload_json,
        stamp,
    )
    with _connect() as conn:
        conn.execute(statement, values)
def upsert_narrative(
    *,
    id: str,
    cycle_id: str,
    concern_id: str,
    summary: str,
    key_drivers_json: str,
    risk_flags_json: str,
    uncertainties_json: str,
    confidence: float | None,
    created_at: str | None = None,
) -> None:
    """Insert a narrative, or overwrite every column of the row with the same ``id``.

    A falsy ``created_at`` is replaced by the current UTC time.
    """
    init_db()
    stamp = created_at if created_at else _now()
    statement = """
        insert into narratives(id, cycle_id, concern_id, summary, key_drivers_json, risk_flags_json, uncertainties_json, confidence, created_at)
        values(?, ?, ?, ?, ?, ?, ?, ?, ?)
        on conflict(id) do update set
        cycle_id=excluded.cycle_id,
        concern_id=excluded.concern_id,
        summary=excluded.summary,
        key_drivers_json=excluded.key_drivers_json,
        risk_flags_json=excluded.risk_flags_json,
        uncertainties_json=excluded.uncertainties_json,
        confidence=excluded.confidence,
        created_at=excluded.created_at
        """
    values = (
        id,
        cycle_id,
        concern_id,
        summary,
        key_drivers_json,
        risk_flags_json,
        uncertainties_json,
        confidence,
        stamp,
    )
    with _connect() as conn:
        conn.execute(statement, values)
def upsert_decision(
    *,
    id: str,
    cycle_id: str,
    concern_id: str,
    mode: str,
    action: str,
    target_strategy: str | None,
    target_policy_json: str | None,
    reason_summary: str | None,
    confidence: float | None,
    requires_action: bool,
    created_at: str | None = None,
) -> None:
    """Insert a decision, or overwrite every column of the row with the same ``id``.

    ``requires_action`` is stored as 1 or 0. A falsy ``created_at`` is
    replaced by the current UTC time.
    """
    init_db()
    stamp = created_at if created_at else _now()
    action_flag = int(bool(requires_action))
    statement = """
        insert into decisions(id, cycle_id, concern_id, mode, action, target_strategy, target_policy_json, reason_summary, confidence, requires_action, created_at)
        values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        on conflict(id) do update set
        cycle_id=excluded.cycle_id,
        concern_id=excluded.concern_id,
        mode=excluded.mode,
        action=excluded.action,
        target_strategy=excluded.target_strategy,
        target_policy_json=excluded.target_policy_json,
        reason_summary=excluded.reason_summary,
        confidence=excluded.confidence,
        requires_action=excluded.requires_action,
        created_at=excluded.created_at
        """
    values = (
        id,
        cycle_id,
        concern_id,
        mode,
        action,
        target_strategy,
        target_policy_json,
        reason_summary,
        confidence,
        action_flag,
        stamp,
    )
    with _connect() as conn:
        conn.execute(statement, values)
def latest_states(limit: int = 20) -> list[dict[str, Any]]:
    """Return up to ``limit`` state rows as dicts, latest ``created_at`` first."""
    init_db()
    query = "select * from states order by created_at desc limit ?"
    with _connect() as conn:
        records = conn.execute(query, (limit,)).fetchall()
    return list(map(dict, records))
def recent_states_for_concern(*, concern_id: str, since_seconds: int, limit: int = 24) -> list[dict[str, Any]]:
    """Return a concern's states from the last ``since_seconds`` seconds, oldest first.

    The look-back window is at least one second, and at most ``limit`` rows
    are returned.
    """
    init_db()
    window = max(1, since_seconds)
    threshold = datetime.now(timezone.utc).timestamp() - window
    threshold_iso = datetime.fromtimestamp(threshold, tz=timezone.utc).isoformat()
    query = "select * from states where concern_id = ? and created_at >= ? order by created_at asc limit ?"
    with _connect() as conn:
        records = conn.execute(query, (concern_id, threshold_iso, limit)).fetchall()
    return list(map(dict, records))
def latest_decisions(limit: int = 20) -> list[dict[str, Any]]:
    """Return up to ``limit`` decisions as dicts, latest ``created_at`` first."""
    init_db()
    query = "select * from decisions order by created_at desc limit ?"
    with _connect() as conn:
        records = conn.execute(query, (limit,)).fetchall()
    return list(map(dict, records))
def latest_narratives(limit: int = 20) -> list[dict[str, Any]]:
    """Return up to ``limit`` narratives as dicts, latest ``created_at`` first."""
    init_db()
    query = "select * from narratives order by created_at desc limit ?"
    with _connect() as conn:
        records = conn.execute(query, (limit,)).fetchall()
    return list(map(dict, records))
def latest_regime_samples(limit: int = 20) -> list[dict[str, Any]]:
    """Return up to ``limit`` regime samples as dicts, latest ``captured_at`` first."""
    init_db()
    query = "select * from regime_samples order by captured_at desc limit ?"
    with _connect() as conn:
        records = conn.execute(query, (limit,)).fetchall()
    return list(map(dict, records))
def latest_observations(limit: int = 20, source: str | None = None) -> list[dict[str, Any]]:
    """Return up to ``limit`` observations as dicts, latest ``observed_at`` first.

    When ``source`` is truthy only observations from that source are returned.
    """
    init_db()
    parts = ["select * from observations"]
    args: list[Any] = []
    if source:
        parts.append("where source = ?")
        args.append(source)
    parts.append("order by observed_at desc limit ?")
    args.append(limit)
    query = " ".join(parts)
    with _connect() as conn:
        records = conn.execute(query, tuple(args)).fetchall()
    return list(map(dict, records))
def recent_regime_samples(limit: int = 200) -> list[dict[str, Any]]:
    """Return up to ``limit`` regime samples as dicts, latest ``captured_at`` first.

    Runs the same query as ``latest_regime_samples`` with a larger default limit.
    """
    init_db()
    query = "select * from regime_samples order by captured_at desc limit ?"
    with _connect() as conn:
        records = conn.execute(query, (limit,)).fetchall()
    return list(map(dict, records))
|