| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- from __future__ import annotations
- from fastapi import FastAPI
- from mcp.server.fastmcp import FastMCP
- from mcp.server.transport_security import TransportSecuritySettings
- from trends_mcp.aliases import normalize_entity
- from trends_mcp.cache import cache_stats, get_cache, set_cache
- from trends_mcp.ledger import entity_history, prune_snapshots, read_recent, store_snapshot, summarize
- from trends_mcp.providers.google_trends import GoogleTrendsError, GoogleTrendsProvider
# MCP server instance that every @mcp.tool below registers with.
# DNS-rebinding protection is explicitly switched off for its transport.
_transport_security = TransportSecuritySettings(enable_dns_rebinding_protection=False)
mcp = FastMCP("trends-mcp", transport_security=_transport_security)

# Single shared provider used by all tools for Google Trends lookups.
provider = GoogleTrendsProvider()

# Lifetime of cached interest-over-time payloads: 30 minutes, in seconds.
CACHE_TTL_SECONDS = 30 * 60
- def _trend_label(series: list[int]) -> str:
- if not series:
- return "flat"
- first, last = series[0], series[-1]
- if last - first >= 10:
- return "rising"
- if first - last >= 10:
- return "falling"
- return "flat"
@mcp.tool(description="Experimental: show attention trend for a keyword or entity over time.")
async def get_interest_over_time(keyword: str, timeframe: str = "7d"):
    """Return the interest series and trend label for one keyword.

    A truthy cache entry under ``interest:<normalized keyword>:<timeframe>``
    is returned as-is. Otherwise the provider is queried with the original
    (un-normalized) keyword and the successful payload is cached for
    ``CACHE_TTL_SECONDS``.

    Returns:
        A dict with ``keyword``, ``normalized_keyword`` and ``timeframe``,
        plus either ``series``/``trend``/``fetched_at`` on success or
        ``error`` when the provider raises ``GoogleTrendsError``. Error
        payloads are not cached.
    """
    keyword_norm = normalize_entity(keyword)
    cache_key = f"interest:{keyword_norm}:{timeframe}"

    hit = get_cache(cache_key)
    if hit:
        return hit

    # Fields shared by the success and the error response.
    response = {
        "keyword": keyword,
        "normalized_keyword": keyword_norm,
        "timeframe": timeframe,
    }

    try:
        result = provider.interest_over_time(keyword, timeframe)
    except GoogleTrendsError as exc:
        response["error"] = str(exc)
        return response

    response["series"] = result.series
    response["trend"] = _trend_label(result.series)
    response["fetched_at"] = result.fetched_at

    set_cache(cache_key, response, CACHE_TTL_SECONDS)
    return response
@mcp.tool(description="Resolve an entity to Knowledge Graph MID candidates and a best canonical label.")
async def resolve_entity(keyword: str):
    """Resolve ``keyword`` to its best suggestion and the full candidate list.

    A truthy cache entry under ``resolve:<normalized keyword>`` is returned
    directly. Otherwise the provider's suggestions are fetched; the first
    suggestion (if any) supplies ``canonical_label``, ``mid`` and ``type``.
    Successful results are cached for 24 hours and recorded in the ledger
    via ``store_snapshot``.

    Returns:
        A dict with ``keyword``, ``canonical_label``, ``mid``, ``type`` and
        ``candidates``. When there are no suggestions, ``canonical_label``
        falls back to the normalized keyword and ``mid``/``type`` are
        ``None``. When the provider raises ``GoogleTrendsError`` the same
        shape is returned with an extra ``error`` key; that payload is
        neither cached nor written to the ledger.
    """
    # Normalize once; the value is reused for the cache key, the fallback
    # label and the ledger snapshot.
    keyword_norm = normalize_entity(keyword)
    cache_key = f"resolve:{keyword_norm}"

    cached = get_cache(cache_key)
    if cached:
        return cached

    try:
        suggestions = provider.suggestions(keyword)
    except GoogleTrendsError as exc:
        # Report provider failures as an error payload, like the other tools,
        # while keeping the usual keys so callers using .get() keep working.
        return {
            "keyword": keyword,
            "canonical_label": keyword_norm,
            "mid": None,
            "type": None,
            "candidates": [],
            "error": str(exc),
        }

    best = suggestions[0] if suggestions else None
    payload = {
        "keyword": keyword,
        "canonical_label": best.get("title") if best else keyword_norm,
        "mid": best.get("mid") if best else None,
        "type": best.get("type") if best else None,
        "candidates": suggestions,
    }

    set_cache(cache_key, payload, 24 * 60 * 60)
    store_snapshot(
        tool="resolve_entity",
        keyword=keyword,
        normalized_keyword=keyword_norm,
        mid=payload["mid"],
        canonical_label=payload["canonical_label"],
        payload=payload,
    )
    return payload
