| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- """FastMCP transport wrapper for crypto tools (compliance-first)."""
- from fastapi import FastAPI
- from mcp.server.fastmcp import FastMCP
- from mcp.server.transport_security import TransportSecuritySettings
- import services
- from mcp_tools import MCP_TOOLS
- from cache import get_cache_stats
- mcp = FastMCP(
- "crypto-mcp",
- transport_security=TransportSecuritySettings(
- enable_dns_rebinding_protection=False,
- ),
- )
- @mcp.tool(
- description=(
- "Return the latest USD spot price for a symbol (e.g., BTC, ETH). Uses Binance as primary data source "
- "with CoinGecko fallback; payload includes symbol, price, and provider timestamp."
- )
- )
- async def get_price(symbol: str):
- return await services.get_price(symbol)
- @mcp.tool(
- description=(
- "Fetch OHLCV candles for a symbol/timeframe (1m, 5m, 15m, 1h, 4h, 1d). "
- "Limit can range from 1-500 candles; candles are returned oldest→newest with [timestamp, open, high, low, close, volume]."
- )
- )
- async def get_ohlcv(symbol: str, timeframe: str = "1h", limit: int = 100):
- return await services.get_ohlcv(symbol, timeframe, limit)
- @mcp.tool(
- description=(
- "Compute a technical indicator for a symbol/timeframe. Supported names: rsi, ema, sma, macd, atr, bollinger, vwap. "
- "Pass params such as period, fast_period/slow_period/signal_period (MACD) or multiplier (Bollinger). Output echoes the indicator name and value object."
- )
- )
- async def get_indicator(symbol: str, indicator: str, timeframe: str = "1h", params: dict | None = None):
- return await services.get_indicator(symbol, indicator, timeframe, params or {})
- @mcp.tool(
- description=(
- "Return a lightweight 1h snapshot: price, RSI14, EMA20/50/200, MACD histogram, ATR (abs & %), Bollinger(20,2) bands, "
- "rolling VWAP, and a simple trend_bias derived from EMA20 vs EMA50. Ideal for dashboards needing core stats."
- )
- )
- async def get_market_snapshot(symbol: str):
- return await services.get_market_snapshot(symbol)
- @mcp.tool(
- description=(
- "List top 24h movers (gainers/losers) from the provider feed. Limit 1-50 entries; each item includes symbol, price change %, "
- "and supporting metadata for quick leaderboard views."
- )
- )
- async def get_top_movers(limit: int = 10):
- return await services.get_top_movers(limit)
- @mcp.tool(
- description=(
- "Describe what this server supports: indicator catalog (with params/defaults) plus allowed timeframes "
- "for OHLCV/indicator/regime calls. Use this to self-discover valid inputs before invoking other tools."
- )
- )
- async def get_capabilities():
- return await services.get_capabilities()
- @mcp.tool(
- description=(
- "Return a composite regime snapshot that merges trend (EMA20/50, SMA200), momentum (RSI, MACD histogram), "
- "volatility (ATR + % of price), Bollinger bands, and VWAP for the requested symbol/timeframe. "
- "Fields include derived bull/bear/range states for quick downstream logic."
- )
- )
- async def get_regime(symbol: str, timeframe: str = "1h"):
- return await services.get_regime(symbol, timeframe)
- app = FastAPI(title="Crypto MCP Server")
- app.mount("/mcp", mcp.sse_app())
- @app.get("/")
- def root():
- return {"status": "ok", "transport": "fastmcp+sse", "mount": "/mcp", "tools": [t["name"] for t in MCP_TOOLS]}
- @app.get("/health")
- def health():
- return {"status": "ok", "cache": get_cache_stats(), "tools": [t["name"] for t in MCP_TOOLS]}
|