"""Technical indicators and metadata for the crypto MCP.

Every indicator takes ``candles``: a list of rows that are read positionally
as ``row[2]`` = high, ``row[3]`` = low, ``row[4]`` = close and ``row[5]`` =
volume.  Each field is coerced with ``float``, so numeric strings work too.
Rows are assumed to be ordered oldest-first (the smoothing loops walk the
list front to back and report the final value) -- TODO confirm with callers.

Invalid or unsatisfiable periods are reported with ``InsufficientDataError``.
"""

from collections import OrderedDict
from math import sqrt

from errors import InsufficientDataError, UnsupportedIndicatorError


def _closes(candles: list) -> list[float]:
    """Return the close (index 4) of every candle as a float."""
    return [float(c[4]) for c in candles]


def _highs(candles: list) -> list[float]:
    """Return the high (index 2) of every candle as a float."""
    return [float(c[2]) for c in candles]


def _lows(candles: list) -> list[float]:
    """Return the low (index 3) of every candle as a float."""
    return [float(c[3]) for c in candles]


def _volumes(candles: list) -> list[float]:
    """Return the volume (index 5) of every candle as a float."""
    return [float(c[5]) for c in candles]


def _require_positive(label: str, value: int) -> None:
    """Raise ``InsufficientDataError`` unless ``value`` is greater than zero.

    A zero period would divide by zero and a negative one would make
    ``values[-period:]`` slice from the wrong end, so both are rejected up
    front.  The exception type mirrors the pre-existing VWAP period check.
    """
    if value <= 0:
        raise InsufficientDataError(f"{label} must be positive")


def _ema_series(values: list[float], period: int) -> list[float]:
    """Return the EMA of ``values`` for every point from index ``period - 1``.

    The series is seeded with the simple average of the first ``period``
    values and then smoothed with ``k = 2 / (period + 1)``.  The result has
    ``len(values) - period + 1`` entries; the last one corresponds to the
    last input value.

    Raises:
        InsufficientDataError: if ``period`` is not positive or there are
            fewer than ``period`` values.
    """
    _require_positive("EMA period", period)
    if len(values) < period:
        raise InsufficientDataError(
            f"EMA({period}) requires at least {period} candles, got {len(values)}"
        )
    k = 2 / (period + 1)
    ema_vals = [sum(values[:period]) / period]
    for v in values[period:]:
        ema_vals.append(v * k + ema_vals[-1] * (1 - k))
    return ema_vals


def ema(candles: list, period: int = 20) -> float:
    """Return the latest EMA of the closes, rounded to 6 decimals.

    Raises:
        InsufficientDataError: if ``period`` is not positive or there are
            fewer than ``period`` candles.
    """
    return round(_ema_series(_closes(candles), period)[-1], 6)


def rsi(candles: list, period: int = 14) -> float:
    """Return the RSI of the closes, rounded to 2 decimals.

    Average gain/loss are seeded with the simple mean of the first ``period``
    deltas and then updated with Wilder smoothing
    (``(prev * (period - 1) + current) / period``).  When the average loss is
    zero (including a completely flat series) the result is ``100.0``.

    Raises:
        InsufficientDataError: if ``period`` is not positive or there are
            fewer than ``period + 1`` candles.
    """
    _require_positive("RSI period", period)
    closes = _closes(candles)
    if len(closes) < period + 1:
        raise InsufficientDataError(
            f"RSI({period}) requires at least {period + 1} candles, got {len(closes)}"
        )
    deltas = [curr - prev for prev, curr in zip(closes, closes[1:])]
    gains = [max(d, 0.0) for d in deltas]
    losses = [abs(min(d, 0.0)) for d in deltas]
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period
    for gain, loss in zip(gains[period:], losses[period:]):
        avg_gain = (avg_gain * (period - 1) + gain) / period
        avg_loss = (avg_loss * (period - 1) + loss) / period
    if avg_loss == 0:
        return 100.0
    rs = avg_gain / avg_loss
    return round(100 - (100 / (1 + rs)), 2)


