Browse source

initial version

Lukas Goldschmidt 3 weeks ago
commit
5d4ce0beef

+ 11 - 0
.env.example

@@ -0,0 +1,11 @@
+# Argus MCP local config example
+# Copy this file to .env and fill in your real API keys.
+
+ARGUS_SQLITE_PATH=data/argus_mcp.sqlite3
+ARGUS_SYMBOLS=QQQ,SPY,DXY,HYG,BTCUSD,ETHUSD,VXX
+ARGUS_INTERVAL=1d
+ARGUS_FINNHUB_TTL_SECONDS=60
+ARGUS_TWELVE_DATA_TTL_SECONDS=900
+ARGUS_SNAPSHOT_TTL_SECONDS=60
+FINNHUB_TOKEN=
+TWELVE_DATA_KEY=

+ 7 - 0
.gitignore

@@ -0,0 +1,7 @@
+.venv/
+__pycache__/
+.pytest_cache/
+logs/
+data/*.sqlite3
+.env
+*.pyc

+ 17 - 0
PROJECT.md

@@ -0,0 +1,17 @@
+# Project
+
+## Goal
+Provide Hermes with a small, explainable, read-only regime feed for market sentiment and macro risk dynamics.
+
+## v1 shape
+- read-only regime snapshot tool
+- read-only snapshot history tool
+- FastAPI + FastMCP on one port
+- SQLite persistence for snapshots
+- provider adapters for Finnhub and Twelve Data
+
+## Scope guardrails
+- no trading decisions
+- no action dispatch
+- no dashboard in v1
+- keep the regime schema small and stable

+ 31 - 0
README.md

@@ -0,0 +1,31 @@
+# Argus MCP
+
+Argus MCP is a narrow, read-only market context server for Hermes.
+
+## What it does
+
+- ingests context from Finnhub and Twelve Data
+- classifies a market regime
+- stores immutable snapshots in SQLite
+- exposes read-only MCP tools
+
+## Surface
+
+- `GET /`
+- `GET /health`
+- `GET /mcp/sse`
+- MCP tools: `get_snapshot()` and `get_regime()`
+
+## Run
+
+```bash
+./run.sh
+```
+
+Default port: `8520`.
+
+## Test
+
+```bash
+./tests.sh
+```

+ 405 - 0
argus_first_idea.md

@@ -0,0 +1,405 @@
+# **ARGUS-MCP: A Market Context Sensor System for Crypto Trading (Revised)**
+
+## 1. Purpose
+
+**Argus-MCP** is a market context engine designed to observe global financial signals and derive **actionable regime information** for crypto trading systems.
+
+It does not predict prices directly. Instead, it determines:
+
+> **“What kind of market environment is currently active?”**
+
+This enables downstream systems to adapt behavior based on **regime**, not signals alone.
+
+---
+
+## 2. Conceptual Model
+
+Crypto markets are modeled as **downstream of global liquidity and risk sentiment**.
+
+Argus-MCP constructs a **compressed representation of global state** using a minimal set of cross-market signals.
+
+The system acts as a **sensor fusion layer**, combining:
+
+* macro risk signals
+* volatility structure
+* liquidity constraints
+* internal crypto dynamics
+
+---
+
+## 3. Core Signal Domains
+
+---
+
+### 3.1 Equity Risk Appetite
+
+Primary symbols:
+
+* SPY
+* QQQ
+
+Rationale:
+
+Equities represent **risk-taking behavior and liquidity availability**.
+
+QQQ is emphasized due to:
+
+* high sensitivity to liquidity
+* strong correlation with crypto
+* concentration of speculative capital
+
+Interpretation:
+
+* QQQ outperforming SPY → speculative expansion
+* both rising → broad risk-on
+* divergence → instability
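
The interpretation rules above can be sketched as a small Python helper (illustrative only: the function name and the 0.25 percentage-point leadership threshold are assumptions, not calibrated values):

```python
def equity_risk_signal(qqq_change_pct: float, spy_change_pct: float) -> str:
    """Interpret the QQQ/SPY daily-change spread as a risk-appetite signal."""
    spread = qqq_change_pct - spy_change_pct
    if spread > 0.25 and qqq_change_pct > 0:
        return "speculative_expansion"   # QQQ clearly outperforming SPY
    if qqq_change_pct > 0 and spy_change_pct > 0:
        return "broad_risk_on"           # both rising, no clear leader
    if (qqq_change_pct > 0) != (spy_change_pct > 0):
        return "divergence"              # mixed signs -> instability
    return "risk_off"                    # both flat or falling


print(equity_risk_signal(1.8, 0.4))  # speculative_expansion
```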
+
+---
+
+### 3.2 Volatility and Market Stress (Revised)
+
+Primary symbols:
+
+* VXX
+* optional: UVXY
+
+Rationale:
+
+Direct access to the CBOE Volatility Index is often restricted.
+Instead, Argus-MCP uses **tradable volatility proxies** based on VIX futures.
+
+These instruments reflect **market demand for volatility exposure**, which is sufficient—and often advantageous—for detecting stress.
+
+---
+
+### Structural Difference (Critical)
+
+Unlike the VIX:
+
+* VXX / UVXY are based on **futures**, not options
+* they exhibit **contango decay**
+* they are influenced by tradable flows
+
+Therefore:
+
+> They measure **market stress dynamics**, not absolute implied volatility levels.
+
+---
+
+### Interpretation Model
+
+Because of decay and structure:
+
+* **absolute values are unreliable**
+* **relative changes are primary signals**
+
+---
+
+### Operational Interpretation
+
+* VXX rising → increasing stress / volatility expectations
+* VXX stable or falling → calm / compression
+* UVXY spikes → acute stress events
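
In Python, the relative-change reading might look like this (a sketch: the 5% spike threshold and the function name are assumptions, not part of this spec):

```python
def vxx_stress_signal(closes: list[float], spike_pct: float = 5.0) -> str:
    """Classify a volatility proxy by its relative change, never its level."""
    if len(closes) < 2:
        return "unknown"
    # Day-over-day percent change of the most recent bar.
    change_pct = (closes[-1] - closes[-2]) / closes[-2] * 100.0
    if change_pct >= spike_pct:
        return "acute_stress"          # UVXY-style spike event
    if change_pct > 0:
        return "stress_building"       # VXX rising
    return "calm_or_compressing"       # VXX stable or falling


print(vxx_stress_signal([23.0, 23.1, 25.0]))  # acute_stress
```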
+
+---
+
+### Role in System
+
+* VXX → baseline volatility regime
+* UVXY → shock / spike detector
+
+---
+
+### 3.3 Currency Pressure (Global Liquidity Constraint)
+
+Primary symbol:
+
+* DXY (or proxy such as UUP)
+
+Rationale:
+
+The US dollar acts as a **global liquidity sink**.
+
+Crypto is highly sensitive to:
+
+* USD strength
+* global liquidity contraction
+
+Interpretation:
+
+* DXY rising → tightening conditions → bearish pressure
+* DXY falling → easing conditions → supportive
+
+---
+
+### 3.4 Credit and Liquidity Stress
+
+Primary symbol:
+
+* HYG
+
+Rationale:
+
+High-yield bonds reflect **real credit risk**, often preceding equity stress.
+
+Interpretation:
+
+* HYG rising → liquidity available
+* HYG falling → stress building
+
+---
+
+### 3.5 Internal Crypto Structure
+
+Primary symbols:
+
+* BTCUSD
+* ETHUSD
+
+Rationale:
+
+Internal crypto dynamics reveal **capital distribution within the ecosystem**.
+
+Interpretation:
+
+* ETH outperforming BTC → speculative expansion
+* BTC dominance → defensive positioning
+* divergence → internal regime shift
+
+---
+
+## 4. Signal Interactions
+
+---
+
+### 4.1 Liquidity Expansion Regime
+
+Characteristics:
+
+* QQQ rising
+* DXY falling
+* HYG stable or rising
+* VXX stable or declining
+
+Interpretation:
+
+* broad liquidity expansion
+* strong support for crypto
+
+---
+
+### 4.2 Liquidity Contraction / Stress Regime
+
+Characteristics:
+
+* VXX rising sharply
+* UVXY spike (optional confirmation)
+* DXY rising
+* HYG falling
+
+Interpretation:
+
+* tightening financial conditions
+* elevated systemic stress
+
+---
+
+### 4.3 Range-Bound / Compression Regime
+
+Characteristics:
+
+* VXX low and stable
+* equities sideways
+* DXY neutral
+
+Interpretation:
+
+* low volatility environment
+* high suitability for grid strategies
+
+---
+
+### 4.4 Speculative Expansion Phase
+
+Characteristics:
+
+* QQQ rising strongly
+* VXX low
+* ETH outperforming BTC
+
+Interpretation:
+
+* late-stage risk-on
+* increased volatility and instability
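
The four regimes above can be expressed as a toy rule set (a sketch: the symbol keys and every threshold are illustrative assumptions; a production classifier should weight signals rather than hard-code cutoffs):

```python
def classify_regime(changes: dict[str, float]) -> str:
    """Map daily % changes (symbol -> change) to one of the section 4 regimes."""
    qqq = changes.get("QQQ", 0.0)
    spy = changes.get("SPY", 0.0)
    dxy = changes.get("DXY", 0.0)
    hyg = changes.get("HYG", 0.0)
    vxx = changes.get("VXX", 0.0)
    eth = changes.get("ETHUSD", 0.0)
    btc = changes.get("BTCUSD", 0.0)

    if vxx > 2.0 and dxy > 0.2 and hyg < 0.0:
        return "liquidity_contraction"   # 4.2: stress regime
    if qqq > 1.0 and vxx < 0.5 and eth > btc:
        return "speculative_expansion"   # 4.4: late-stage risk-on
    if qqq > 0.3 and dxy < 0.0 and hyg >= 0.0 and vxx <= 0.0:
        return "liquidity_expansion"     # 4.1: broad liquidity support
    if abs(qqq) < 0.3 and abs(spy) < 0.3 and abs(dxy) < 0.2 and vxx < 0.5:
        return "compression"             # 4.3: range-bound
    return "mixed"


print(classify_regime({"QQQ": 0.8, "DXY": -0.3, "HYG": 0.2, "VXX": -1.0}))  # liquidity_expansion
```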
+
+---
+
+## 5. Design Philosophy
+
+---
+
+### 5.1 Minimalism
+
+A small number of symbols captures a large portion of global state.
+
+---
+
+### 5.2 Orthogonality
+
+Each signal represents a distinct dimension:
+
+* equities → risk
+* volatility proxies → stress dynamics
+* dollar → liquidity constraint
+* credit → funding conditions
+* crypto → internal structure
+
+---
+
+### 5.3 Relative Over Absolute
+
+Particularly for volatility proxies:
+
+> **Changes and momentum matter more than levels.**
+
+---
+
+### 5.4 Regime Awareness
+
+The system classifies **conditions**, not predictions.
+
+---
+
+## 6. Conclusion
+
+Argus-MCP models markets as **regime-driven systems shaped by liquidity and stress dynamics**.
+
+By using volatility proxies such as VXX and UVXY, it maintains functional awareness of market stress even under data access constraints.
+
+Its value lies in:
+
+> **accurately interpreting the present environment to guide adaptive behavior.**
+
+---
+
+# **ADDENDUM: Data Sources (Finnhub & Twelve Data)**
+
+## A. Finnhub
+
+### Role
+
+Realtime signal ingestion.
+
+### Key Usage
+
+* QQQ
+* SPY
+* BTCUSD
+* ETHUSD
+* VXX / UVXY
+
+---
+
+### WebSocket Endpoint
+
+```
+wss://ws.finnhub.io?token=YOUR_API_KEY
+```
+
+Subscribe:
+
+```json
+{ "type": "subscribe", "symbol": "QQQ" }
+```
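
A minimal Python client for this flow might look like the following sketch (assumes the third-party `websockets` package; the helper names are illustrative and untested against a live key):

```python
import json


FINNHUB_WS = "wss://ws.finnhub.io?token={token}"


def subscribe_message(symbol: str) -> str:
    """Build the subscribe payload shown above."""
    return json.dumps({"type": "subscribe", "symbol": symbol})


async def stream_trades(token: str, symbols: list[str]) -> None:
    # Imported lazily so subscribe_message works without the dependency.
    import websockets  # third-party: pip install websockets

    async with websockets.connect(FINNHUB_WS.format(token=token)) as ws:
        for symbol in symbols:
            await ws.send(subscribe_message(symbol))
        async for raw in ws:
            print(json.loads(raw))  # trade events arrive as JSON frames


print(subscribe_message("QQQ"))  # {"type": "subscribe", "symbol": "QQQ"}
```

Run with `asyncio.run(stream_trades(token, ["QQQ", "SPY"]))`; the connection stays open and Finnhub pushes events as they occur.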
+
+---
+
+### REST Endpoint
+
+```
+https://finnhub.io/api/v1/quote?symbol=QQQ&token=KEY
+```
+
+---
+
+### Free Tier Limits
+
+* ~60 requests/min
+* limited WebSocket subscriptions (~50 symbols practical)
+
+---
+
+### Notes
+
+* best source for realtime signals
+* ETFs used for volatility and macro proxies
+
+---
+
+## B. Twelve Data
+
+### Role
+
+Context and indicator enrichment.
+
+---
+
+### Key Usage
+
+* DXY (or proxy)
+* BTC/USD, ETH/USD indicators
+* optional commodities
+
+---
+
+### Time Series Endpoint
+
+```
+https://api.twelvedata.com/time_series?symbol=DXY&interval=1min&apikey=KEY
+```
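
Once the JSON response is decoded, the bars can be pulled out with a small parser (a sketch assuming the documented `{"values": [...]}` shape, newest bar first, with numeric fields encoded as strings):

```python
def parse_time_series(payload: dict) -> list[tuple[str, float]]:
    """Extract (datetime, close) pairs from a time_series response."""
    if payload.get("status") == "error":
        raise RuntimeError(payload.get("message", "Twelve Data error"))
    return [(bar["datetime"], float(bar["close"])) for bar in payload.get("values", [])]


sample = {
    "status": "ok",
    "values": [
        {"datetime": "2024-01-02 15:59:00", "close": "102.31"},
        {"datetime": "2024-01-02 15:58:00", "close": "102.27"},
    ],
}
print(parse_time_series(sample))  # [('2024-01-02 15:59:00', 102.31), ('2024-01-02 15:58:00', 102.27)]
```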
+
+---
+
+### Indicator Endpoints
+
+RSI:
+
+```
+https://api.twelvedata.com/rsi?symbol=BTC/USD&interval=5min&time_period=14&apikey=KEY
+```
+
+ATR:
+
+```
+https://api.twelvedata.com/atr?symbol=BTC/USD&interval=5min&time_period=14&apikey=KEY
+```
+
+---
+
+### Free Tier Limits
+
+* ~800 requests/day
+* ~8 requests/min
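
These budgets argue for a client-side throttle. A minimal sketch (the class name is an assumption; the injectable clock keeps it testable without sleeping):

```python
import time


class MinIntervalLimiter:
    """Enforce a minimum spacing between requests, e.g. 60 / 8 s for ~8/min."""

    def __init__(self, min_interval: float, clock=time.monotonic) -> None:
        self.min_interval = min_interval
        self.clock = clock
        self._last = float("-inf")  # no request recorded yet

    def wait_time(self) -> float:
        """Seconds to wait before the next request is allowed."""
        return max(0.0, self._last + self.min_interval - self.clock())

    def record(self) -> None:
        """Mark that a request was just sent."""
        self._last = self.clock()


limiter = MinIntervalLimiter(60 / 8)
print(limiter.wait_time())  # 0.0 -- the first request is always allowed
```

A caller sleeps for `wait_time()` seconds, issues the request, then calls `record()`.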
+
+---
+
+### Notes
+
+* broad asset coverage
+* built-in indicators reduce computation overhead
+* REST-based (no streaming)
+
+---
+
+## Final Integration Summary
+
+* Finnhub → **fast, event-driven awareness**
+* Twelve Data → **slow, contextual understanding**
+
+Together they form a **balanced sensing architecture** aligned with Argus-MCP’s design philosophy:
+
+> **Fast signals for awareness, slower signals for meaning.**

+ 20 - 0
killserver.sh

@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+PORT="${1:-8520}"
+PID_FILE="./logs/server.pid"
+
+if [ -f "$PID_FILE" ]; then
+  PID=$(cat "$PID_FILE")
+  if kill -0 "$PID" 2>/dev/null; then
+    kill "$PID"
+    rm -f "$PID_FILE"
+    echo "Killed Argus MCP pid $PID"
+    exit 0
+  fi
+  # The pid file pointed at a dead process; clean it up.
+  rm -f "$PID_FILE"
+fi
+
+# Fall back to matching the uvicorn command line for this port.
+if pkill -f "uvicorn argus_mcp.server:app --host 0.0.0.0 --port $PORT"; then
+  echo "Killed Argus MCP on port $PORT (matched by command line)"
+  exit 0
+fi
+
+echo "No running Argus MCP process found"

+ 6 - 0
requirements.txt

@@ -0,0 +1,6 @@
+fastapi>=0.115
+uvicorn[standard]>=0.30
+mcp>=1.0.0
+httpx>=0.27
+pydantic>=2.7
+pytest>=8.0

+ 7 - 0
restart.sh

@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+PORT="${1:-8520}"
+
+./killserver.sh "$PORT" || true
+./run.sh "$PORT"

+ 17 - 0
run.sh

@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+PORT="${1:-8520}"
+
+if [ -f .venv/bin/activate ]; then
+  # shellcheck disable=SC1091
+  source .venv/bin/activate
+fi
+
+export PYTHONPATH="${PYTHONPATH:-}:$(pwd)/src:$(pwd)"
+mkdir -p ./logs ./data
+
+uvicorn argus_mcp.server:app --host 0.0.0.0 --port "$PORT" > ./logs/server.log 2>&1 &
+PID=$!
+echo "$PID" > ./logs/server.pid
+echo "Argus MCP running on port $PORT (pid $PID)"

+ 4 - 0
src/argus_mcp/__init__.py

@@ -0,0 +1,4 @@
+"""Argus MCP package."""
+
+__all__ = ["__version__"]
+__version__ = "0.1.0"

+ 78 - 0
src/argus_mcp/config.py

@@ -0,0 +1,78 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+import os
+
+
+def _load_dotenv(path: Path) -> None:
+    if not path.exists():
+        return
+
+    for raw_line in path.read_text(encoding="utf-8").splitlines():
+        line = raw_line.strip()
+        if not line or line.startswith("#"):
+            continue
+        if line.startswith("export "):
+            line = line[len("export ") :].strip()
+        if "=" not in line:
+            continue
+        key, value = line.split("=", 1)
+        key = key.strip()
+        value = value.strip()
+        if not key or key in os.environ:
+            continue
+        if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
+            value = value[1:-1]
+        os.environ[key] = value
+
+
+def _env(name: str, default: str = "") -> str:
+    return os.getenv(name, default).strip()
+
+
+def _int_env(name: str, default: int) -> int:
+    value = _env(name)
+    if not value:
+        return default
+    try:
+        return int(value)
+    except ValueError:
+        return default
+
+
+def _split_csv(value: str, default: tuple[str, ...]) -> tuple[str, ...]:
+    if not value:
+        return default
+    items = tuple(part.strip() for part in value.split(",") if part.strip())
+    return items or default
+
+
+@dataclass(frozen=True, slots=True)
+class ArgusConfig:
+    app_name: str = "argus-mcp"
+    sqlite_path: Path = Path("data/argus_mcp.sqlite3")
+    finnhub_token: str = ""
+    twelve_data_key: str = ""
+    symbols: tuple[str, ...] = ("QQQ", "SPY", "DXY", "HYG", "BTCUSD", "ETHUSD", "VXX")
+    interval: str = "1d"
+    finnhub_ttl_seconds: int = 60
+    twelve_data_ttl_seconds: int = 900
+    snapshot_ttl_seconds: int = 60
+
+
+def load_config() -> ArgusConfig:
+    _load_dotenv(Path.cwd() / ".env")
+    return ArgusConfig(
+        sqlite_path=Path(_env("ARGUS_SQLITE_PATH", "data/argus_mcp.sqlite3")),
+        finnhub_token=_env("FINNHUB_TOKEN"),
+        twelve_data_key=_env("TWELVE_DATA_KEY"),
+        symbols=_split_csv(
+            _env("ARGUS_SYMBOLS"),
+            ("QQQ", "SPY", "DXY", "HYG", "BTCUSD", "ETHUSD", "VXX"),
+        ),
+        interval=_env("ARGUS_INTERVAL", "1d"),
+        finnhub_ttl_seconds=_int_env("ARGUS_FINNHUB_TTL_SECONDS", 60),
+        twelve_data_ttl_seconds=_int_env("ARGUS_TWELVE_DATA_TTL_SECONDS", 900),
+        snapshot_ttl_seconds=_int_env("ARGUS_SNAPSHOT_TTL_SECONDS", 60),
+    )

+ 38 - 0
src/argus_mcp/models.py

@@ -0,0 +1,38 @@
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Any
+from pydantic import BaseModel, Field
+
+
+class MarketQuote(BaseModel):
+    symbol: str
+    source: str
+    timestamp: datetime | None = None
+    last: float | None = None
+    open: float | None = None
+    high: float | None = None
+    low: float | None = None
+    previous_close: float | None = None
+    change_pct: float | None = None
+    raw: dict[str, Any] = Field(default_factory=dict)
+
+
+class SignalImpact(BaseModel):
+    name: str
+    value: float
+    weight: float
+    note: str
+
+
+class RegimeSnapshot(BaseModel):
+    snapshot_id: str
+    generated_at: datetime
+    regime: str
+    confidence: float
+    summary: str
+    components: dict[str, float] = Field(default_factory=dict)
+    signals: list[MarketQuote] = Field(default_factory=list)
+    impacts: list[SignalImpact] = Field(default_factory=list)
+    source_status: dict[str, str] = Field(default_factory=dict)
+

+ 2 - 0
src/argus_mcp/providers/__init__.py

@@ -0,0 +1,2 @@
+"""Provider adapters for Argus."""
+

+ 13 - 0
src/argus_mcp/providers/base.py

@@ -0,0 +1,13 @@
+from __future__ import annotations
+
+from typing import Protocol
+
+from argus_mcp.models import MarketQuote
+
+
+class MarketProvider(Protocol):
+    name: str
+
+    async def fetch_quote(self, symbol: str) -> MarketQuote | None:
+        ...
+

+ 52 - 0
src/argus_mcp/providers/finnhub.py

@@ -0,0 +1,52 @@
+from __future__ import annotations
+
+from datetime import datetime, timezone
+
+import httpx
+
+from argus_mcp.models import MarketQuote
+
+
+class FinnhubProvider:
+    name = "finnhub"
+
+    def __init__(self, token: str) -> None:
+        self.token = token.strip()
+
+    @property
+    def enabled(self) -> bool:
+        return bool(self.token)
+
+    async def fetch_quote(self, symbol: str) -> MarketQuote | None:
+        if not self.enabled:
+            return None
+
+        url = "https://finnhub.io/api/v1/quote"
+        params = {"symbol": symbol, "token": self.token}
+
+        async with httpx.AsyncClient(timeout=15.0) as client:
+            response = await client.get(url, params=params)
+            response.raise_for_status()
+            payload = response.json()
+
+        timestamp = payload.get("t")
+        dt = datetime.fromtimestamp(timestamp, tz=timezone.utc) if timestamp else None
+        current = payload.get("c")
+        previous = payload.get("pc")
+        change_pct = None
+        if current is not None and previous not in (None, 0):
+            change_pct = ((float(current) - float(previous)) / float(previous)) * 100.0
+
+        return MarketQuote(
+            symbol=symbol,
+            source=self.name,
+            timestamp=dt,
+            last=current,
+            open=payload.get("o"),
+            high=payload.get("h"),
+            low=payload.get("l"),
+            previous_close=previous,
+            change_pct=change_pct,
+            raw=payload,
+        )
+

+ 61 - 0
src/argus_mcp/providers/twelve_data.py

@@ -0,0 +1,61 @@
+from __future__ import annotations
+
+from datetime import datetime, timezone
+
+import httpx
+
+from argus_mcp.models import MarketQuote
+
+
+class TwelveDataProvider:
+    name = "twelve_data"
+
+    def __init__(self, api_key: str) -> None:
+        self.api_key = api_key.strip()
+
+    @property
+    def enabled(self) -> bool:
+        return bool(self.api_key)
+
+    async def fetch_quote(self, symbol: str) -> MarketQuote | None:
+        if not self.enabled:
+            return None
+
+        url = "https://api.twelvedata.com/quote"
+        params = {"symbol": symbol, "apikey": self.api_key}
+
+        async with httpx.AsyncClient(timeout=15.0) as client:
+            response = await client.get(url, params=params)
+            response.raise_for_status()
+            payload = response.json()
+
+        if payload.get("status") == "error":
+            return None
+
+        timestamp = payload.get("datetime") or payload.get("timestamp")
+        dt = None
+        if isinstance(timestamp, str):
+            try:
+                dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
+            except ValueError:
+                dt = None
+        elif isinstance(timestamp, (int, float)):
+            dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
+
+        current = payload.get("close") or payload.get("price")
+        previous = payload.get("previous_close")
+        change_pct = payload.get("percent_change")
+
+        return MarketQuote(
+            symbol=symbol,
+            source=self.name,
+            timestamp=dt,
+            last=current,
+            open=payload.get("open"),
+            high=payload.get("high"),
+            low=payload.get("low"),
+            previous_close=previous,
+            change_pct=change_pct,
+            raw=payload,
+        )
+

+ 115 - 0
src/argus_mcp/regime.py

@@ -0,0 +1,115 @@
+from __future__ import annotations
+
+from collections.abc import Iterable
+from datetime import datetime, timezone
+from uuid import uuid4
+
+from argus_mcp.models import MarketQuote, RegimeSnapshot, SignalImpact
+
+
+def _change(quote: MarketQuote | None) -> float:
+    if quote is None or quote.change_pct is None:
+        return 0.0
+    return float(quote.change_pct)
+
+
+def _score_component(value: float, weight: float) -> float:
+    return value * weight
+
+
+def build_regime_snapshot(quotes: Iterable[MarketQuote]) -> RegimeSnapshot:
+    quote_list = list(quotes)
+    by_symbol = {quote.symbol.upper(): quote for quote in quote_list}
+
+    qqq = by_symbol.get("QQQ")
+    spy = by_symbol.get("SPY")
+    dxy = by_symbol.get("DXY") or by_symbol.get("UUP")
+    hyg = by_symbol.get("HYG")
+    btc = by_symbol.get("BTCUSD") or by_symbol.get("BTC/USD")
+    eth = by_symbol.get("ETHUSD") or by_symbol.get("ETH/USD")
+    vxx = by_symbol.get("VXX")
+
+    risk = 0.0
+    stress = 0.0
+    liquidity = 0.0
+    compression = 0.0
+    impacts: list[SignalImpact] = []
+
+    if qqq and spy:
+        spread = _change(qqq) - _change(spy)
+        delta = _score_component(spread, 0.35)
+        risk += delta
+        impacts.append(SignalImpact(name="qqq_vs_spy", value=spread, weight=0.35, note="Speculative leadership spread"))
+
+    if btc:
+        delta = _score_component(_change(btc), 0.3)
+        risk += delta
+        impacts.append(SignalImpact(name="btc_momentum", value=_change(btc), weight=0.3, note="Crypto bid strength"))
+
+    if eth:
+        delta = _score_component(_change(eth), 0.25)
+        risk += delta
+        impacts.append(SignalImpact(name="eth_momentum", value=_change(eth), weight=0.25, note="Altcoin relative strength"))
+
+    if dxy:
+        delta = _score_component(_change(dxy), 0.45)
+        stress += delta
+        liquidity -= delta
+        impacts.append(SignalImpact(name="dollar_strength", value=_change(dxy), weight=0.45, note="Dollar pressure on liquidity"))
+
+    if hyg:
+        delta = _score_component(_change(hyg), 0.35)
+        liquidity += delta
+        stress -= delta
+        impacts.append(SignalImpact(name="credit_spread_proxy", value=_change(hyg), weight=0.35, note="Credit appetite proxy"))
+
+    if vxx:
+        delta = _score_component(_change(vxx), 0.6)
+        stress += delta
+        compression -= abs(delta)
+        impacts.append(SignalImpact(name="volatility_proxy", value=_change(vxx), weight=0.6, note="Stress and vol demand proxy"))
+
+    if not quote_list:
+        regime = "no_data"
+        confidence = 0.0
+        summary = "No provider data available yet."
+    else:
+        scores = {
+            "risk_on": risk,
+            "stress": stress,
+            "liquidity": liquidity,
+            "compression": compression,
+        }
+        regime = max(scores, key=scores.get)
+        top_score = scores[regime]
+        # Use the signed score: a strongly negative top score is evidence
+        # against that regime, not for it, so it should read as neutral.
+        if top_score < 0.25:
+            regime = "neutral"
+            confidence = 0.1
+            summary = "Signals are mixed or too weak to call a regime with confidence."
+        else:
+            confidence = min(1.0, max(0.1, top_score / 3.0))
+            summary = {
+                "risk_on": "Speculative risk appetite is leading.",
+                "stress": "Volatility or funding stress is dominating.",
+                "liquidity": "Liquidity support is improving.",
+                "compression": "Market conditions look compressed and range-like.",
+            }[regime]
+
+    components = {
+        "risk_on": risk,
+        "stress": stress,
+        "liquidity": liquidity,
+        "compression": compression,
+    }
+
+    return RegimeSnapshot(
+        snapshot_id=uuid4().hex,
+        generated_at=datetime.now(timezone.utc),
+        regime=regime,
+        confidence=confidence,
+        summary=summary,
+        components=components,
+        signals=quote_list,
+        impacts=impacts,
+        source_status={quote.source: "ok" for quote in quote_list},
+    )

+ 41 - 0
src/argus_mcp/server.py

@@ -0,0 +1,41 @@
+from __future__ import annotations
+
+from fastapi import FastAPI
+from mcp.server.fastmcp import FastMCP
+from mcp.server.transport_security import TransportSecuritySettings
+
+from argus_mcp.service import ArgusService
+
+
+service = ArgusService.create()
+mcp = FastMCP(
+    "argus-mcp",
+    transport_security=TransportSecuritySettings(
+        enable_dns_rebinding_protection=False,
+    ),
+)
+
+
+@mcp.tool(description="Return the latest read-only Argus snapshot, optionally refreshing from providers first.")
+async def get_snapshot(refresh: bool = False):
+    snapshot = await service.get_snapshot(refresh=refresh)
+    return snapshot.model_dump(mode="json")
+
+
+@mcp.tool(description="Return the current regime classification only, as a compact read-only context payload.")
+async def get_regime(refresh: bool = False):
+    return await service.get_regime(refresh=refresh)
+
+
+app = FastAPI(title="Argus MCP")
+app.mount("/mcp", mcp.sse_app())
+
+
+@app.get("/")
+def root():
+    return {"status": "ok", "service": "argus-mcp", "transport": "fastmcp+sse", "mount": "/mcp"}
+
+
+@app.get("/health")
+def health():
+    return service.health()

+ 147 - 0
src/argus_mcp/service.py

@@ -0,0 +1,147 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+
+from argus_mcp.config import ArgusConfig, load_config
+from argus_mcp.models import MarketQuote, RegimeSnapshot
+from argus_mcp.providers.finnhub import FinnhubProvider
+from argus_mcp.providers.twelve_data import TwelveDataProvider
+from argus_mcp.regime import build_regime_snapshot
+from argus_mcp.storage import SnapshotStore
+
+
+SYMBOL_ALIASES: dict[str, dict[str, str]] = {
+    "QQQ": {"finnhub": "QQQ", "twelve_data": "QQQ"},
+    "SPY": {"finnhub": "SPY", "twelve_data": "SPY"},
+    "HYG": {"finnhub": "HYG", "twelve_data": "HYG"},
+    "DXY": {"finnhub": "UUP", "twelve_data": "DXY"},
+    "UUP": {"finnhub": "UUP", "twelve_data": "UUP"},
+    "VXX": {"finnhub": "VXX", "twelve_data": "VXX"},
+    "BTCUSD": {"finnhub": "BINANCE:BTCUSDT", "twelve_data": "BTC/USD"},
+    "BTC/USD": {"finnhub": "BINANCE:BTCUSDT", "twelve_data": "BTC/USD"},
+    "ETHUSD": {"finnhub": "BINANCE:ETHUSDT", "twelve_data": "ETH/USD"},
+    "ETH/USD": {"finnhub": "BINANCE:ETHUSDT", "twelve_data": "ETH/USD"},
+}
+
+
+@dataclass(slots=True)
+class ArgusService:
+    config: ArgusConfig
+    store: SnapshotStore
+    finnhub: FinnhubProvider
+    twelve_data: TwelveDataProvider
+    _last_source_status: dict[str, str] = field(default_factory=dict, init=False, repr=False)
+
+    @classmethod
+    def create(cls, config: ArgusConfig | None = None) -> "ArgusService":
+        cfg = config or load_config()
+        return cls(
+            config=cfg,
+            store=SnapshotStore(cfg.sqlite_path),
+            finnhub=FinnhubProvider(cfg.finnhub_token),
+            twelve_data=TwelveDataProvider(cfg.twelve_data_key),
+        )
+
+    def provider_summary(self) -> dict[str, bool]:
+        return {
+            "finnhub": self.finnhub.enabled,
+            "twelve_data": self.twelve_data.enabled,
+        }
+
+    def _cached_quote_is_fresh(self, symbol: str, source: str, ttl_seconds: int) -> tuple[MarketQuote | None, bool]:
+        cached = self.store.latest_quote(symbol, source)
+        if cached is None:
+            return None, False
+        quote, fetched_at = cached
+        age_seconds = (datetime.now(timezone.utc) - fetched_at).total_seconds()
+        return quote, age_seconds <= ttl_seconds
+
+    async def _fetch_or_cache(
+        self,
+        canonical_symbol: str,
+        provider,
+        provider_symbol: str,
+        ttl_seconds: int,
+    ) -> tuple[MarketQuote | None, str]:
+        cached_quote, is_fresh = self._cached_quote_is_fresh(canonical_symbol, provider.name, ttl_seconds)
+        if cached_quote is not None and is_fresh:
+            return cached_quote, f"cached:{provider.name}"
+
+        try:
+            quote = await provider.fetch_quote(provider_symbol)
+        except Exception:
+            quote = None
+
+        if quote is not None:
+            quote.symbol = canonical_symbol
+            self.store.save_quote(quote)
+            return quote, f"fetched:{provider.name}"
+
+        if cached_quote is not None:
+            return cached_quote, f"stale_cache:{provider.name}"
+
+        return None, f"missing:{provider.name}"
+
+    async def fetch_quotes(self) -> list[MarketQuote]:
+        quotes: list[MarketQuote] = []
+        source_status: dict[str, str] = {}
+        for symbol in self.config.symbols:
+            aliases = SYMBOL_ALIASES.get(symbol.upper(), {"finnhub": symbol, "twelve_data": symbol})
+            quote, status = await self._fetch_or_cache(
+                symbol,
+                self.finnhub,
+                aliases["finnhub"],
+                self.config.finnhub_ttl_seconds,
+            )
+            source_status[f"{symbol}:finnhub"] = status
+            if quote is None:
+                quote, status = await self._fetch_or_cache(
+                    symbol,
+                    self.twelve_data,
+                    aliases["twelve_data"],
+                    self.config.twelve_data_ttl_seconds,
+                )
+                source_status[f"{symbol}:twelve_data"] = status
+            if quote is not None:
+                quote.symbol = symbol
+                quotes.append(quote)
+        self._last_source_status = source_status
+        return quotes
+
+    async def build_snapshot(self) -> RegimeSnapshot:
+        quotes = await self.fetch_quotes()
+        snapshot = build_regime_snapshot(quotes)
+        snapshot.source_status = dict(self._last_source_status)
+        self.store.save(snapshot)
+        return snapshot
+
+    async def get_snapshot(self, refresh: bool = False) -> RegimeSnapshot:
+        latest = self.store.latest()
+        if refresh or latest is None:
+            return await self.build_snapshot()
+        fetched_at = latest.generated_at.astimezone(timezone.utc)
+        age_seconds = (datetime.now(timezone.utc) - fetched_at).total_seconds()
+        if age_seconds > self.config.snapshot_ttl_seconds:
+            return await self.build_snapshot()
+        return latest
+
+    async def get_regime(self, refresh: bool = False) -> dict:
+        snapshot = await self.get_snapshot(refresh=refresh)
+        return {
+            "snapshot_id": snapshot.snapshot_id,
+            "generated_at": snapshot.generated_at,
+            "regime": snapshot.regime,
+            "confidence": snapshot.confidence,
+            "summary": snapshot.summary,
+            "components": snapshot.components,
+        }
+
+    def health(self) -> dict:
+        latest = self.store.latest()
+        return {
+            "status": "ok",
+            "providers": self.provider_summary(),
+            "snapshot_count": self.store.count(),
+            "latest_snapshot_at": latest.generated_at.isoformat() if latest else None,
+        }

+ 116 - 0
src/argus_mcp/storage.py

@@ -0,0 +1,116 @@
+from __future__ import annotations
+
+import json
+import sqlite3
+from pathlib import Path
+from datetime import datetime, timezone
+
+from argus_mcp.models import MarketQuote, RegimeSnapshot
+
+
+class SnapshotStore:
+    def __init__(self, path: Path) -> None:
+        self.path = path
+        self.path.parent.mkdir(parents=True, exist_ok=True)
+        self._init_db()
+
+    def _connect(self) -> sqlite3.Connection:
+        conn = sqlite3.connect(self.path)
+        conn.row_factory = sqlite3.Row
+        return conn
+
+    def _init_db(self) -> None:
+        with self._connect() as conn:
+            conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS snapshots (
+                    snapshot_id TEXT PRIMARY KEY,
+                    generated_at TEXT NOT NULL,
+                    regime TEXT NOT NULL,
+                    confidence REAL NOT NULL,
+                    payload_json TEXT NOT NULL
+                )
+                """
+            )
+            conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS quote_cache (
+                    symbol TEXT NOT NULL,
+                    source TEXT NOT NULL,
+                    fetched_at TEXT NOT NULL,
+                    payload_json TEXT NOT NULL,
+                    PRIMARY KEY (symbol, source)
+                )
+                """
+            )
+            conn.commit()
+
+    def save(self, snapshot: RegimeSnapshot) -> None:
+        payload = snapshot.model_dump(mode="json")
+        with self._connect() as conn:
+            conn.execute(
+                """
+                INSERT OR REPLACE INTO snapshots
+                (snapshot_id, generated_at, regime, confidence, payload_json)
+                VALUES (?, ?, ?, ?, ?)
+                """,
+                (
+                    snapshot.snapshot_id,
+                    snapshot.generated_at.isoformat(),
+                    snapshot.regime,
+                    snapshot.confidence,
+                    json.dumps(payload, ensure_ascii=False),
+                ),
+            )
+            conn.commit()
+
+    def latest(self) -> RegimeSnapshot | None:
+        with self._connect() as conn:
+            row = conn.execute(
+                "SELECT payload_json FROM snapshots ORDER BY generated_at DESC LIMIT 1"
+            ).fetchone()
+        if row is None:
+            return None
+        return RegimeSnapshot.model_validate_json(row["payload_json"])
+
+    def count(self) -> int:
+        with self._connect() as conn:
+            row = conn.execute("SELECT COUNT(*) AS n FROM snapshots").fetchone()
+        return int(row["n"] if row else 0)
+
+    def save_quote(self, quote: MarketQuote, fetched_at: datetime | None = None) -> None:
+        cached_at = fetched_at or datetime.now(timezone.utc)
+        payload = quote.model_dump(mode="json")
+        with self._connect() as conn:
+            conn.execute(
+                """
+                INSERT OR REPLACE INTO quote_cache
+                (symbol, source, fetched_at, payload_json)
+                VALUES (?, ?, ?, ?)
+                """,
+                (
+                    quote.symbol,
+                    quote.source,
+                    cached_at.isoformat(),
+                    json.dumps(payload, ensure_ascii=False),
+                ),
+            )
+            conn.commit()
+
+    def latest_quote(self, symbol: str, source: str) -> tuple[MarketQuote, datetime] | None:
+        with self._connect() as conn:
+            row = conn.execute(
+                """
+                SELECT payload_json, fetched_at
+                FROM quote_cache
+                WHERE symbol = ? AND source = ?
+                LIMIT 1
+                """,
+                (symbol, source),
+            ).fetchone()
+        if row is None:
+            return None
+        payload = MarketQuote.model_validate_json(row["payload_json"])
+        fetched_at = datetime.fromisoformat(row["fetched_at"])
+        return payload, fetched_at

+ 10 - 0
tests.sh

@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+if [ -f .venv/bin/activate ]; then
+  # shellcheck disable=SC1091
+  source .venv/bin/activate
+fi
+
+export PYTHONPATH="${PYTHONPATH:-}:$(pwd)/src:$(pwd)"
+pytest -q

+ 22 - 0
tests/test_regime.py

@@ -0,0 +1,22 @@
+from argus_mcp.models import MarketQuote
+from argus_mcp.regime import build_regime_snapshot
+
+
+def test_regime_prefers_risk_on_when_qqq_leads():
+    snapshot = build_regime_snapshot(
+        [
+            MarketQuote(symbol="QQQ", source="test", change_pct=2.0),
+            MarketQuote(symbol="SPY", source="test", change_pct=0.5),
+            MarketQuote(symbol="BTCUSD", source="test", change_pct=1.0),
+        ]
+    )
+
+    assert snapshot.regime == "risk_on"
+    assert snapshot.confidence > 0
+
+
+def test_regime_handles_no_data():
+    snapshot = build_regime_snapshot([])
+
+    assert snapshot.regime == "no_data"
+    assert snapshot.confidence == 0.0

+ 11 - 0
tests/test_server.py

@@ -0,0 +1,11 @@
+from fastapi.testclient import TestClient
+
+from argus_mcp.server import app
+
+
+def test_health_route_exists():
+    client = TestClient(app)
+    response = client.get("/health")
+
+    assert response.status_code == 200
+    assert response.json()["status"] == "ok"

+ 38 - 0
tests/test_storage.py

@@ -0,0 +1,38 @@
+from datetime import datetime, timezone
+from pathlib import Path
+
+from argus_mcp.models import MarketQuote, RegimeSnapshot
+from argus_mcp.storage import SnapshotStore
+
+
+def test_storage_roundtrip(tmp_path: Path):
+    store = SnapshotStore(tmp_path / "argus.sqlite3")
+    snapshot = RegimeSnapshot(
+        snapshot_id="abc123",
+        generated_at=datetime.now(timezone.utc),
+        regime="risk_on",
+        confidence=0.7,
+        summary="Test",
+        signals=[MarketQuote(symbol="QQQ", source="test", change_pct=1.0)],
+    )
+
+    store.save(snapshot)
+
+    latest = store.latest()
+    assert latest is not None
+    assert latest.snapshot_id == "abc123"
+    assert store.count() == 1
+
+
+def test_quote_cache_roundtrip(tmp_path: Path):
+    store = SnapshotStore(tmp_path / "argus.sqlite3")
+    quote = MarketQuote(symbol="QQQ", source="finnhub", last=500.0, change_pct=1.2)
+
+    store.save_quote(quote, fetched_at=datetime.now(timezone.utc))
+    cached = store.latest_quote("QQQ", "finnhub")
+
+    assert cached is not None
+    cached_quote, fetched_at = cached
+    assert cached_quote.symbol == "QQQ"
+    assert cached_quote.source == "finnhub"
+    assert fetched_at.tzinfo is not None