@mcp.tool(description="Get related search queries for an entity.")
async def get_related_queries(keyword: str):
    """Return the "top" and "rising" related queries for ``keyword``.

    A truthy cache entry under ``related:<normalized keyword>`` is returned
    directly. Otherwise the entity is resolved via ``resolve_entity`` (for
    the ledger's ``mid`` / ``canonical_label``), the provider is queried,
    and the result is cached for 24 hours and stored as a ledger snapshot.

    Returns:
        A dict with ``keyword``, ``top`` and ``rising`` (lists of row
        dicts, possibly empty). If the provider raises
        ``GoogleTrendsError`` the same shape is returned with empty lists
        and an ``error`` key; that payload is neither cached nor stored.
    """
    # Normalize once; reused for the cache key, result lookup and snapshot.
    keyword_norm = normalize_entity(keyword)
    cache_key = f"related:{keyword_norm}"

    cached = get_cache(cache_key)
    if cached:
        return cached

    entity_info = await resolve_entity(keyword)

    try:
        related = provider.related_queries(keyword)
    except GoogleTrendsError as exc:
        # Match the error-payload convention of the other tools instead of
        # letting the provider error escape.
        return {"keyword": keyword, "top": [], "rising": [], "error": str(exc)}

    # The provider result is looked up by the raw keyword first, then by
    # its normalized form.
    out = related.get(keyword) or related.get(keyword_norm) or {}
    groups = out if isinstance(out, dict) else {}

    def _rows(df):
        """Convert a table-like object to a list of row dicts (best effort)."""
        if df is None:
            return []
        try:
            # presumably a pandas DataFrame coming from the provider -- TODO confirm
            return df.reset_index().to_dict(orient="records")
        except Exception:
            # Deliberate best effort: anything unconvertible yields no rows.
            return []

    payload = {
        "keyword": keyword,
        "top": _rows(groups.get("top")),
        "rising": _rows(groups.get("rising")),
    }

    set_cache(cache_key, payload, 24 * 60 * 60)
    store_snapshot(
        tool="get_related_queries",
        keyword=keyword,
        normalized_keyword=keyword_norm,
        mid=entity_info.get("mid"),
        canonical_label=entity_info.get("canonical_label"),
        payload=payload,
    )
    return payload
@mcp.tool(description="Get related topics for an entity.")
async def get_related_topics(keyword: str):
    """Return the "top" and "rising" related topics for ``keyword``.

    A truthy cache entry under ``topics:<normalized keyword>`` is returned
    directly. Otherwise the entity is resolved via ``resolve_entity``, the
    provider's related topics are fetched, and the result is cached for
    24 hours and stored as a ledger snapshot. When ``related_topics``
    raises ``GoogleTrendsError`` the provider's related *queries* are used
    instead.

    Returns:
        A dict with ``keyword``, ``top`` and ``rising`` (lists of row
        dicts, possibly empty). If the fallback call also raises
        ``GoogleTrendsError`` the same shape is returned with empty lists
        and an ``error`` key; that payload is neither cached nor stored.
    """
    # Normalize once; reused for the cache key, result lookup and snapshot.
    keyword_norm = normalize_entity(keyword)
    cache_key = f"topics:{keyword_norm}"

    cached = get_cache(cache_key)
    if cached:
        return cached

    entity_info = await resolve_entity(keyword)

    try:
        related = provider.related_topics(keyword)
    except GoogleTrendsError:
        # pytrends' related_topics is flaky; fall back to related_queries so the tool stays useful.
        try:
            related = provider.related_queries(keyword)
        except GoogleTrendsError as exc:
            # Both sources failed: report it rather than raising out of the tool.
            return {"keyword": keyword, "top": [], "rising": [], "error": str(exc)}

    # The provider result is looked up by the raw keyword first, then by
    # its normalized form.
    out = related.get(keyword) or related.get(keyword_norm) or {}
    groups = out if isinstance(out, dict) else {}

    def _rows(df):
        """Convert a table-like object to a list of row dicts (best effort)."""
        if df is None:
            return []
        try:
            # presumably a pandas DataFrame coming from the provider -- TODO confirm
            return df.reset_index().to_dict(orient="records")
        except Exception:
            # Deliberate best effort: anything unconvertible yields no rows.
            return []

    payload = {
        "keyword": keyword,
        "top": _rows(groups.get("top")),
        "rising": _rows(groups.get("rising")),
    }

    set_cache(cache_key, payload, 24 * 60 * 60)
    store_snapshot(
        tool="get_related_topics",
        keyword=keyword,
        normalized_keyword=keyword_norm,
        mid=entity_info.get("mid"),
        canonical_label=entity_info.get("canonical_label"),
        payload=payload,
    )
    return payload
