__init__.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. """Service layer."""
  2. import time
  3. from config import DEFAULT_OHLCV_LIMIT, MAX_OHLCV_LIMIT
  4. from cache import get_cached_price, set_cached_price, get_cached_ohlcv, set_cached_ohlcv
  5. import providers
  6. import indicators as ind_module
  7. async def get_price(symbol: str) -> dict:
  8. symbol = symbol.upper()
  9. cached = get_cached_price(symbol)
  10. if cached:
  11. return cached
  12. data = await providers.fetch_price(symbol)
  13. set_cached_price(symbol, data)
  14. return data
  15. async def get_ohlcv(symbol: str, timeframe: str, limit: int = DEFAULT_OHLCV_LIMIT) -> dict:
  16. symbol = symbol.upper()
  17. limit = min(max(limit, 1), MAX_OHLCV_LIMIT)
  18. cached = get_cached_ohlcv(symbol, timeframe)
  19. if cached:
  20. result = dict(cached)
  21. result["candles"] = cached["candles"][-limit:]
  22. return result
  23. data = await providers.fetch_ohlcv(symbol, timeframe, limit=MAX_OHLCV_LIMIT)
  24. set_cached_ohlcv(symbol, timeframe, data)
  25. result = dict(data)
  26. result["candles"] = data["candles"][-limit:]
  27. return result
  28. async def get_indicator(symbol: str, indicator: str, timeframe: str = "1h", params: dict = None, limit: int = 200) -> dict:
  29. params = params or {}
  30. symbol = symbol.upper()
  31. ohlcv_data = await get_ohlcv(symbol, timeframe, limit=limit)
  32. result = ind_module.compute_indicator(ohlcv_data["candles"], indicator, params)
  33. return {"symbol": symbol, "indicator": result["indicator"], "timeframe": timeframe, "value": result["value"], "timestamp": int(time.time())}
  34. async def get_market_snapshot(symbol: str) -> dict:
  35. symbol = symbol.upper()
  36. price_data = await get_price(symbol)
  37. ohlcv_data = await get_ohlcv(symbol, "1h", limit=200)
  38. candles = ohlcv_data["candles"]
  39. snapshot = {"symbol": symbol, "price": price_data["price"], "rsi_1h": None, "ema_20_1h": None, "ema_50_1h": None, "timestamp": price_data["timestamp"]}
  40. for key, ind, params in [("rsi_1h", "rsi", {"period": 14}), ("ema_20_1h", "ema", {"period": 20}), ("ema_50_1h", "ema", {"period": 50})]:
  41. try:
  42. snapshot[key] = ind_module.compute_indicator(candles, ind, params)["value"]
  43. except Exception:
  44. pass
  45. return snapshot
  46. async def get_top_movers(limit: int = 10) -> dict:
  47. return await providers.fetch_top_movers(min(max(limit, 1), 50))