| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- """Technical indicators and metadata for the crypto MCP."""
- from collections import OrderedDict
- from math import sqrt
- from errors import InsufficientDataError, UnsupportedIndicatorError
- def _closes(candles: list) -> list[float]:
- return [float(c[4]) for c in candles]
- def _highs(candles: list) -> list[float]:
- return [float(c[2]) for c in candles]
- def _lows(candles: list) -> list[float]:
- return [float(c[3]) for c in candles]
- def _volumes(candles: list) -> list[float]:
- return [float(c[5]) for c in candles]
- def _ema_series(values: list[float], period: int) -> list[float]:
- if len(values) < period:
- raise InsufficientDataError(f"EMA({period}) requires at least {period} candles, got {len(values)}")
- k = 2 / (period + 1)
- ema_vals = [sum(values[:period]) / period]
- for v in values[period:]:
- ema_vals.append(v * k + ema_vals[-1] * (1 - k))
- return ema_vals
def ema(candles: list, period: int = 20) -> float:
    """Return the most recent EMA of the candle closes.

    Args:
        candles: Candle rows; closes are extracted with ``_closes``.
        period: EMA look-back length passed to ``_ema_series``.

    Returns:
        The last value of the EMA series, rounded to 6 decimal places.
    """
    series = _ema_series(_closes(candles), period)
    latest = series[-1]
    return round(latest, 6)
def rsi(candles: list, period: int = 14) -> float:
    """Return the Relative Strength Index of the candle closes.

    Average gain and average loss are seeded with the simple mean of the
    first ``period`` close-to-close changes and then updated with
    ``(previous * (period - 1) + current) / period`` for each later change.

    Args:
        candles: Candle rows; closes are extracted with ``_closes``.
        period: Look-back length; must be at least 1.

    Returns:
        The RSI rounded to 2 decimal places.  When the smoothed average loss
        is exactly zero (this includes a completely flat series) the result
        is ``100.0``.

    Raises:
        InsufficientDataError: If ``period`` is not positive, or if fewer
            than ``period + 1`` candles are supplied.
    """
    if period < 1:
        # period=0 would otherwise divide by zero when seeding the averages,
        # and negative periods would silently produce garbage.
        raise InsufficientDataError(f"RSI period must be positive, got {period}")
    closes = _closes(candles)
    if len(closes) < period + 1:
        raise InsufficientDataError(
            f"RSI({period}) requires at least {period + 1} candles, got {len(closes)}"
        )
    deltas = [curr - prev for prev, curr in zip(closes, closes[1:])]
    gains = [max(d, 0.0) for d in deltas]
    losses = [abs(min(d, 0.0)) for d in deltas]
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period
    for gain, loss in zip(gains[period:], losses[period:]):
        avg_gain = (avg_gain * (period - 1) + gain) / period
        avg_loss = (avg_loss * (period - 1) + loss) / period
    if avg_loss == 0:
        # No losses in the smoothed window: avoid dividing by zero below.
        return 100.0
    rs = avg_gain / avg_loss
    return round(100 - (100 / (1 + rs)), 2)
def macd(candles: list, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9) -> dict:
    """Return the latest MACD, signal and histogram values for the closes.

    The MACD line is the fast EMA minus the slow EMA, the signal line is the
    EMA of the MACD line, and the histogram is their difference.

    Args:
        candles: Candle rows; closes are extracted with ``_closes``.
        fast_period: Look-back length of the fast EMA.
        slow_period: Look-back length of the slow EMA.
        signal_period: Look-back length of the signal EMA.

    Returns:
        A dict with keys ``"macd"``, ``"signal"`` and ``"histogram"``, each
        rounded to 6 decimal places.  The histogram is computed from the
        already-rounded MACD and signal values.

    Raises:
        InsufficientDataError: If any period is not positive, or if fewer
            than ``max(fast_period, slow_period) + signal_period`` candles
            are supplied.
    """
    if min(fast_period, slow_period, signal_period) < 1:
        raise InsufficientDataError("MACD periods must be positive")
    closes = _closes(candles)
    # Use the longer of the two EMA periods so the check also holds when the
    # caller passes fast_period > slow_period.
    longest = max(fast_period, slow_period)
    if len(closes) < longest + signal_period:
        raise InsufficientDataError("MACD requires more candles")
    fast_ema = _ema_series(closes, fast_period)
    slow_ema = _ema_series(closes, slow_period)
    # Both series end on the latest candle, so align them on their tails.
    # Indexing with a start offset breaks when fast_period > slow_period: the
    # offset goes negative and the lookups wrap around to the end of the list.
    overlap = min(len(fast_ema), len(slow_ema))
    macd_line = [
        fast - slow
        for fast, slow in zip(fast_ema[-overlap:], slow_ema[-overlap:])
    ]
    signal_line = _ema_series(macd_line, signal_period)
    macd_val = round(macd_line[-1], 6)
    signal_val = round(signal_line[-1], 6)
    return {"macd": macd_val, "signal": signal_val, "histogram": round(macd_val - signal_val, 6)}
