| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- """Service layer."""
- import time
- from config import DEFAULT_OHLCV_LIMIT, MAX_OHLCV_LIMIT, TIMEFRAME_TO_BINANCE
- from cache import get_cached_price, set_cached_price, get_cached_ohlcv, set_cached_ohlcv
- import providers
- import indicators as ind_module
- async def get_price(symbol: str) -> dict:
- symbol = symbol.upper()
- cached = get_cached_price(symbol)
- if cached:
- return cached
- data = await providers.fetch_price(symbol)
- set_cached_price(symbol, data)
- return data
- async def get_ohlcv(symbol: str, timeframe: str, limit: int = DEFAULT_OHLCV_LIMIT) -> dict:
- symbol = symbol.upper()
- limit = min(max(limit, 1), MAX_OHLCV_LIMIT)
- cached = get_cached_ohlcv(symbol, timeframe)
- if cached:
- result = dict(cached)
- result["candles"] = cached["candles"][-limit:]
- return result
- data = await providers.fetch_ohlcv(symbol, timeframe, limit=MAX_OHLCV_LIMIT)
- set_cached_ohlcv(symbol, timeframe, data)
- result = dict(data)
- result["candles"] = data["candles"][-limit:]
- return result
- async def get_indicator(symbol: str, indicator: str, timeframe: str = "1h", params: dict = None, limit: int = 200) -> dict:
- params = params or {}
- symbol = symbol.upper()
- ohlcv_data = await get_ohlcv(symbol, timeframe, limit=limit)
- result = ind_module.compute_indicator(ohlcv_data["candles"], indicator, params)
- return {"symbol": symbol, "indicator": result["indicator"], "timeframe": timeframe, "value": result["value"], "timestamp": int(time.time())}
- async def get_market_snapshot(symbol: str) -> dict:
- symbol = symbol.upper()
- price_data = await get_price(symbol)
- ohlcv_data = await get_ohlcv(symbol, "1h", limit=200)
- candles = ohlcv_data["candles"]
- price = price_data["price"]
- snapshot = {
- "symbol": symbol,
- "price": price,
- "rsi_1h": None,
- "ema_20_1h": None,
- "ema_50_1h": None,
- "ema_200_1h": None,
- "macd_histogram_1h": None,
- "atr_1h": None,
- "atr_percent_1h": None,
- "bollinger_1h": None,
- "vwap_1h": None,
- "trend_bias": None,
- "timestamp": price_data["timestamp"],
- }
- def _compute(name: str, params: dict):
- return ind_module.compute_indicator(candles, name, params)["value"]
- for key, ind, params in [
- ("rsi_1h", "rsi", {"period": 14}),
- ("ema_20_1h", "ema", {"period": 20}),
- ("ema_50_1h", "ema", {"period": 50}),
- ("ema_200_1h", "sma", {"period": 200}),
- ]:
- try:
- snapshot[key] = _compute(ind, params)
- except Exception:
- continue
- try:
- macd_vals = _compute("macd", {"fast_period": 12, "slow_period": 26, "signal_period": 9})
- if isinstance(macd_vals, dict):
- snapshot["macd_histogram_1h"] = macd_vals.get("histogram")
- except Exception:
- pass
- try:
- atr_val = _compute("atr", {"period": 14})
- snapshot["atr_1h"] = atr_val
- if atr_val is not None and price:
- snapshot["atr_percent_1h"] = round((atr_val / price) * 100, 4)
- except Exception:
- pass
- try:
- snapshot["bollinger_1h"] = _compute("bollinger", {"period": 20, "multiplier": 2.0})
- except Exception:
- pass
- try:
- snapshot["vwap_1h"] = _compute("vwap", {"period": 48})
- except Exception:
- pass
- ema_fast = snapshot.get("ema_20_1h")
- ema_slow = snapshot.get("ema_50_1h")
- if ema_fast is not None and ema_slow not in (None, 0):
- delta = (ema_fast - ema_slow) / ema_slow
- if delta > 0.002:
- snapshot["trend_bias"] = "bull"
- elif delta < -0.002:
- snapshot["trend_bias"] = "bear"
- else:
- snapshot["trend_bias"] = "range"
- return snapshot
- async def get_top_movers(limit: int = 10) -> dict:
- return await providers.fetch_top_movers(min(max(limit, 1), 50))
- async def get_capabilities() -> dict:
- timeframe_descriptions = {
- "1m": "1-minute candles — ultra-fast scalping / heartbeat data (highest volatility, shortest TTL)",
- "5m": "5-minute candles — intraday momentum and micro-structure",
- "15m": "15-minute candles — short-term swings, aligns with many bot cycles",
- "1h": "1-hour candles — default for balanced trend/momentum reads",
- "4h": "4-hour candles — swing/position traders",
- "1d": "1-day candles — macro trend / higher timeframe context",
- }
- timeframes = []
- for tf, interval in TIMEFRAME_TO_BINANCE.items():
- timeframes.append(
- {
- "timeframe": tf,
- "provider_interval": interval,
- "description": timeframe_descriptions.get(tf, ""),
- }
- )
- return {
- "description": "Supported technical indicators (with params/defaults) and the timeframes you can request via get_ohlcv / get_indicator / get_regime.",
- "indicators": ind_module.get_supported_indicators(),
- "timeframes": timeframes,
- }
- async def get_regime(symbol: str, timeframe: str = "1h", limit: int = 200) -> dict:
- symbol = symbol.upper()
- ohlcv_data = await get_ohlcv(symbol, timeframe, limit=limit)
- candles = ohlcv_data["candles"]
- close_price = float(candles[-1][4]) if candles else None
- timestamp = int(time.time())
- def _compute(name: str, params: dict | None = None):
- try:
- return ind_module.compute_indicator(candles, name, params or {})["value"]
- except Exception:
- return None
- ema_fast = _compute("ema", {"period": 20})
- ema_slow = _compute("ema", {"period": 50})
- sma_long = _compute("sma", {"period": 200})
- atr_val = _compute("atr", {"period": 14})
- boll = _compute("bollinger", {"period": 20, "multiplier": 2.0})
- vwap_val = _compute("vwap", {"period": 48})
- rsi_val = _compute("rsi", {"period": 14})
- macd_vals = _compute("macd", {"fast_period": 12, "slow_period": 26, "signal_period": 9})
- regime_delta = None
- if ema_fast is not None and ema_slow is not None and ema_slow != 0:
- regime_delta = (ema_fast - ema_slow) / ema_slow
- if regime_delta is None:
- trend_state = "unknown"
- elif regime_delta > 0.002:
- trend_state = "bull"
- elif regime_delta < -0.002:
- trend_state = "bear"
- else:
- trend_state = "range"
- if rsi_val is None:
- momentum_state = "unknown"
- elif rsi_val >= 60:
- momentum_state = "bull"
- elif rsi_val <= 40:
- momentum_state = "bear"
- else:
- momentum_state = "neutral"
- atr_percent = None
- if atr_val is not None and close_price:
- atr_percent = round((atr_val / close_price) * 100, 4)
- macd_hist = macd_vals.get("histogram") if isinstance(macd_vals, dict) else None
- # --- Early reversal score (heuristic) ---
- # Goal: flag plausible “pivot attempts” before full trend confirmation.
- # Note: without previous-bar indicator deltas, this is a conservative, snapshot-based score.
- reversal = {"direction": "none", "score": 0, "triggers": []}
- if close_price is not None and vwap_val is not None and isinstance(boll, dict):
- upper = boll.get("upper")
- lower = boll.get("lower")
- if upper and lower and (upper > lower):
- dist_to_upper = (upper - close_price) / close_price if close_price else None
- dist_to_lower = (close_price - lower) / close_price if close_price else None
- else:
- dist_to_upper = None
- dist_to_lower = None
- # Bullish reversal attempt while trend is bearish
- if trend_state == "bear" and macd_hist is not None and rsi_val is not None and macd_hist > 0:
- score = 0
- if rsi_val >= 45:
- score += 25
- reversal["triggers"].append("RSI rising/near-neutral (>=45)")
- if vwap_val is not None and close_price > vwap_val:
- score += 25
- reversal["triggers"].append("Price reclaimed above VWAP")
- if dist_to_lower is not None and dist_to_lower <= 0.01:
- score += 20
- reversal["triggers"].append("Near lower Bollinger band")
- if atr_percent is not None and atr_percent >= 0.8:
- score += 10
- reversal["triggers"].append("Volatility elevated (ATR% >= 0.8)")
- if score >= 40:
- reversal.update({"direction": "bullish", "score": min(score, 95)})
- # Bearish reversal attempt while trend is bullish
- if trend_state == "bull" and macd_hist is not None and rsi_val is not None and macd_hist < 0:
- score = 0
- if rsi_val <= 55:
- score += 25
- reversal["triggers"].append("RSI falling/near-neutral (<=55)")
- if vwap_val is not None and close_price < vwap_val:
- score += 25
- reversal["triggers"].append("Price rejected below VWAP")
- if dist_to_upper is not None and dist_to_upper <= 0.01:
- score += 20
- reversal["triggers"].append("Near upper Bollinger band")
- if atr_percent is not None and atr_percent >= 0.8:
- score += 10
- reversal["triggers"].append("Volatility elevated (ATR% >= 0.8)")
- if score >= 40:
- reversal.update({"direction": "bearish", "score": min(score, 95)})
- return {
- "symbol": symbol,
- "timeframe": timeframe,
- "timestamp": timestamp,
- "price": close_price,
- "trend": {
- "ema_fast": ema_fast,
- "ema_slow": ema_slow,
- "sma_long": sma_long,
- "state": trend_state,
- },
- "momentum": {
- "rsi": rsi_val,
- "macd_histogram": macd_hist,
- "state": momentum_state,
- },
- "volatility": {
- "atr": atr_val,
- "atr_percent": atr_percent,
- },
- "bands": {
- "bollinger": boll,
- },
- "vwap": vwap_val,
- "reversal": reversal,
- }
|