from __future__ import annotations from fastapi import FastAPI from mcp.server.fastmcp import FastMCP from mcp.server.transport_security import TransportSecuritySettings from trends_mcp.aliases import normalize_entity from trends_mcp.cache import cache_stats, get_cache, set_cache from trends_mcp.ledger import entity_history, prune_snapshots, read_recent, store_snapshot, summarize from trends_mcp.providers.google_trends import GoogleTrendsError, GoogleTrendsProvider mcp = FastMCP( "trends-mcp", transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False), ) provider = GoogleTrendsProvider() CACHE_TTL_SECONDS = 30 * 60 def _trend_label(series: list[int]) -> str: if not series: return "flat" first, last = series[0], series[-1] if last - first >= 10: return "rising" if first - last >= 10: return "falling" return "flat" @mcp.tool(description="Experimental: show attention trend for a keyword or entity over time.") async def get_interest_over_time(keyword: str, timeframe: str = "7d"): keyword_norm = normalize_entity(keyword) cache_key = f"interest:{keyword_norm}:{timeframe}" cached = get_cache(cache_key) if cached: return cached try: result = provider.interest_over_time(keyword, timeframe) except GoogleTrendsError as exc: return { "keyword": keyword, "normalized_keyword": keyword_norm, "timeframe": timeframe, "error": str(exc), } payload = { "keyword": keyword, "normalized_keyword": keyword_norm, "timeframe": timeframe, "series": result.series, "trend": _trend_label(result.series), "fetched_at": result.fetched_at, } set_cache(cache_key, payload, CACHE_TTL_SECONDS) return payload @mcp.tool(description="Resolve an entity to Knowledge Graph MID candidates and a best canonical label.") async def resolve_entity(keyword: str): cache_key = f"resolve:{normalize_entity(keyword)}" cached = get_cache(cache_key) if cached: return cached suggestions = provider.suggestions(keyword) best = suggestions[0] if suggestions else None payload = { "keyword": keyword, "canonical_label": best.get("title") if best else normalize_entity(keyword), "mid": best.get("mid") if best else None, "type": best.get("type") if best else None, "candidates": suggestions, } set_cache(cache_key, payload, 24 * 60 * 60) store_snapshot( tool="resolve_entity", keyword=keyword, normalized_keyword=normalize_entity(keyword), mid=payload["mid"], canonical_label=payload["canonical_label"], payload=payload, ) return payload @mcp.tool(description="Get related search queries for an entity.") async def get_related_queries(keyword: str): cache_key = f"related:{normalize_entity(keyword)}" cached = get_cache(cache_key) if cached: return cached related = provider.related_queries(keyword) out = related.get(keyword) or related.get(normalize_entity(keyword)) or {} def _rows(df): if df is None: return [] try: return df.reset_index().to_dict(orient="records") except Exception: return [] payload = { "keyword": keyword, "top": _rows(out.get("top") if isinstance(out, dict) else None), "rising": _rows(out.get("rising") if isinstance(out, dict) else None), } set_cache(cache_key, payload, 24 * 60 * 60) store_snapshot( tool="get_related_queries", keyword=keyword, normalized_keyword=normalize_entity(keyword), mid=None, canonical_label=None, payload=payload, ) return payload @mcp.tool(description="Get related topics for an entity.") async def get_related_topics(keyword: str): cache_key = f"topics:{normalize_entity(keyword)}" cached = get_cache(cache_key) if cached: return cached try: related = provider.related_topics(keyword) out = related.get(keyword) or related.get(normalize_entity(keyword)) or {} except GoogleTrendsError: # pytrends' related_topics is flaky; fall back to related_queries so the tool stays useful. related = provider.related_queries(keyword) out = related.get(keyword) or related.get(normalize_entity(keyword)) or {} def _rows(df): if df is None: return [] try: return df.reset_index().to_dict(orient="records") except Exception: return [] payload = { "keyword": keyword, "top": _rows(out.get("top") if isinstance(out, dict) else None), "rising": _rows(out.get("rising") if isinstance(out, dict) else None), } set_cache(cache_key, payload, 24 * 60 * 60) store_snapshot( tool="get_related_topics", keyword=keyword, normalized_keyword=normalize_entity(keyword), mid=None, canonical_label=None, payload=payload, ) return payload @mcp.tool(description="Read the most recent ledger events.") async def get_ledger_recent(limit: int = 50): return read_recent(limit=max(1, min(int(limit), 200))) @mcp.tool(description="Summarize what the ledger is saying.") async def get_ledger_summary(limit: int = 500): return summarize(limit=max(1, min(int(limit), 2000))) @mcp.tool(description="Show the ledger history for one entity or MID.") async def get_entity_history(entity: str, limit: int = 500): return entity_history(entity, limit=max(1, min(int(limit), 2000))) @mcp.tool(description="Prune stored snapshots older than the configured retention window.") async def prune_history(retention_days: int = 30): deleted = prune_snapshots(retention_days=max(1, min(int(retention_days), 3650))) return {"deleted": deleted, "retention_days": retention_days} @mcp.tool(description="Compare attention between multiple keywords or entities.") async def compare_interest(keywords: list[str], timeframe: str = "7d"): if not keywords: return {"winner": None, "ratios": {}} normalized = [normalize_entity(k) for k in keywords] series_map = {} for original, keyword in zip(keywords, normalized): cache_key = f"interest:{keyword}:{timeframe}" cached = get_cache(cache_key) if cached: series = cached["series"] else: series = provider.interest_over_time(original, timeframe).series set_cache( cache_key, { "keyword": original, "normalized_keyword": keyword, "timeframe": timeframe, "series": series, "trend": _trend_label(series), }, CACHE_TTL_SECONDS, ) series_map[keyword] = series scores = {k: sum(v) for k, v in series_map.items()} winner = max(scores, key=scores.get) top_score = float(scores[winner]) or 1.0 ratios = {k: round(v / top_score, 3) for k, v in scores.items()} return {"winner": winner, "ratios": ratios, "timeframe": timeframe} @mcp.tool(description="Get a compact attention score for a known entity.") async def get_attention_score(entity: str, timeframe: str = "24h"): normalized = normalize_entity(entity) try: series = provider.interest_over_time(entity, timeframe).series except GoogleTrendsError as exc: return {"entity": entity, "normalized_entity": normalized, "error": str(exc), "timeframe": timeframe} score = round(sum(series) / (len(series) * 100), 3) baseline = round(series[-1] / max(1, series[0]), 3) if series[0] else float(series[-1]) return { "entity": entity, "normalized_entity": normalized, "score": score, "relative_to_baseline": baseline, "timeframe": timeframe, } app = FastAPI(title="Trends MCP Server") app.mount("/mcp", mcp.sse_app()) @app.get("/") def root(): return { "status": "ok", "transport": "fastmcp+sse", "mount": "/mcp", "tools": ["resolve_entity", "get_related_queries", "get_related_topics", "get_ledger_recent", "get_ledger_summary", "get_entity_history", "prune_history", "get_interest_over_time", "compare_interest", "get_attention_score"], } @mcp.tool(description="Debug Google Trends connectivity, suggestions, and timeframe handling.") async def debug_google_trends(keyword: str, timeframe: str = "7d"): keyword_norm = normalize_entity(keyword) try: suggestions = provider.suggestions(keyword) except GoogleTrendsError as exc: suggestions = {"error": str(exc)} try: result = provider.interest_over_time(keyword, timeframe) payload = { "keyword": keyword, "normalized_keyword": keyword_norm, "timeframe": timeframe, "suggestions": suggestions, "series": result.series, "trend": _trend_label(result.series), "fetched_at": result.fetched_at, } except GoogleTrendsError as exc: payload = { "keyword": keyword, "normalized_keyword": keyword_norm, "timeframe": timeframe, "suggestions": suggestions, "error": str(exc), } return payload @app.get("/health") def health(): return {"status": "ok", "service": "trends-mcp", "cache": cache_stats()} def main(): import uvicorn uvicorn.run("trends_mcp.mcp_server_fastmcp:app", host="0.0.0.0", port=8507, reload=False) if __name__ == "__main__": main()