def sma(candles: list, period: int = 20) -> float:
    """Return the simple moving average of the last ``period`` closes.

    Args:
        candles: Candle rows; closes are extracted with ``_closes``.
        period: Number of most recent closes to average; must be at least 1.

    Returns:
        The arithmetic mean of the last ``period`` closes, rounded to 6
        decimal places.

    Raises:
        InsufficientDataError: If ``period`` is not positive, or if fewer
            than ``period`` candles are supplied.
    """
    if period < 1:
        # period=0 would divide by zero; a negative period would slice from
        # the front (closes[-period:]) and divide by a negative number.
        raise InsufficientDataError(f"SMA period must be positive, got {period}")
    closes = _closes(candles)
    if len(closes) < period:
        raise InsufficientDataError(
            f"SMA({period}) requires at least {period} candles, got {len(closes)}"
        )
    return round(sum(closes[-period:]) / period, 6)
def atr(candles: list, period: int = 14) -> float:
    """Return the Average True Range of the candles.

    The true range of a candle is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``.  The ATR is
    seeded with the simple mean of the first ``period`` true ranges and then
    updated with ``(previous * (period - 1) + true_range) / period``.

    Args:
        candles: Candle rows; highs, lows and closes are extracted with
            ``_highs``, ``_lows`` and ``_closes``.
        period: Look-back length; must be at least 1.

    Returns:
        The latest ATR value, rounded to 6 decimal places.

    Raises:
        InsufficientDataError: If ``period`` is not positive, or if fewer
            than ``period + 1`` candles are supplied.
    """
    if period < 1:
        # period=0 would otherwise divide by zero when seeding the average.
        raise InsufficientDataError(f"ATR period must be positive, got {period}")
    highs = _highs(candles)
    lows = _lows(candles)
    closes = _closes(candles)
    if len(closes) < period + 1:
        raise InsufficientDataError(
            f"ATR({period}) requires at least {period + 1} candles, got {len(closes)}"
        )
    # Pair each candle (from the second one on) with the close before it; zip
    # stops at the shorter sequences, so the last close is never a "previous".
    true_ranges = [
        max(high - low, abs(high - prev_close), abs(low - prev_close))
        for high, low, prev_close in zip(highs[1:], lows[1:], closes)
    ]
    atr_val = sum(true_ranges[:period]) / period
    for tr in true_ranges[period:]:
        atr_val = ((atr_val * (period - 1)) + tr) / period
    return round(atr_val, 6)
def bollinger_bands(candles: list, period: int = 20, multiplier: float = 2.0) -> dict:
    """Return the Bollinger Bands of the last ``period`` closes.

    The middle band is the mean of the window; the upper and lower bands sit
    ``multiplier`` population standard deviations (variance divided by
    ``period``) above and below it.

    Args:
        candles: Candle rows; closes are extracted with ``_closes``.
        period: Number of most recent closes in the window; must be at
            least 1.
        multiplier: Number of standard deviations between the middle band
            and each outer band.

    Returns:
        A dict with keys ``"middle"``, ``"upper"`` and ``"lower"``, each
        rounded to 6 decimal places.

    Raises:
        InsufficientDataError: If ``period`` is not positive, or if fewer
            than ``period`` candles are supplied.
    """
    if period < 1:
        # period=0 would divide by zero; a negative period would slice from
        # the front (closes[-period:]) and divide by a negative number.
        raise InsufficientDataError(f"Bollinger Bands period must be positive, got {period}")
    closes = _closes(candles)
    if len(closes) < period:
        raise InsufficientDataError(
            f"Bollinger Bands require at least {period} candles, got {len(closes)}"
        )
    window = closes[-period:]
    middle = sum(window) / period
    variance = sum((price - middle) ** 2 for price in window) / period
    std_dev = sqrt(variance)
    upper = middle + multiplier * std_dev
    lower = middle - multiplier * std_dev
    return {"middle": round(middle, 6), "upper": round(upper, 6), "lower": round(lower, 6)}
