from __future__ import annotations

import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone

from argus_mcp.config import ArgusConfig, load_config
from argus_mcp.models import MarketQuote, RegimeSnapshot
from argus_mcp.providers.finnhub import FinnhubProvider
from argus_mcp.providers.twelve_data import TwelveDataProvider
from argus_mcp.regime import build_regime_snapshot
from argus_mcp.storage import SnapshotStore

logger = logging.getLogger(__name__)

# Canonical symbol -> {provider name -> symbol string passed to that provider}.
# Symbols that are not listed here are passed to both providers unchanged
# (see ``ArgusService.fetch_quotes``).
# NOTE(review): "DXY" is requested from Finnhub as "UUP" -- presumably a proxy
# instrument because the index itself is not available there; confirm.
SYMBOL_ALIASES: dict[str, dict[str, str]] = {
    "QQQ": {"finnhub": "QQQ", "twelve_data": "QQQ"},
    "SPY": {"finnhub": "SPY", "twelve_data": "SPY"},
    "HYG": {"finnhub": "HYG", "twelve_data": "HYG"},
    "DXY": {"finnhub": "UUP", "twelve_data": "DXY"},
    "UUP": {"finnhub": "UUP", "twelve_data": "UUP"},
    "VXX": {"finnhub": "VXX", "twelve_data": "VXX"},
    "BTCUSD": {"finnhub": "BINANCE:BTCUSDT", "twelve_data": "BTC/USD"},
    "BTC/USD": {"finnhub": "BINANCE:BTCUSDT", "twelve_data": "BTC/USD"},
    "ETHUSD": {"finnhub": "BINANCE:ETHUSDT", "twelve_data": "ETH/USD"},
    "ETH/USD": {"finnhub": "BINANCE:ETHUSDT", "twelve_data": "ETH/USD"},
}


