from __future__ import annotations

import json
from functools import lru_cache
from typing import Any

import httpx

from news_mcp.entity_normalize import normalize_entity
class GoogleTrendsRelatedError(RuntimeError):
    """Raised when the Google Trends related-queries API cannot be used."""
class GoogleTrendsRelatedProvider:
    """Fetch "related queries" suggestions from the (unofficial) Google Trends web API.

    Mirrors the browser flow: first call the ``explore`` endpoint to obtain a
    widget token, then call the related-searches widget endpoint with that
    token and its request payload.
    """

    _EXPLORE_URL = "https://trends.google.com/trends/api/explore"
    _RELATED_URL = "https://trends.google.com/trends/api/widgetdata/relatedsearches/"

    def __init__(self, *, hl: str = "en-US", tz: int = 120, timeout: float = 10.0):
        """
        Args:
            hl: UI language code sent to the API (e.g. "en-US").
            tz: Timezone offset in minutes, as the Trends web UI sends it.
            timeout: Per-request timeout in seconds.
        """
        self.hl = hl
        self.tz = tz
        self.timeout = timeout
        # Browser-like headers; the Trends endpoints are unofficial and tend to
        # reject obviously non-browser clients.
        self._headers = {
            "User-Agent": (
                "Mozilla/5.0 (X11; Linux x86_64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/135.0.0.0 Safari/537.36"
            ),
            "Accept": "application/json,text/javascript,*/*;q=0.1",
        }

    @staticmethod
    def _decode_payload(text: str) -> dict[str, Any]:
        """Strip Google's anti-XSSI guard prefix and parse the remaining JSON.

        The widgetdata endpoints prefix responses with ``)]}',`` (5 chars) but
        the explore endpoint uses ``)]}'`` (4 chars, no comma). The previous
        code only stripped the 5-char variant, so explore responses failed to
        parse. Longer prefix is tried first so the comma is consumed too.
        """
        text = text.strip()
        for prefix in (")]}',", ")]}'"):
            if text.startswith(prefix):
                text = text[len(prefix):]
                break
        return json.loads(text)

    def _request(self, url: str, params: dict[str, Any]) -> dict[str, Any]:
        """GET *url* with browser-like headers and return the decoded JSON body.

        Raises httpx.HTTPStatusError on non-2xx responses and ValueError /
        json.JSONDecodeError if the body is not valid JSON after de-guarding.
        """
        response = httpx.get(
            url,
            params=params,
            headers=self._headers,
            timeout=self.timeout,
            follow_redirects=True,
        )
        response.raise_for_status()
        return self._decode_payload(response.text)

    def _fetch_widget(self, keyword: str, time_window: str) -> dict[str, Any] | None:
        """Return the RELATED_QUERIES widget descriptor for *keyword*, or None."""
        req_payload = {
            "comparisonItem": [
                {
                    "keyword": keyword,
                    "geo": "",
                    "time": time_window,
                }
            ],
            "category": 0,
            "property": "",
        }
        params = {
            "hl": self.hl,
            "tz": str(self.tz),
            # Compact separators: the endpoint expects the payload URL-encoded
            # exactly as the web UI produces it.
            "req": json.dumps(req_payload, separators=(",", ":")),
            "property": "",
        }
        data = self._request(self._EXPLORE_URL, params)
        widgets = (data.get("widgets") or []) if isinstance(data, dict) else []
        for widget in widgets:
            if widget.get("id") == "RELATED_QUERIES":
                return widget
        return None

    def related_topics(self, keyword: str, *, time_window: str = "now 7-d", limit: int = 10) -> list[dict[str, Any]]:
        """Return up to *limit* related topics/queries for *keyword*.

        Each item is a dict with keys: canonical_label, normalized, mid, type,
        value. Returns an empty list when the widget or its token is missing.
        """
        widget = self._fetch_widget(keyword, time_window)
        if not widget:
            return []
        request_payload = widget.get("request") or {}
        token = widget.get("token")
        if not request_payload or not token:
            return []
        params = {
            "hl": self.hl,
            "tz": str(self.tz),
            "req": json.dumps(request_payload, separators=(",", ":")),
            "token": token,
        }
        data = self._request(self._RELATED_URL, params)
        ranked: list[dict[str, Any]] = []
        ranked_lists = data.get("default", {}).get("rankedList", []) if isinstance(data, dict) else []
        for ranked_list in ranked_lists:
            for item in ranked_list.get("rankedKeyword", []):
                topic = item.get("topic") or {}
                # Topic entries carry a title; plain query entries only a query.
                title = topic.get("title") or item.get("query")
                if not title:
                    continue
                ranked.append(
                    {
                        "canonical_label": title,
                        "normalized": normalize_entity(title),
                        "mid": topic.get("mid"),
                        "type": topic.get("type"),
                        "value": item.get("value"),
                    }
                )
                if len(ranked) >= limit:
                    return ranked
        return ranked
@lru_cache(maxsize=256)
def _cached_related_topics(keyword: str, time_window: str, limit: int) -> tuple[dict[str, Any], ...]:
    """Memoized fetch keyed on the already-normalized keyword.

    The cached value is a tuple so callers can never mutate the cache entry
    itself. Any failure degrades to an empty result: trends lookups are
    best-effort enrichment, never fatal (matching the original behavior).
    """
    provider = GoogleTrendsRelatedProvider()
    try:
        return tuple(provider.related_topics(keyword, time_window=time_window, limit=limit))
    except Exception:
        # Deliberate broad catch: network/parse errors must not propagate.
        return ()


def get_related_topics(keyword: str, *, time_window: str = "now 7-d", limit: int = 10) -> list[dict[str, Any]]:
    """Return up to *limit* related topics for *keyword* (normalized first).

    Compared to caching this public function directly, this split fixes two
    issues: results are cached under the *normalized* keyword (so variants of
    the same keyword share one entry), and each caller receives a fresh list
    of fresh dict copies, so mutating the return value cannot corrupt the
    shared cache entry.

    Returns an empty list when the keyword normalizes to nothing or the
    lookup fails.
    """
    normalized = normalize_entity(keyword)
    if not normalized:
        return []
    return [dict(item) for item in _cached_related_topics(normalized, time_window, limit)]