def vwap(candles: list, period: int | None = None) -> float:
    """Return the volume-weighted average price of the candles.

    Each candle contributes its typical price, the mean of elements 2, 3 and
    4 (read by the sibling helpers as high, low and close), weighted by its
    volume as returned by ``_volumes``.

    Args:
        candles: Candle rows.
        period: When given, only the last ``period`` candles are used (all
            of them if fewer are available); ``None`` uses every candle.

    Returns:
        Sum of ``typical_price * volume`` divided by total volume, rounded
        to 6 decimal places.

    Raises:
        InsufficientDataError: If ``period`` is not positive, no candles are
            available, or the total volume is zero.
    """
    if period is None:
        window = candles
    elif period <= 0:
        raise InsufficientDataError("VWAP period must be positive")
    else:
        window = candles[-period:]
    if len(window) == 0:
        raise InsufficientDataError("VWAP requires at least 1 candle")

    weights = _volumes(window)
    total_volume = sum(weights)
    if total_volume == 0:
        raise InsufficientDataError("VWAP requires non-zero volume")

    typical = [((float(row[2]) + float(row[3]) + float(row[4])) / 3) for row in window]
    weighted_sum = sum(price * weight for price, weight in zip(typical, weights))
    return round(weighted_sum / total_volume, 6)
def _rsi_handler(candles: list, params: dict):
    """Run ``rsi`` with ``params["period"]`` coerced to int (default 14)."""
    period = int(params.get("period", 14))
    return rsi(candles, period=period)
def _ema_handler(candles: list, params: dict):
    """Run ``ema`` with ``params["period"]`` coerced to int (default 20)."""
    period = int(params.get("period", 20))
    return ema(candles, period=period)
def _macd_handler(candles: list, params: dict):
    """Run ``macd`` with its three periods read from ``params``.

    Missing keys fall back to 12 (``fast_period``), 26 (``slow_period``) and
    9 (``signal_period``); every value is coerced to int.
    """
    fast = int(params.get("fast_period", 12))
    slow = int(params.get("slow_period", 26))
    signal = int(params.get("signal_period", 9))
    return macd(candles, fast_period=fast, slow_period=slow, signal_period=signal)
def _sma_handler(candles: list, params: dict):
    """Run ``sma`` with ``params["period"]`` coerced to int (default 20)."""
    period = int(params.get("period", 20))
    return sma(candles, period=period)
def _atr_handler(candles: list, params: dict):
    """Run ``atr`` with ``params["period"]`` coerced to int (default 14)."""
    period = int(params.get("period", 14))
    return atr(candles, period=period)
def _bollinger_handler(candles: list, params: dict):
    """Run ``bollinger_bands`` with settings read from ``params``.

    ``period`` is coerced to int (default 20) and ``multiplier`` to float
    (default 2.0).
    """
    period = int(params.get("period", 20))
    multiplier = float(params.get("multiplier", 2.0))
    return bollinger_bands(candles, period=period, multiplier=multiplier)
def _vwap_handler(candles: list, params: dict):
    """Run ``vwap`` with an optional ``period`` read from ``params``.

    A missing or ``None`` period is forwarded as ``None``; any other value
    is coerced to int first.
    """
    raw_period = params.get("period")
    period = None if raw_period is None else int(raw_period)
    return vwap(candles, period=period)