def macd(
    candles: list,
    fast_period: int = 12,
    slow_period: int = 26,
    signal_period: int = 9,
) -> dict:
    """Return the latest MACD, signal and histogram values.

    The MACD line is ``EMA(fast) - EMA(slow)`` of the closes, the signal line
    is the EMA of the MACD line, and the histogram is the difference of the
    two rounded values.  All three are rounded to 6 decimals.

    Returns:
        ``{"macd": float, "signal": float, "histogram": float}``

    Raises:
        InsufficientDataError: if any period is not positive or there are
            fewer than ``max(fast_period, slow_period) + signal_period``
            candles.
    """
    _require_positive("MACD fast_period", fast_period)
    _require_positive("MACD slow_period", slow_period)
    _require_positive("MACD signal_period", signal_period)
    closes = _closes(candles)
    # The longer of the two EMAs decides how many MACD points exist.
    if len(closes) < max(fast_period, slow_period) + signal_period:
        raise InsufficientDataError("MACD requires more candles")
    fast_ema = _ema_series(closes, fast_period)
    slow_ema = _ema_series(closes, slow_period)
    # Both series end on the last candle, so align them by their tails.  This
    # stays correct when fast_period > slow_period, where a front offset would
    # go negative and silently wrap around via negative indexing.
    span = min(len(fast_ema), len(slow_ema))
    macd_line = [
        fast - slow for fast, slow in zip(fast_ema[-span:], slow_ema[-span:])
    ]
    signal_line = _ema_series(macd_line, signal_period)
    macd_val = round(macd_line[-1], 6)
    signal_val = round(signal_line[-1], 6)
    return {
        "macd": macd_val,
        "signal": signal_val,
        "histogram": round(macd_val - signal_val, 6),
    }


def sma(candles: list, period: int = 20) -> float:
    """Return the mean of the last ``period`` closes, rounded to 6 decimals.

    Raises:
        InsufficientDataError: if ``period`` is not positive or there are
            fewer than ``period`` candles.
    """
    _require_positive("SMA period", period)
    closes = _closes(candles)
    if len(closes) < period:
        raise InsufficientDataError(
            f"SMA({period}) requires at least {period} candles, got {len(closes)}"
        )
    return round(sum(closes[-period:]) / period, 6)


def atr(candles: list, period: int = 14) -> float:
    """Return the Average True Range, rounded to 6 decimals.

    The true range of a candle is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``.  The ATR is
    seeded with the mean of the first ``period`` true ranges and then updated
    with Wilder smoothing.

    Raises:
        InsufficientDataError: if ``period`` is not positive or there are
            fewer than ``period + 1`` candles.
    """
    _require_positive("ATR period", period)
    highs = _highs(candles)
    lows = _lows(candles)
    closes = _closes(candles)
    if len(closes) < period + 1:
        raise InsufficientDataError(
            f"ATR({period}) requires at least {period + 1} candles, got {len(closes)}"
        )
    # Pair each candle (from the second one on) with the previous close; zip
    # stops at the shortest input, so the final close is never used as "prev".
    true_ranges = [
        max(high - low, abs(high - prev_close), abs(low - prev_close))
        for high, low, prev_close in zip(highs[1:], lows[1:], closes)
    ]
    atr_val = sum(true_ranges[:period]) / period
    for tr in true_ranges[period:]:
        atr_val = ((atr_val * (period - 1)) + tr) / period
    return round(atr_val, 6)


