|
@@ -2,6 +2,8 @@ from __future__ import annotations
|
|
|
|
|
|
|
|
from dataclasses import dataclass
|
|
from dataclasses import dataclass
|
|
|
from datetime import datetime, timezone
|
|
from datetime import datetime, timezone
|
|
|
|
|
+import json
|
|
|
|
|
+from pathlib import Path
|
|
|
from typing import Any
|
|
from typing import Any
|
|
|
|
|
|
|
|
import requests
|
|
import requests
|
|
@@ -11,6 +13,7 @@ HEADERS = {
|
|
|
"User-Agent": "Mozilla/5.0",
|
|
"User-Agent": "Mozilla/5.0",
|
|
|
"Accept": "application/json, text/plain, */*",
|
|
"Accept": "application/json, text/plain, */*",
|
|
|
}
|
|
}
|
|
|
|
|
+PAIRS_PATH = Path(__file__).resolve().parents[3] / "swissquote_pairs.json"
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
@dataclass(frozen=True)
|
|
@@ -26,6 +29,9 @@ class Quote:
|
|
|
|
|
|
|
|
|
|
|
|
|
class SwissquoteClient:
|
|
class SwissquoteClient:
|
|
|
|
|
+ def __init__(self) -> None:
|
|
|
|
|
+ self._pairs_cache: set[str] | None = None
|
|
|
|
|
+
|
|
|
def normalize_symbol(self, symbol: str) -> str:
|
|
def normalize_symbol(self, symbol: str) -> str:
|
|
|
cleaned = symbol.replace("/", "").upper()
|
|
cleaned = symbol.replace("/", "").upper()
|
|
|
if cleaned in {"XAU", "XAG", "XPT", "XPD"}:
|
|
if cleaned in {"XAU", "XAG", "XPT", "XPD"}:
|
|
@@ -34,6 +40,27 @@ class SwissquoteClient:
|
|
|
return symbol.upper()
|
|
return symbol.upper()
|
|
|
return symbol.upper()
|
|
return symbol.upper()
|
|
|
|
|
|
|
|
|
|
+ def supported_pairs(self) -> set[str]:
|
|
|
|
|
+ if self._pairs_cache is not None:
|
|
|
|
|
+ return self._pairs_cache
|
|
|
|
|
+ try:
|
|
|
|
|
+ raw = json.loads(PAIRS_PATH.read_text())
|
|
|
|
|
+ pairs = raw.get("pairs", []) if isinstance(raw, dict) else []
|
|
|
|
|
+ self._pairs_cache = {str(item.get("symbol", "")).upper() for item in pairs if isinstance(item, dict) and item.get("symbol")}
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ self._pairs_cache = set()
|
|
|
|
|
+ return self._pairs_cache
|
|
|
|
|
+
|
|
|
|
|
+ def pair_is_supported(self, symbol: str, counter_currency: str | None = None) -> bool:
|
|
|
|
|
+ base = symbol.replace("/", "").upper() if "/" not in symbol else symbol.split("/", 1)[0].upper()
|
|
|
|
|
+ quote = (counter_currency or "USD").upper()
|
|
|
|
|
+ return f"{base}/{quote}" in self.supported_pairs()
|
|
|
|
|
+
|
|
|
|
|
+ def normalize_pair(self, symbol: str, counter_currency: str | None = None) -> str:
|
|
|
|
|
+ base = symbol.replace("/", "").upper() if "/" not in symbol else symbol.split("/", 1)[0].upper()
|
|
|
|
|
+ quote = (counter_currency or "USD").upper()
|
|
|
|
|
+ return f"{base}/{quote}"
|
|
|
|
|
+
|
|
|
def fetch_quote(self, symbol: str) -> Quote | None:
|
|
def fetch_quote(self, symbol: str) -> Quote | None:
|
|
|
normalized = self.normalize_symbol(symbol)
|
|
normalized = self.normalize_symbol(symbol)
|
|
|
response = requests.get(BASE_URL.format(normalized), headers=HEADERS, timeout=5)
|
|
response = requests.get(BASE_URL.format(normalized), headers=HEADERS, timeout=5)
|