# Registry of the indicators this module can compute, in listing order.
# Each entry maps the lowercase indicator name to:
#   "description" - human-readable summary,
#   "handler"     - callable taking (candles, params) and returning the value,
#   "params"      - metadata for the parameters the handler reads,
#   "value_type"  - "number" for scalar results, "object" for dict results.
SUPPORTED_INDICATORS = OrderedDict(
    [
        (
            "rsi",
            {
                "description": "Relative Strength Index (RSI) — momentum oscillator (0-100) derived from closing price gains/losses; higher values indicate stronger upward pressure.",
                "handler": _rsi_handler,
                "params": {"period": {"type": "integer", "default": 14, "min": 2}},
                "value_type": "number",
            },
        ),
        (
            "ema",
            {
                "description": "Exponential Moving Average (EMA) — weighted moving average emphasizing recent closes to highlight near-term direction.",
                "handler": _ema_handler,
                "params": {"period": {"type": "integer", "default": 20, "min": 2}},
                "value_type": "number",
            },
        ),
        (
            "sma",
            {
                "description": "Simple Moving Average (SMA) — unweighted rolling average of closes, useful for longer-term baselines (e.g., 200-period trend).",
                "handler": _sma_handler,
                "params": {"period": {"type": "integer", "default": 20, "min": 2}},
                "value_type": "number",
            },
        ),
        (
            "macd",
            {
                "description": "Moving Average Convergence Divergence (MACD) — returns MACD, signal, and histogram values for spotting momentum shifts between fast/slow EMAs.",
                "handler": _macd_handler,
                "params": {
                    "fast_period": {"type": "integer", "default": 12, "min": 2},
                    "slow_period": {"type": "integer", "default": 26, "min": 3},
                    "signal_period": {"type": "integer", "default": 9, "min": 2},
                },
                "value_type": "object",
            },
        ),
        (
            "atr",
            {
                "description": "Average True Range (ATR) — classic volatility gauge derived from true range; higher values imply wider expected movement.",
                "handler": _atr_handler,
                "params": {"period": {"type": "integer", "default": 14, "min": 2}},
                "value_type": "number",
            },
        ),
        (
            "bollinger",
            {
                "description": "Bollinger Bands — middle SMA with upper/lower bands offset by standard deviations; helpful for squeeze/mean-reversion checks.",
                "handler": _bollinger_handler,
                "params": {
                    "period": {"type": "integer", "default": 20, "min": 2},
                    "multiplier": {"type": "number", "default": 2.0, "min": 0.5},
                },
                "value_type": "object",
            },
        ),
        (
            "vwap",
            {
                "description": "Volume Weighted Average Price (VWAP) — rolling average price weighted by traded volume; useful as an intraday fair-value anchor.",
                "handler": _vwap_handler,
                "params": {"period": {"type": "integer", "default": None, "nullable": True}},
                "value_type": "number",
            },
        ),
    ]
)
def get_supported_indicators() -> list[dict]:
    """List the public metadata of every registered indicator.

    Returns:
        One dict per ``SUPPORTED_INDICATORS`` entry, in registry order, with
        the keys ``"name"``, ``"description"``, ``"params"`` and
        ``"value_type"`` (``"number"`` when the entry does not set one).  The
        handler is left out.  ``"params"`` is the registry's own dict, not a
        copy.
    """
    return [
        {
            "name": indicator_name,
            "description": spec["description"],
            "params": spec["params"],
            "value_type": spec.get("value_type", "number"),
        }
        for indicator_name, spec in SUPPORTED_INDICATORS.items()
    ]
def compute_indicator(candles: list, indicator: str, params: dict | None = None) -> dict:
    """Compute one registered indicator over ``candles``.

    Args:
        candles: Candle rows handed to the indicator's handler.
        indicator: Indicator name; matched case-insensitively against the
            keys of ``SUPPORTED_INDICATORS``.
        params: Parameter overrides for the handler.  ``None`` (or omitting
            the argument) means "use every default".

    Returns:
        ``{"indicator": <lowercased name>, "value": <handler result>}``.

    Raises:
        UnsupportedIndicatorError: If ``indicator`` is not registered.
    """
    ind = indicator.lower()
    if ind not in SUPPORTED_INDICATORS:
        raise UnsupportedIndicatorError(f"Unsupported indicator: {indicator}")
    handler = SUPPORTED_INDICATORS[ind]["handler"]
    # Handlers call params.get(...), so a caller passing None would otherwise
    # crash with AttributeError instead of getting the default parameters.
    value = handler(candles, {} if params is None else params)
    return {"indicator": ind, "value": value}
|