def bollinger_bands(candles: list, period: int = 20, multiplier: float = 2.0) -> dict:
    """Return Bollinger Bands over the last ``period`` closes.

    The middle band is the mean of the window; the upper/lower bands are
    offset by ``multiplier`` population standard deviations (variance divided
    by ``period``).  All values are rounded to 6 decimals.

    Returns:
        ``{"middle": float, "upper": float, "lower": float}``

    Raises:
        InsufficientDataError: if ``period`` is not positive or there are
            fewer than ``period`` candles.
    """
    _require_positive("Bollinger Bands period", period)
    closes = _closes(candles)
    if len(closes) < period:
        raise InsufficientDataError(
            f"Bollinger Bands require at least {period} candles, got {len(closes)}"
        )
    window = closes[-period:]
    middle = sum(window) / period
    variance = sum((price - middle) ** 2 for price in window) / period
    std_dev = sqrt(variance)
    upper = middle + multiplier * std_dev
    lower = middle - multiplier * std_dev
    return {
        "middle": round(middle, 6),
        "upper": round(upper, 6),
        "lower": round(lower, 6),
    }


def vwap(candles: list, period: int | None = None) -> float:
    """Return the volume-weighted average of the typical price.

    The typical price of a candle is ``(high + low + close) / 3``.  With
    ``period=None`` every candle is used; otherwise only the last ``period``
    candles (or all of them when fewer are available).  The result is rounded
    to 6 decimals.

    Raises:
        InsufficientDataError: if ``period`` is given and not positive, if
            there are no candles, or if the total volume is zero.
    """
    if period is not None:
        _require_positive("VWAP period", period)
    subset = candles[-period:] if period is not None else candles
    if len(subset) < 1:
        raise InsufficientDataError("VWAP requires at least 1 candle")
    volumes = _volumes(subset)
    total_vol = sum(volumes)
    if total_vol == 0:
        raise InsufficientDataError("VWAP requires non-zero volume")
    typical_prices = [
        ((float(c[2]) + float(c[3]) + float(c[4])) / 3) for c in subset
    ]
    pv = sum(tp * vol for tp, vol in zip(typical_prices, volumes))
    return round(pv / total_vol, 6)


# --- Registry handlers -------------------------------------------------------
# Each handler pulls its parameters out of a plain dict, coerces them to the
# expected numeric type, falls back to the indicator's default for missing
# keys, and delegates to the indicator function.


def _rsi_handler(candles: list, params: dict) -> float:
    """Run ``rsi`` with ``params["period"]`` (default 14)."""
    return rsi(candles, period=int(params.get("period", 14)))


def _ema_handler(candles: list, params: dict) -> float:
    """Run ``ema`` with ``params["period"]`` (default 20)."""
    return ema(candles, period=int(params.get("period", 20)))


def _macd_handler(candles: list, params: dict) -> dict:
    """Run ``macd`` with fast/slow/signal periods (defaults 12/26/9)."""
    return macd(
        candles,
        fast_period=int(params.get("fast_period", 12)),
        slow_period=int(params.get("slow_period", 26)),
        signal_period=int(params.get("signal_period", 9)),
    )


def _sma_handler(candles: list, params: dict) -> float:
    """Run ``sma`` with ``params["period"]`` (default 20)."""
    return sma(candles, period=int(params.get("period", 20)))


def _atr_handler(candles: list, params: dict) -> float:
    """Run ``atr`` with ``params["period"]`` (default 14)."""
    return atr(candles, period=int(params.get("period", 14)))


def _bollinger_handler(candles: list, params: dict) -> dict:
    """Run ``bollinger_bands`` with period (default 20) and multiplier (2.0)."""
    return bollinger_bands(
        candles,
        period=int(params.get("period", 20)),
        multiplier=float(params.get("multiplier", 2.0)),
    )


def _vwap_handler(candles: list, params: dict) -> float:
    """Run ``vwap``; a missing or ``None`` period means "use every candle"."""
    period = params.get("period")
    if period is not None:
        period = int(period)
    return vwap(candles, period=period)