@mcp.tool(description="Read the most recent ledger events.")
async def get_ledger_recent(limit: int = 50):
    """Return ``read_recent`` output with ``limit`` clamped to 1..200."""
    bounded = min(int(limit), 200)
    if bounded < 1:
        bounded = 1
    return read_recent(limit=bounded)
@mcp.tool(description="Summarize what the ledger is saying.")
async def get_ledger_summary(limit: int = 500):
    """Return ``summarize`` output with ``limit`` clamped to 1..2000."""
    bounded = min(int(limit), 2000)
    if bounded < 1:
        bounded = 1
    return summarize(limit=bounded)
@mcp.tool(description="Show the ledger history for one entity or MID.")
async def get_entity_history(entity: str, limit: int = 500):
    """Return ``entity_history`` for ``entity`` with ``limit`` clamped to 1..2000."""
    bounded = min(int(limit), 2000)
    if bounded < 1:
        bounded = 1
    return entity_history(entity, limit=bounded)
@mcp.tool(description="Prune stored snapshots older than the configured retention window.")
async def prune_history(retention_days: int = 30):
    """Prune ledger snapshots and report what was done.

    Args:
        retention_days: Requested retention window in days. It is coerced
            with ``int()`` and clamped to the range 1..3650 before being
            passed to ``prune_snapshots``.

    Returns:
        A dict with ``deleted`` (the value returned by ``prune_snapshots``)
        and ``retention_days`` (the clamped window that was actually
        applied, which may differ from the requested value).
    """
    effective_days = max(1, min(int(retention_days), 3650))
    deleted = prune_snapshots(retention_days=effective_days)
    # Report the window that was really used, not the raw request.
    return {"deleted": deleted, "retention_days": effective_days}
@mcp.tool(description="Compare attention between multiple keywords or entities.")
async def compare_interest(keywords: list[str], timeframe: str = "7d"):
    """Rank keywords by the sum of their interest series.

    Each keyword's series is taken from a truthy cache entry under
    ``interest:<normalized keyword>:<timeframe>`` when present; otherwise
    it is fetched from the provider using the original keyword and cached
    for ``CACHE_TTL_SECONDS``. Results are keyed by normalized keyword, so
    inputs that normalize to the same value share one entry.

    Returns:
        A dict with ``winner`` (normalized keyword with the largest summed
        series, or ``None`` when nothing could be scored), ``ratios``
        (each keyword's sum divided by the winner's sum, rounded to three
        decimals; a zero winner sum is replaced by 1.0 as the divisor) and
        ``timeframe``. Keywords whose provider call raised
        ``GoogleTrendsError`` are left out of the ranking and listed under
        an additional ``errors`` key (normalized keyword -> message).
    """
    if not keywords:
        return {"winner": None, "ratios": {}, "timeframe": timeframe}

    series_map = {}
    errors = {}
    for original in keywords:
        keyword = normalize_entity(original)
        cache_key = f"interest:{keyword}:{timeframe}"

        cached = get_cache(cache_key)
        if cached:
            series_map[keyword] = cached["series"]
            continue

        try:
            result = provider.interest_over_time(original, timeframe)
        except GoogleTrendsError as exc:
            # One failing keyword should not abort the whole comparison.
            errors[keyword] = str(exc)
            continue

        # This cache key is shared with get_interest_over_time, so store the
        # same payload fields that tool produces, including fetched_at.
        set_cache(
            cache_key,
            {
                "keyword": original,
                "normalized_keyword": keyword,
                "timeframe": timeframe,
                "series": result.series,
                "trend": _trend_label(result.series),
                "fetched_at": result.fetched_at,
            },
            CACHE_TTL_SECONDS,
        )
        series_map[keyword] = result.series

    if not series_map:
        # Every lookup failed; there is nothing to rank.
        return {"winner": None, "ratios": {}, "timeframe": timeframe, "errors": errors}

    scores = {k: sum(v) for k, v in series_map.items()}
    winner = max(scores, key=scores.get)
    # Avoid dividing by zero when even the best keyword sums to 0.
    top_score = float(scores[winner]) or 1.0
    ratios = {k: round(v / top_score, 3) for k, v in scores.items()}

    response = {"winner": winner, "ratios": ratios, "timeframe": timeframe}
    if errors:
        response["errors"] = errors
    return response
