"""Technical indicators.""" from errors import InsufficientDataError, UnsupportedIndicatorError def _closes(candles: list) -> list[float]: return [float(c[4]) for c in candles] def _ema_series(values: list[float], period: int) -> list[float]: if len(values) < period: raise InsufficientDataError(f"EMA({period}) requires at least {period} candles, got {len(values)}") k = 2 / (period + 1) ema_vals = [sum(values[:period]) / period] for v in values[period:]: ema_vals.append(v * k + ema_vals[-1] * (1 - k)) return ema_vals def ema(candles: list, period: int = 20) -> float: return round(_ema_series(_closes(candles), period)[-1], 6) def rsi(candles: list, period: int = 14) -> float: closes = _closes(candles) if len(closes) < period + 1: raise InsufficientDataError(f"RSI({period}) requires at least {period + 1} candles, got {len(closes)}") deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))] gains = [max(d, 0.0) for d in deltas] losses = [abs(min(d, 0.0)) for d in deltas] avg_gain = sum(gains[:period]) / period avg_loss = sum(losses[:period]) / period for i in range(period, len(deltas)): avg_gain = (avg_gain * (period - 1) + gains[i]) / period avg_loss = (avg_loss * (period - 1) + losses[i]) / period if avg_loss == 0: return 100.0 rs = avg_gain / avg_loss return round(100 - (100 / (1 + rs)), 2) def macd(candles: list, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9) -> dict: closes = _closes(candles) if len(closes) < slow_period + signal_period: raise InsufficientDataError("MACD requires more candles") fast_ema = _ema_series(closes, fast_period) slow_ema = _ema_series(closes, slow_period) offset = len(fast_ema) - len(slow_ema) macd_line = [fast_ema[i + offset] - slow_ema[i] for i in range(len(slow_ema))] signal_line = _ema_series(macd_line, signal_period) macd_val = round(macd_line[-1], 6) signal_val = round(signal_line[-1], 6) return {"macd": macd_val, "signal": signal_val, "histogram": round(macd_val - signal_val, 6)} def sma(candles: list, period: int = 20) -> float: closes = _closes(candles) if len(closes) < period: raise InsufficientDataError(f"SMA({period}) requires at least {period} candles, got {len(closes)}") return round(sum(closes[-period:]) / period, 6) def compute_indicator(candles: list, indicator: str, params: dict) -> dict: ind = indicator.lower() if ind == "rsi": value = rsi(candles, period=int(params.get("period", 14))) elif ind == "ema": value = ema(candles, period=int(params.get("period", 20))) elif ind == "macd": value = macd(candles, fast_period=int(params.get("fast_period", 12)), slow_period=int(params.get("slow_period", 26)), signal_period=int(params.get("signal_period", 9))) elif ind == "sma": value = sma(candles, period=int(params.get("period", 20))) else: raise UnsupportedIndicatorError(f"Unsupported indicator: {indicator}") return {"indicator": ind, "value": value}