# Registry of indicators keyed by lower-case name.  "params" and "value_type"
# are descriptive metadata returned by get_supported_indicators(); the "min"
# bounds listed here are not enforced anywhere in this module.
SUPPORTED_INDICATORS = OrderedDict(
    {
        "rsi": {
            "description": (
                "Relative Strength Index (RSI) — momentum oscillator (0-100) "
                "derived from closing price gains/losses; higher values "
                "indicate stronger upward pressure."
            ),
            "handler": _rsi_handler,
            "params": {"period": {"type": "integer", "default": 14, "min": 2}},
            "value_type": "number",
        },
        "ema": {
            "description": (
                "Exponential Moving Average (EMA) — weighted moving average "
                "emphasizing recent closes to highlight near-term direction."
            ),
            "handler": _ema_handler,
            "params": {"period": {"type": "integer", "default": 20, "min": 2}},
            "value_type": "number",
        },
        "sma": {
            "description": (
                "Simple Moving Average (SMA) — unweighted rolling average of "
                "closes, useful for longer-term baselines (e.g., 200-period "
                "trend)."
            ),
            "handler": _sma_handler,
            "params": {"period": {"type": "integer", "default": 20, "min": 2}},
            "value_type": "number",
        },
        "macd": {
            "description": (
                "Moving Average Convergence Divergence (MACD) — returns MACD, "
                "signal, and histogram values for spotting momentum shifts "
                "between fast/slow EMAs."
            ),
            "handler": _macd_handler,
            "params": {
                "fast_period": {"type": "integer", "default": 12, "min": 2},
                "slow_period": {"type": "integer", "default": 26, "min": 3},
                "signal_period": {"type": "integer", "default": 9, "min": 2},
            },
            "value_type": "object",
        },
        "atr": {
            "description": (
                "Average True Range (ATR) — classic volatility gauge derived "
                "from true range; higher values imply wider expected movement."
            ),
            "handler": _atr_handler,
            "params": {"period": {"type": "integer", "default": 14, "min": 2}},
            "value_type": "number",
        },
        "bollinger": {
            "description": (
                "Bollinger Bands — middle SMA with upper/lower bands offset by "
                "standard deviations; helpful for squeeze/mean-reversion checks."
            ),
            "handler": _bollinger_handler,
            "params": {
                "period": {"type": "integer", "default": 20, "min": 2},
                "multiplier": {"type": "number", "default": 2.0, "min": 0.5},
            },
            "value_type": "object",
        },
        "vwap": {
            "description": (
                "Volume Weighted Average Price (VWAP) — rolling average price "
                "weighted by traded volume; useful as an intraday fair-value "
                "anchor."
            ),
            "handler": _vwap_handler,
            "params": {
                "period": {"type": "integer", "default": None, "nullable": True}
            },
            "value_type": "number",
        },
    }
)


def get_supported_indicators() -> list[dict]:
    """Return the public metadata of every registered indicator.

    Each entry carries ``name``, ``description``, ``params`` and
    ``value_type`` (the handler is deliberately left out).  The ``params``
    mapping is copied so callers cannot mutate the registry through the
    returned listing.
    """
    return [
        {
            "name": name,
            "description": meta["description"],
            "params": {key: dict(spec) for key, spec in meta["params"].items()},
            "value_type": meta.get("value_type", "number"),
        }
        for name, meta in SUPPORTED_INDICATORS.items()
    ]


def compute_indicator(
    candles: list, indicator: str, params: dict | None = None
) -> dict:
    """Compute ``indicator`` over ``candles`` and wrap the result.

    Args:
        candles: candle rows as described in the module docstring.
        indicator: registry name, matched case-insensitively.
        params: optional overrides for the indicator's parameters; ``None``
            is treated like an empty dict so every default applies.

    Returns:
        ``{"indicator": <lower-case name>, "value": <handler result>}``

    Raises:
        UnsupportedIndicatorError: if ``indicator`` is not registered.
        InsufficientDataError: propagated from the indicator when the period
            is invalid or there are too few candles.
    """
    ind = indicator.lower()
    if ind not in SUPPORTED_INDICATORS:
        raise UnsupportedIndicatorError(f"Unsupported indicator: {indicator}")
    handler = SUPPORTED_INDICATORS[ind]["handler"]
    value = handler(candles, params or {})
    return {"indicator": ind, "value": value}