@mcp.tool(description="Get a compact attention score for a known entity.")
async def get_attention_score(entity: str, timeframe: str = "24h"):
    """Compute a compact score and a last-vs-first ratio for ``entity``.

    The provider is queried directly on every call; this tool does not
    read or write the cache.

    Returns:
        On success, a dict with ``entity``, ``normalized_entity``,
        ``score`` (mean of the series divided by 100, rounded to three
        decimals), ``relative_to_baseline`` (last sample divided by the
        first, rounded to three decimals, or the last sample as a float
        when the first sample is 0) and ``timeframe``.
        When the provider raises ``GoogleTrendsError`` or returns an empty
        series, a dict with ``entity``, ``normalized_entity``, ``error``
        and ``timeframe`` is returned instead.
    """
    normalized = normalize_entity(entity)
    try:
        series = provider.interest_over_time(entity, timeframe).series
    except GoogleTrendsError as exc:
        return {"entity": entity, "normalized_entity": normalized, "error": str(exc), "timeframe": timeframe}

    if not series:
        # Without samples both the mean and the first/last ratio are
        # undefined; previously this crashed with ZeroDivisionError.
        return {
            "entity": entity,
            "normalized_entity": normalized,
            "error": "no data points returned",
            "timeframe": timeframe,
        }

    # NOTE(review): dividing by 100 assumes samples are on a 0-100 scale -- confirm.
    score = round(sum(series) / (len(series) * 100), 3)
    first, last = series[0], series[-1]
    baseline = round(last / max(1, first), 3) if first else float(last)
    return {
        "entity": entity,
        "normalized_entity": normalized,
        "score": score,
        "relative_to_baseline": baseline,
        "timeframe": timeframe,
    }
# HTTP application wrapping the MCP server; its SSE app is served under /mcp.
app = FastAPI(title="Trends MCP Server")
app.mount(path="/mcp", app=mcp.sse_app())
@app.get("/")
def root():
    """Return a static description of the service: status, transport, mount and tool names."""
    # Hard-coded list; it is not derived from the registered MCP tools.
    tool_names = [
        "resolve_entity",
        "get_related_queries",
        "get_related_topics",
        "get_ledger_recent",
        "get_ledger_summary",
        "get_entity_history",
        "prune_history",
        "get_interest_over_time",
        "compare_interest",
        "get_attention_score",
    ]
    info = {"status": "ok", "transport": "fastmcp+sse", "mount": "/mcp"}
    info["tools"] = tool_names
    return info
@mcp.tool(description="Debug Google Trends connectivity, suggestions, and timeframe handling.")
async def debug_google_trends(keyword: str, timeframe: str = "7d"):
    """Run the suggestions and interest-over-time calls and report both outcomes.

    Neither call goes through the cache. A ``GoogleTrendsError`` from the
    suggestions call is captured as ``{"error": ...}`` in the
    ``suggestions`` field; one from the interest call is captured in a
    top-level ``error`` field in place of ``series``/``trend``/``fetched_at``.
    """
    keyword_norm = normalize_entity(keyword)

    try:
        suggestions = provider.suggestions(keyword)
    except GoogleTrendsError as exc:
        suggestions = {"error": str(exc)}

    # Fields present regardless of whether the interest lookup succeeds.
    report = {
        "keyword": keyword,
        "normalized_keyword": keyword_norm,
        "timeframe": timeframe,
        "suggestions": suggestions,
    }

    try:
        result = provider.interest_over_time(keyword, timeframe)
        report.update(
            series=result.series,
            trend=_trend_label(result.series),
            fetched_at=result.fetched_at,
        )
    except GoogleTrendsError as exc:
        report["error"] = str(exc)

    return report
@app.get("/health")
def health():
    """Return a liveness payload that includes the current ``cache_stats()``."""
    stats = cache_stats()
    return {"status": "ok", "service": "trends-mcp", "cache": stats}
def main():
    """Serve the app with uvicorn on 0.0.0.0:8507, with auto-reload disabled."""
    # Imported here so uvicorn is only needed when running as a script.
    from uvicorn import run as serve

    serve(
        "trends_mcp.mcp_server_fastmcp:app",
        host="0.0.0.0",
        port=8507,
        reload=False,
    )


if __name__ == "__main__":
    main()
|