service.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
from __future__ import annotations

import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone

from argus_mcp.config import ArgusConfig, load_config
from argus_mcp.models import MarketQuote, RegimeSnapshot
from argus_mcp.providers.finnhub import FinnhubProvider
from argus_mcp.providers.twelve_data import TwelveDataProvider
from argus_mcp.regime import build_regime_snapshot
from argus_mcp.storage import SnapshotStore
  10. SYMBOL_ALIASES: dict[str, dict[str, str]] = {
  11. "QQQ": {"finnhub": "QQQ", "twelve_data": "QQQ"},
  12. "SPY": {"finnhub": "SPY", "twelve_data": "SPY"},
  13. "HYG": {"finnhub": "HYG", "twelve_data": "HYG"},
  14. "DXY": {"finnhub": "UUP", "twelve_data": "DXY"},
  15. "UUP": {"finnhub": "UUP", "twelve_data": "UUP"},
  16. "VXX": {"finnhub": "VXX", "twelve_data": "VXX"},
  17. "BTCUSD": {"finnhub": "BINANCE:BTCUSDT", "twelve_data": "BTC/USD"},
  18. "BTC/USD": {"finnhub": "BINANCE:BTCUSDT", "twelve_data": "BTC/USD"},
  19. "ETHUSD": {"finnhub": "BINANCE:ETHUSDT", "twelve_data": "ETH/USD"},
  20. "ETH/USD": {"finnhub": "BINANCE:ETHUSDT", "twelve_data": "ETH/USD"},
  21. }
  22. @dataclass(slots=True)
  23. class ArgusService:
  24. config: ArgusConfig
  25. store: SnapshotStore
  26. finnhub: FinnhubProvider
  27. twelve_data: TwelveDataProvider
  28. _last_source_status: dict[str, str] = field(default_factory=dict, init=False, repr=False)
  29. @classmethod
  30. def create(cls, config: ArgusConfig | None = None) -> "ArgusService":
  31. cfg = config or load_config()
  32. return cls(
  33. config=cfg,
  34. store=SnapshotStore(cfg.sqlite_path),
  35. finnhub=FinnhubProvider(cfg.finnhub_token),
  36. twelve_data=TwelveDataProvider(cfg.twelve_data_key),
  37. )
  38. def provider_summary(self) -> dict[str, bool]:
  39. return {
  40. "finnhub": self.finnhub.enabled,
  41. "twelve_data": self.twelve_data.enabled,
  42. }
  43. def _cached_quote_is_fresh(self, symbol: str, source: str, ttl_seconds: int) -> tuple[MarketQuote | None, bool]:
  44. cached = self.store.latest_quote(symbol, source)
  45. if cached is None:
  46. return None, False
  47. quote, fetched_at = cached
  48. age_seconds = (datetime.now(timezone.utc) - fetched_at).total_seconds()
  49. return quote, age_seconds <= ttl_seconds
  50. async def _fetch_or_cache(
  51. self,
  52. canonical_symbol: str,
  53. provider,
  54. provider_symbol: str,
  55. ttl_seconds: int,
  56. ) -> tuple[MarketQuote | None, str]:
  57. cached_quote, is_fresh = self._cached_quote_is_fresh(canonical_symbol, provider.name, ttl_seconds)
  58. if cached_quote is not None and is_fresh:
  59. return cached_quote, f"cached:{provider.name}"
  60. try:
  61. quote = await provider.fetch_quote(provider_symbol)
  62. except Exception:
  63. quote = None
  64. if quote is not None:
  65. quote.symbol = canonical_symbol
  66. self.store.save_quote(quote)
  67. return quote, f"fetched:{provider.name}"
  68. if cached_quote is not None:
  69. return cached_quote, f"stale_cache:{provider.name}"
  70. return None, f"missing:{provider.name}"
  71. async def fetch_quotes(self) -> list[MarketQuote]:
  72. quotes: list[MarketQuote] = []
  73. source_status: dict[str, str] = {}
  74. for symbol in self.config.symbols:
  75. aliases = SYMBOL_ALIASES.get(symbol.upper(), {"finnhub": symbol, "twelve_data": symbol})
  76. quote, status = await self._fetch_or_cache(
  77. symbol,
  78. self.finnhub,
  79. aliases["finnhub"],
  80. self.config.finnhub_ttl_seconds,
  81. )
  82. source_status[f"{symbol}:finnhub"] = status
  83. if quote is None:
  84. quote, status = await self._fetch_or_cache(
  85. symbol,
  86. self.twelve_data,
  87. aliases["twelve_data"],
  88. self.config.twelve_data_ttl_seconds,
  89. )
  90. source_status[f"{symbol}:twelve_data"] = status
  91. if quote is not None:
  92. quote.symbol = symbol
  93. quotes.append(quote)
  94. self._last_source_status = source_status
  95. return quotes
  96. async def build_snapshot(self) -> RegimeSnapshot:
  97. quotes = await self.fetch_quotes()
  98. snapshot = build_regime_snapshot(quotes)
  99. snapshot.source_status = dict(self._last_source_status)
  100. self.store.save(snapshot)
  101. return snapshot
  102. async def get_snapshot(self, refresh: bool = False) -> RegimeSnapshot:
  103. latest = self.store.latest()
  104. if refresh or latest is None:
  105. return await self.build_snapshot()
  106. fetched_at = latest.generated_at.astimezone(timezone.utc)
  107. age_seconds = (datetime.now(timezone.utc) - fetched_at).total_seconds()
  108. if age_seconds > self.config.snapshot_ttl_seconds:
  109. return await self.build_snapshot()
  110. return latest
  111. async def get_regime(self, refresh: bool = False) -> dict:
  112. snapshot = await self.get_snapshot(refresh=refresh)
  113. return {
  114. "snapshot_id": snapshot.snapshot_id,
  115. "generated_at": snapshot.generated_at,
  116. "regime": snapshot.regime,
  117. "confidence": snapshot.confidence,
  118. "summary": snapshot.summary,
  119. "components": snapshot.components,
  120. }
  121. def health(self) -> dict:
  122. latest = self.store.latest()
  123. return {
  124. "status": "ok",
  125. "providers": self.provider_summary(),
  126. "snapshot_count": self.store.count(),
  127. "latest_snapshot_at": latest.generated_at.isoformat() if latest else None,
  128. }