|
|
@@ -1,13 +1,174 @@
|
|
|
from __future__ import annotations
|
|
|
|
|
|
+from datetime import datetime, timezone
|
|
|
+from statistics import pstdev
|
|
|
from typing import Any
|
|
|
|
|
|
-from .config import DB_PATH
|
|
|
+from .config import DB_PATH, METALS_PAIRS
|
|
|
from .storage import last_candle, latest_candles, stats
|
|
|
from .swissquote import SwissquoteClient
|
|
|
|
|
|
client = SwissquoteClient()
|
|
|
|
|
|
+WATCHLIST = tuple(METALS_PAIRS)
|
|
|
+
|
|
|
+
|
|
|
+def _canonical_symbol(symbol: str) -> str:
|
|
|
+ return client.normalize_symbol(symbol).upper()
|
|
|
+
|
|
|
+
|
|
|
+def _to_float(value: Any) -> float | None:
|
|
|
+ if value is None:
|
|
|
+ return None
|
|
|
+ try:
|
|
|
+ return float(value)
|
|
|
+ except (TypeError, ValueError):
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def _closes(candles: list[dict[str, Any]]) -> list[float]:
|
|
|
+ closes: list[float] = []
|
|
|
+ for candle in candles:
|
|
|
+ close = _to_float(candle.get("close"))
|
|
|
+ if close is not None:
|
|
|
+ closes.append(close)
|
|
|
+ return closes
|
|
|
+
|
|
|
+
|
|
|
+def _sma(values: list[float], period: int) -> float | None:
|
|
|
+ if period <= 0 or len(values) < period:
|
|
|
+ return None
|
|
|
+ window = values[-period:]
|
|
|
+ return sum(window) / len(window)
|
|
|
+
|
|
|
+
|
|
|
+def _ema(values: list[float], period: int) -> float | None:
|
|
|
+ if period <= 0 or len(values) < period:
|
|
|
+ return None
|
|
|
+ k = 2.0 / (period + 1.0)
|
|
|
+ ema = sum(values[:period]) / period
|
|
|
+ for value in values[period:]:
|
|
|
+ ema = value * k + ema * (1.0 - k)
|
|
|
+ return ema
|
|
|
+
|
|
|
+
|
|
|
+def _rsi(values: list[float], period: int = 14) -> float | None:
|
|
|
+ if period <= 0 or len(values) < period + 1:
|
|
|
+ return None
|
|
|
+ gains = 0.0
|
|
|
+ losses = 0.0
|
|
|
+ for previous, current in zip(values[-(period + 1) : -1], values[-period:]):
|
|
|
+ delta = current - previous
|
|
|
+ if delta >= 0:
|
|
|
+ gains += delta
|
|
|
+ else:
|
|
|
+ losses -= delta
|
|
|
+ if gains == 0 and losses == 0:
|
|
|
+ return 50.0
|
|
|
+ if losses == 0:
|
|
|
+ return 100.0
|
|
|
+ rs = gains / losses
|
|
|
+ return 100.0 - (100.0 / (1.0 + rs))
|
|
|
+
|
|
|
+
|
|
|
+def _atr_pct(candles: list[dict[str, Any]], period: int = 14) -> float | None:
|
|
|
+ if period <= 0 or len(candles) < period + 1:
|
|
|
+ return None
|
|
|
+ true_ranges: list[float] = []
|
|
|
+ for idx in range(1, len(candles)):
|
|
|
+ high = _to_float(candles[idx].get("high"))
|
|
|
+ low = _to_float(candles[idx].get("low"))
|
|
|
+ prev_close = _to_float(candles[idx - 1].get("close"))
|
|
|
+ if high is None or low is None or prev_close is None:
|
|
|
+ continue
|
|
|
+ true_ranges.append(max(high - low, abs(high - prev_close), abs(low - prev_close)))
|
|
|
+ if len(true_ranges) < period:
|
|
|
+ return None
|
|
|
+ atr = sum(true_ranges[-period:]) / period
|
|
|
+ last_close = _to_float(candles[-1].get("close"))
|
|
|
+ if last_close in (None, 0):
|
|
|
+ return None
|
|
|
+ return (atr / last_close) * 100.0
|
|
|
+
|
|
|
+
|
|
|
+def _trend_pct(closes: list[float]) -> float | None:
|
|
|
+ if len(closes) < 2:
|
|
|
+ return None
|
|
|
+ first = closes[0]
|
|
|
+ last = closes[-1]
|
|
|
+ if first == 0:
|
|
|
+ return None
|
|
|
+ return ((last - first) / first) * 100.0
|
|
|
+
|
|
|
+
|
|
|
+def _range_position(candles: list[dict[str, Any]]) -> float | None:
|
|
|
+ highs = [_to_float(c.get("high")) for c in candles]
|
|
|
+ lows = [_to_float(c.get("low")) for c in candles]
|
|
|
+ closes = [_to_float(c.get("close")) for c in candles]
|
|
|
+ highs = [v for v in highs if v is not None]
|
|
|
+ lows = [v for v in lows if v is not None]
|
|
|
+ closes = [v for v in closes if v is not None]
|
|
|
+ if not highs or not lows or not closes:
|
|
|
+ return None
|
|
|
+ low = min(lows)
|
|
|
+ high = max(highs)
|
|
|
+ if high <= low:
|
|
|
+ return 0.5
|
|
|
+ return (closes[-1] - low) / (high - low)
|
|
|
+
|
|
|
+
|
|
|
+def _derive_regime(trend_pct: float | None, rsi: float | None, atr_pct: float | None) -> tuple[str, float, str]:
|
|
|
+ if trend_pct is None:
|
|
|
+ return "no_data", 0.0, "Not enough candle history to classify the market."
|
|
|
+
|
|
|
+ compression = atr_pct is not None and atr_pct < 0.25
|
|
|
+ bullish = trend_pct >= 0.35 and (rsi is None or rsi >= 55)
|
|
|
+ bearish = trend_pct <= -0.35 and (rsi is None or rsi <= 45)
|
|
|
+
|
|
|
+ if compression:
|
|
|
+ confidence = min(0.65, max(0.2, 0.35 + (0.25 - (atr_pct or 0.0))))
|
|
|
+ return "compression", confidence, "Price action is compressed and range-like."
|
|
|
+ if bullish:
|
|
|
+ confidence = min(0.9, 0.5 + min(abs(trend_pct) / 4.0, 0.4))
|
|
|
+ return "bullish", confidence, "Momentum is supportive and the trend is holding up."
|
|
|
+ if bearish:
|
|
|
+ confidence = min(0.9, 0.5 + min(abs(trend_pct) / 4.0, 0.4))
|
|
|
+ return "bearish", confidence, "Momentum is weak and sellers are in control."
|
|
|
+
|
|
|
+ confidence = 0.35
|
|
|
+ if rsi is not None:
|
|
|
+ confidence += min(abs(rsi - 50.0) / 100.0, 0.25)
|
|
|
+ return "neutral", confidence, "Signals are mixed and the market is not strongly directional."
|
|
|
+
|
|
|
+
|
|
|
+def _symbol_snapshot(symbol: str, timeframe: str = "5m", limit: int = 20) -> dict[str, Any]:
|
|
|
+ canonical = _canonical_symbol(symbol)
|
|
|
+ candles = latest_candles(DB_PATH, canonical, timeframe, limit)
|
|
|
+ last = last_candle(DB_PATH, canonical, timeframe)
|
|
|
+ closes = _closes(candles)
|
|
|
+ trend_pct = _trend_pct(closes)
|
|
|
+ rsi = _rsi(closes)
|
|
|
+ atr_pct = _atr_pct(candles)
|
|
|
+ range_position = _range_position(candles)
|
|
|
+ regime, confidence, summary = _derive_regime(trend_pct, rsi, atr_pct)
|
|
|
+
|
|
|
+ return {
|
|
|
+ "symbol": symbol.upper(),
|
|
|
+ "pair": canonical,
|
|
|
+ "timeframe": timeframe,
|
|
|
+ "candle_count": len(candles),
|
|
|
+ "price": last["close"] if last else None,
|
|
|
+ "change_pct": trend_pct,
|
|
|
+ "rsi": rsi,
|
|
|
+ "atr_pct": atr_pct,
|
|
|
+ "range_position": range_position,
|
|
|
+ "regime": regime,
|
|
|
+ "confidence": confidence,
|
|
|
+ "summary": summary,
|
|
|
+ "last_candle": last,
|
|
|
+ "candles": candles,
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
def get_capabilities() -> dict[str, Any]:
|
|
|
return {
|
|
|
@@ -19,6 +180,7 @@ def get_capabilities() -> dict[str, Any]:
|
|
|
"get_indicator",
|
|
|
"get_market_snapshot",
|
|
|
"get_top_movers",
|
|
|
+ "get_last_candle",
|
|
|
"get_capabilities",
|
|
|
"get_regime",
|
|
|
],
|
|
|
@@ -63,8 +225,9 @@ def get_price(symbol: str, counter_currency: str | None = None) -> dict[str, Any
|
|
|
|
|
|
|
|
|
def get_ohlcv(symbol: str, timeframe: str = "5m", limit: int = 100) -> dict[str, Any]:
|
|
|
- candles = latest_candles(DB_PATH, symbol.upper(), timeframe, limit)
|
|
|
- return {"symbol": symbol.upper(), "timeframe": timeframe, "limit": limit, "candles": candles}
|
|
|
+ canonical = _canonical_symbol(symbol)
|
|
|
+ candles = latest_candles(DB_PATH, canonical, timeframe, limit)
|
|
|
+ return {"symbol": canonical, "timeframe": timeframe, "limit": limit, "candles": candles}
|
|
|
|
|
|
|
|
|
def get_candles(symbol: str, timeframe: str = "5m", limit: int = 100) -> dict[str, Any]:
|
|
|
@@ -72,26 +235,150 @@ def get_candles(symbol: str, timeframe: str = "5m", limit: int = 100) -> dict[st
|
|
|
|
|
|
|
|
|
def get_last_candle(symbol: str, timeframe: str = "5m") -> dict[str, Any]:
|
|
|
- return {"symbol": symbol.upper(), "timeframe": timeframe, "candle": last_candle(DB_PATH, symbol.upper(), timeframe)}
|
|
|
+ canonical = _canonical_symbol(symbol)
|
|
|
+ return {"symbol": canonical, "timeframe": timeframe, "candle": last_candle(DB_PATH, canonical, timeframe)}
|
|
|
|
|
|
|
|
|
def get_indicator(symbol: str, indicator: str, timeframe: str = "5m", params: dict[str, Any] | None = None) -> dict[str, Any]:
|
|
|
- return {"symbol": symbol, "indicator": indicator, "timeframe": timeframe, "params": params or {}, "value": None, "status": "scaffolded"}
|
|
|
+ params = params or {}
|
|
|
+ limit = int(params.get("limit", 50))
|
|
|
+ canonical = _canonical_symbol(symbol)
|
|
|
+ candles = latest_candles(DB_PATH, canonical, timeframe, max(limit, 2))
|
|
|
+ closes = _closes(candles)
|
|
|
+ name = indicator.lower().strip()
|
|
|
+
|
|
|
+ value: float | None
|
|
|
+ if name == "sma":
|
|
|
+ value = _sma(closes, int(params.get("period", 14)))
|
|
|
+ elif name == "ema":
|
|
|
+ value = _ema(closes, int(params.get("period", 14)))
|
|
|
+ elif name == "rsi":
|
|
|
+ value = _rsi(closes, int(params.get("period", 14)))
|
|
|
+ elif name == "atr":
|
|
|
+ value = _atr_pct(candles, int(params.get("period", 14)))
|
|
|
+ elif name == "return_pct":
|
|
|
+ value = _trend_pct(closes)
|
|
|
+ elif name == "volatility":
|
|
|
+ returns = [((curr - prev) / prev) * 100.0 for prev, curr in zip(closes, closes[1:]) if prev]
|
|
|
+ value = pstdev(returns) if len(returns) >= 2 else None
|
|
|
+ else:
|
|
|
+ value = None
|
|
|
+
|
|
|
+ status = "ok" if value is not None else "unavailable"
|
|
|
+ return {
|
|
|
+ "symbol": canonical,
|
|
|
+ "indicator": name,
|
|
|
+ "timeframe": timeframe,
|
|
|
+ "params": params,
|
|
|
+ "value": value,
|
|
|
+ "status": status,
|
|
|
+ }
|
|
|
|
|
|
|
|
|
def get_market_snapshot(symbol: str) -> dict[str, Any]:
|
|
|
- candle = last_candle(DB_PATH, symbol.upper(), "5m")
|
|
|
- price = candle["close"] if candle else None
|
|
|
- return {"symbol": symbol.upper(), "price": price, "trend_bias": "range", "status": "scaffolded"}
|
|
|
+ snapshot = _symbol_snapshot(symbol, "5m", 20)
|
|
|
+ return {
|
|
|
+ "symbol": snapshot["symbol"],
|
|
|
+ "timeframe": snapshot["timeframe"],
|
|
|
+ "price": snapshot["price"],
|
|
|
+ "change_pct": snapshot["change_pct"],
|
|
|
+ "trend_bias": snapshot["regime"],
|
|
|
+ "confidence": snapshot["confidence"],
|
|
|
+ "summary": snapshot["summary"],
|
|
|
+ "components": {
|
|
|
+ "trend_pct": snapshot["change_pct"],
|
|
|
+ "rsi": snapshot["rsi"],
|
|
|
+ "atr_pct": snapshot["atr_pct"],
|
|
|
+ "range_position": snapshot["range_position"],
|
|
|
+ },
|
|
|
+ "last_candle": snapshot["last_candle"],
|
|
|
+ "status": "ok" if snapshot["candle_count"] else "no_data",
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+def get_combined_snapshot(timeframe: str = "5m", limit: int = 20) -> dict[str, Any]:
|
|
|
+ symbols = {symbol: _symbol_snapshot(symbol, timeframe, limit) for symbol in WATCHLIST}
|
|
|
+ gold = symbols.get("XAU/USD")
|
|
|
+ silver = symbols.get("XAG/USD")
|
|
|
+
|
|
|
+ ratio = None
|
|
|
+ ratio_change = None
|
|
|
+ if gold and silver and gold["price"] and silver["price"]:
|
|
|
+ ratio = gold["price"] / silver["price"]
|
|
|
+ if gold["change_pct"] is not None and silver["change_pct"] is not None:
|
|
|
+ ratio_change = gold["change_pct"] - silver["change_pct"]
|
|
|
+
|
|
|
+ regimes = [item["regime"] for item in symbols.values() if item["regime"] != "no_data"]
|
|
|
+ if not regimes:
|
|
|
+ regime = "no_data"
|
|
|
+ confidence = 0.0
|
|
|
+ summary = "No metals data available yet."
|
|
|
+ elif all(r == "bullish" for r in regimes):
|
|
|
+ regime = "hard_asset_bid"
|
|
|
+ confidence = min(0.95, sum(item["confidence"] for item in symbols.values()) / len(symbols))
|
|
|
+ summary = "Gold and silver are both trending higher."
|
|
|
+ elif all(r == "bearish" for r in regimes):
|
|
|
+ regime = "hard_asset_pressure"
|
|
|
+ confidence = min(0.95, sum(item["confidence"] for item in symbols.values()) / len(symbols))
|
|
|
+ summary = "Gold and silver are both trending lower."
|
|
|
+ elif any(r == "compression" for r in regimes):
|
|
|
+ regime = "compression"
|
|
|
+ confidence = 0.45
|
|
|
+ summary = "Metals are compressed and waiting for a break."
|
|
|
+ else:
|
|
|
+ regime = "neutral"
|
|
|
+ confidence = 0.4
|
|
|
+ summary = "Metals are mixed and not giving a clean directional read."
|
|
|
+
|
|
|
+ return {
|
|
|
+ "server": "metals-mcp",
|
|
|
+ "generated_at": datetime.now(timezone.utc).isoformat(),
|
|
|
+ "timeframe": timeframe,
|
|
|
+ "symbols": symbols,
|
|
|
+ "cross_asset": {
|
|
|
+ "gold_silver_ratio": ratio,
|
|
|
+ "gold_silver_ratio_change_pct": ratio_change,
|
|
|
+ },
|
|
|
+ "regime": regime,
|
|
|
+ "confidence": confidence,
|
|
|
+ "summary": summary,
|
|
|
+ "status": "ok" if symbols else "no_data",
|
|
|
+ }
|
|
|
|
|
|
|
|
|
def get_top_movers(limit: int = 10) -> dict[str, Any]:
|
|
|
- return {"limit": limit, "movers": [], "status": "scaffolded"}
|
|
|
+ movers = []
|
|
|
+ for symbol in WATCHLIST:
|
|
|
+ snapshot = _symbol_snapshot(symbol, "5m", 20)
|
|
|
+ movers.append(
|
|
|
+ {
|
|
|
+ "symbol": symbol,
|
|
|
+ "change_pct": snapshot["change_pct"],
|
|
|
+ "regime": snapshot["regime"],
|
|
|
+ "confidence": snapshot["confidence"],
|
|
|
+ }
|
|
|
+ )
|
|
|
+ movers = sorted(movers, key=lambda item: abs(item["change_pct"] or 0.0), reverse=True)[:limit]
|
|
|
+ return {"limit": limit, "movers": movers, "status": "ok"}
|
|
|
|
|
|
|
|
|
def get_regime(symbol: str, timeframe: str = "5m") -> dict[str, Any]:
|
|
|
- candles = latest_candles(DB_PATH, symbol.upper(), timeframe, 20)
|
|
|
- return {"symbol": symbol.upper(), "timeframe": timeframe, "candles": candles, "regime": None, "status": "scaffolded"}
|
|
|
+ snapshot = _symbol_snapshot(symbol, timeframe, 20)
|
|
|
+ return {
|
|
|
+ "symbol": snapshot["symbol"],
|
|
|
+ "timeframe": snapshot["timeframe"],
|
|
|
+ "regime": snapshot["regime"],
|
|
|
+ "confidence": snapshot["confidence"],
|
|
|
+ "summary": snapshot["summary"],
|
|
|
+ "components": {
|
|
|
+ "trend_pct": snapshot["change_pct"],
|
|
|
+ "rsi": snapshot["rsi"],
|
|
|
+ "atr_pct": snapshot["atr_pct"],
|
|
|
+ "range_position": snapshot["range_position"],
|
|
|
+ },
|
|
|
+ "candles": snapshot["candles"],
|
|
|
+ "status": "ok" if snapshot["candle_count"] else "no_data",
|
|
|
+ }
|
|
|
|
|
|
|
|
|
def get_health() -> dict[str, Any]:
|