| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
from __future__ import annotations

import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone

from argus_mcp.config import ArgusConfig, load_config
from argus_mcp.models import MarketQuote, RegimeSnapshot
from argus_mcp.providers.finnhub import FinnhubProvider
from argus_mcp.providers.twelve_data import TwelveDataProvider
from argus_mcp.regime import build_regime_snapshot
from argus_mcp.storage import SnapshotStore
# Maps a canonical (upper-case) symbol to the ticker each provider is queried
# with. Each row is (canonical, finnhub ticker, twelve_data ticker). Note that
# a request for DXY is sent to Finnhub as UUP, and the crypto pairs are sent to
# Finnhub as Binance USDT pairs.
SYMBOL_ALIASES: dict[str, dict[str, str]] = {
    canonical: {"finnhub": finnhub_ticker, "twelve_data": twelve_data_ticker}
    for canonical, finnhub_ticker, twelve_data_ticker in (
        ("QQQ", "QQQ", "QQQ"),
        ("SPY", "SPY", "SPY"),
        ("HYG", "HYG", "HYG"),
        ("DXY", "UUP", "DXY"),
        ("UUP", "UUP", "UUP"),
        ("VXX", "VXX", "VXX"),
        ("BTCUSD", "BINANCE:BTCUSDT", "BTC/USD"),
        ("BTC/USD", "BINANCE:BTCUSDT", "BTC/USD"),
        ("ETHUSD", "BINANCE:ETHUSDT", "ETH/USD"),
        ("ETH/USD", "BINANCE:ETHUSDT", "ETH/USD"),
    )
}
- @dataclass(slots=True)
- class ArgusService:
- config: ArgusConfig
- store: SnapshotStore
- finnhub: FinnhubProvider
- twelve_data: TwelveDataProvider
- _last_source_status: dict[str, str] = field(default_factory=dict, init=False, repr=False)
- @classmethod
- def create(cls, config: ArgusConfig | None = None) -> "ArgusService":
- cfg = config or load_config()
- return cls(
- config=cfg,
- store=SnapshotStore(cfg.sqlite_path),
- finnhub=FinnhubProvider(cfg.finnhub_token),
- twelve_data=TwelveDataProvider(cfg.twelve_data_key),
- )
- def provider_summary(self) -> dict[str, bool]:
- return {
- "finnhub": self.finnhub.enabled,
- "twelve_data": self.twelve_data.enabled,
- }
- def _cached_quote_is_fresh(self, symbol: str, source: str, ttl_seconds: int) -> tuple[MarketQuote | None, bool]:
- cached = self.store.latest_quote(symbol, source)
- if cached is None:
- return None, False
- quote, fetched_at = cached
- age_seconds = (datetime.now(timezone.utc) - fetched_at).total_seconds()
- return quote, age_seconds <= ttl_seconds
- async def _fetch_or_cache(
- self,
- canonical_symbol: str,
- provider,
- provider_symbol: str,
- ttl_seconds: int,
- ) -> tuple[MarketQuote | None, str]:
- cached_quote, is_fresh = self._cached_quote_is_fresh(canonical_symbol, provider.name, ttl_seconds)
- if cached_quote is not None and is_fresh:
- return cached_quote, f"cached:{provider.name}"
- try:
- quote = await provider.fetch_quote(provider_symbol)
- except Exception:
- quote = None
- if quote is not None:
- quote.symbol = canonical_symbol
- self.store.save_quote(quote)
- return quote, f"fetched:{provider.name}"
- if cached_quote is not None:
- return cached_quote, f"stale_cache:{provider.name}"
- return None, f"missing:{provider.name}"
- async def fetch_quotes(self) -> list[MarketQuote]:
- quotes: list[MarketQuote] = []
- source_status: dict[str, str] = {}
- for symbol in self.config.symbols:
- aliases = SYMBOL_ALIASES.get(symbol.upper(), {"finnhub": symbol, "twelve_data": symbol})
- quote, status = await self._fetch_or_cache(
- symbol,
- self.finnhub,
- aliases["finnhub"],
- self.config.finnhub_ttl_seconds,
- )
- source_status[f"{symbol}:finnhub"] = status
- if quote is None:
- quote, status = await self._fetch_or_cache(
- symbol,
- self.twelve_data,
- aliases["twelve_data"],
- self.config.twelve_data_ttl_seconds,
- )
- source_status[f"{symbol}:twelve_data"] = status
- if quote is not None:
- quote.symbol = symbol
- quotes.append(quote)
- self._last_source_status = source_status
- return quotes
- async def build_snapshot(self) -> RegimeSnapshot:
- quotes = await self.fetch_quotes()
- snapshot = build_regime_snapshot(quotes)
- snapshot.source_status = dict(self._last_source_status)
- self.store.save(snapshot)
- return snapshot
- async def get_snapshot(self, refresh: bool = False) -> RegimeSnapshot:
- latest = self.store.latest()
- if refresh or latest is None:
- return await self.build_snapshot()
- fetched_at = latest.generated_at.astimezone(timezone.utc)
- age_seconds = (datetime.now(timezone.utc) - fetched_at).total_seconds()
- if age_seconds > self.config.snapshot_ttl_seconds:
- return await self.build_snapshot()
- return latest
- async def get_regime(self, refresh: bool = False) -> dict:
- snapshot = await self.get_snapshot(refresh=refresh)
- return {
- "snapshot_id": snapshot.snapshot_id,
- "generated_at": snapshot.generated_at,
- "regime": snapshot.regime,
- "confidence": snapshot.confidence,
- "summary": snapshot.summary,
- "components": snapshot.components,
- }
- def health(self) -> dict:
- latest = self.store.latest()
- return {
- "status": "ok",
- "providers": self.provider_summary(),
- "snapshot_count": self.store.count(),
- "latest_snapshot_at": latest.generated_at.isoformat() if latest else None,
- }
|