@dataclass(slots=True)
class ArgusService:
    """Fetch quotes from the configured providers and build regime snapshots.

    Quotes are looked up in ``store`` first and only requested from a provider
    when no sufficiently recent cached quote exists. Finnhub is consulted
    first; Twelve Data is used only when the Finnhub path yields no quote.
    """

    config: ArgusConfig
    store: SnapshotStore
    finnhub: FinnhubProvider
    twelve_data: TwelveDataProvider
    # "<symbol>:<provider>" -> status string recorded by the most recent
    # ``fetch_quotes`` call; copied onto snapshots in ``build_snapshot``.
    _last_source_status: dict[str, str] = field(
        default_factory=dict, init=False, repr=False
    )

    @classmethod
    def create(cls, config: ArgusConfig | None = None) -> "ArgusService":
        """Build a service from *config*, or from ``load_config()`` if omitted."""
        cfg = config or load_config()
        return cls(
            config=cfg,
            store=SnapshotStore(cfg.sqlite_path),
            finnhub=FinnhubProvider(cfg.finnhub_token),
            twelve_data=TwelveDataProvider(cfg.twelve_data_key),
        )

    def provider_summary(self) -> dict[str, bool]:
        """Return each provider's ``enabled`` flag keyed by provider name."""
        return {
            "finnhub": self.finnhub.enabled,
            "twelve_data": self.twelve_data.enabled,
        }

    def _cached_quote_is_fresh(
        self, symbol: str, source: str, ttl_seconds: int
    ) -> tuple[MarketQuote | None, bool]:
        """Look up the latest stored quote and report whether it is within TTL.

        Returns:
            ``(None, False)`` when the store has no quote for the
            symbol/source pair; otherwise ``(quote, is_fresh)`` where
            ``is_fresh`` is true when the quote's age does not exceed
            *ttl_seconds*.
        """
        cached = self.store.latest_quote(symbol, source)
        if cached is None:
            return None, False

        quote, fetched_at = cached
        # Normalise exactly as ``get_snapshot`` does. For timezone-aware
        # timestamps the computed age is unchanged; a naive timestamp is read
        # as local time by ``astimezone`` instead of raising ``TypeError`` on
        # the subtraction below.
        fetched_at_utc = fetched_at.astimezone(timezone.utc)
        age_seconds = (datetime.now(timezone.utc) - fetched_at_utc).total_seconds()
        return quote, age_seconds <= ttl_seconds

    async def _fetch_or_cache(
        self,
        canonical_symbol: str,
        provider: FinnhubProvider | TwelveDataProvider,
        provider_symbol: str,
        ttl_seconds: int,
    ) -> tuple[MarketQuote | None, str]:
        """Return a quote for one symbol from one provider, preferring cache.

        Args:
            canonical_symbol: Symbol used as the cache key and written onto
                the returned quote.
            provider: Provider to query; must expose ``name`` and an awaitable
                ``fetch_quote``.
            provider_symbol: Symbol string passed to ``provider.fetch_quote``.
            ttl_seconds: Maximum age for a cached quote to count as fresh.

        Returns:
            ``(quote, status)`` where ``status`` is one of
            ``"cached:<name>"``, ``"fetched:<name>"``, ``"stale_cache:<name>"``
            or ``"missing:<name>"`` (with ``quote`` being ``None`` only in the
            last case).
        """
        cached_quote, is_fresh = self._cached_quote_is_fresh(
            canonical_symbol, provider.name, ttl_seconds
        )
        if cached_quote is not None and is_fresh:
            return cached_quote, f"cached:{provider.name}"

        try:
            quote = await provider.fetch_quote(provider_symbol)
        except Exception:
            # Best-effort by design: a provider failure must not abort the
            # whole refresh. Record it so outages are visible, then fall
            # through to the stale-cache / missing handling below.
            logger.warning(
                "Quote fetch failed for %s (%s) via %s",
                canonical_symbol,
                provider_symbol,
                provider.name,
                exc_info=True,
            )
            quote = None

        if quote is not None:
            # Store under the canonical symbol, not the provider-specific alias.
            quote.symbol = canonical_symbol
            self.store.save_quote(quote)
            return quote, f"fetched:{provider.name}"

        if cached_quote is not None:
            return cached_quote, f"stale_cache:{provider.name}"

        return None, f"missing:{provider.name}"

    async def fetch_quotes(self) -> list[MarketQuote]:
        """Fetch one quote per configured symbol and record per-source status.

        Symbols are processed sequentially. Twelve Data is only queried when
        the Finnhub path returns no quote at all; note that a stale Finnhub
        cache entry counts as a quote and therefore suppresses the fallback.
        Symbols for which neither provider yields a quote are omitted from the
        result. ``_last_source_status`` is replaced with this run's statuses.
        """
        quotes: list[MarketQuote] = []
        source_status: dict[str, str] = {}

        for symbol in self.config.symbols:
            aliases = SYMBOL_ALIASES.get(
                symbol.upper(), {"finnhub": symbol, "twelve_data": symbol}
            )

            quote, status = await self._fetch_or_cache(
                symbol,
                self.finnhub,
                aliases["finnhub"],
                self.config.finnhub_ttl_seconds,
            )
            source_status[f"{symbol}:finnhub"] = status

            if quote is None:
                quote, status = await self._fetch_or_cache(
                    symbol,
                    self.twelve_data,
                    aliases["twelve_data"],
                    self.config.twelve_data_ttl_seconds,
                )
                source_status[f"{symbol}:twelve_data"] = status

            if quote is not None:
                # Ensure quotes served from cache also carry the symbol
                # exactly as configured.
                quote.symbol = symbol
                quotes.append(quote)

        self._last_source_status = source_status
        return quotes

    async def build_snapshot(self) -> RegimeSnapshot:
        """Fetch quotes, build a regime snapshot from them, and persist it."""
        quotes = await self.fetch_quotes()
        snapshot = build_regime_snapshot(quotes)
        # Copy so later fetches cannot mutate the status attached to this
        # snapshot.
        snapshot.source_status = dict(self._last_source_status)
        self.store.save(snapshot)
        return snapshot

    async def get_snapshot(self, refresh: bool = False) -> RegimeSnapshot:
        """Return the latest stored snapshot, rebuilding it when needed.

        A new snapshot is built when *refresh* is true, when the store holds
        no snapshot, or when the latest one is older than
        ``config.snapshot_ttl_seconds``.
        """
        latest = self.store.latest()
        if refresh or latest is None:
            return await self.build_snapshot()

        fetched_at = latest.generated_at.astimezone(timezone.utc)
        age_seconds = (datetime.now(timezone.utc) - fetched_at).total_seconds()
        if age_seconds > self.config.snapshot_ttl_seconds:
            return await self.build_snapshot()

        return latest

    async def get_regime(self, refresh: bool = False) -> dict:
        """Return selected fields of the current snapshot as a plain dict.

        ``generated_at`` is passed through exactly as stored on the snapshot.
        """
        snapshot = await self.get_snapshot(refresh=refresh)
        return {
            "snapshot_id": snapshot.snapshot_id,
            "generated_at": snapshot.generated_at,
            "regime": snapshot.regime,
            "confidence": snapshot.confidence,
            "summary": snapshot.summary,
            "components": snapshot.components,
        }

    def health(self) -> dict:
        """Return a status report: provider flags and snapshot statistics.

        ``latest_snapshot_at`` is an ISO-8601 string, or ``None`` when the
        store holds no snapshot.
        """
        latest = self.store.latest()
        return {
            "status": "ok",
            "providers": self.provider_summary(),
            "snapshot_count": self.store.count(),
            "latest_snapshot_at": latest.generated_at.isoformat() if latest else None,
        }