# trends_related.py (3.9 KB)
  1. from __future__ import annotations
  2. import json
  3. from functools import lru_cache
  4. from typing import Any
  5. import httpx
  6. from news_mcp.entity_normalize import normalize_entity
  7. class GoogleTrendsRelatedError(RuntimeError):
  8. pass
  9. class GoogleTrendsRelatedProvider:
  10. _EXPLORE_URL = "https://trends.google.com/trends/api/explore"
  11. _RELATED_URL = "https://trends.google.com/trends/api/widgetdata/relatedsearches/"
  12. def __init__(self, *, hl: str = "en-US", tz: int = 120, timeout: float = 10.0):
  13. self.hl = hl
  14. self.tz = tz
  15. self.timeout = timeout
  16. self._headers = {
  17. "User-Agent": (
  18. "Mozilla/5.0 (X11; Linux x86_64) "
  19. "AppleWebKit/537.36 (KHTML, like Gecko) "
  20. "Chrome/135.0.0.0 Safari/537.36"
  21. ),
  22. "Accept": "application/json,text/javascript,*/*;q=0.1",
  23. }
  24. def _request(self, url: str, params: dict[str, Any]) -> dict[str, Any]:
  25. response = httpx.get(
  26. url,
  27. params=params,
  28. headers=self._headers,
  29. timeout=self.timeout,
  30. follow_redirects=True,
  31. )
  32. response.raise_for_status()
  33. text = response.text.strip()
  34. if text.startswith(")]}',"):
  35. text = text[5:]
  36. return json.loads(text)
  37. def _fetch_widget(self, keyword: str, time_window: str) -> dict[str, Any] | None:
  38. req_payload = {
  39. "comparisonItem": [
  40. {
  41. "keyword": keyword,
  42. "geo": "",
  43. "time": time_window,
  44. }
  45. ],
  46. "category": 0,
  47. "property": "",
  48. }
  49. params = {
  50. "hl": self.hl,
  51. "tz": str(self.tz),
  52. "req": json.dumps(req_payload, separators=(",", ":")),
  53. "property": "",
  54. }
  55. data = self._request(self._EXPLORE_URL, params)
  56. widgets = (data.get("widgets") or []) if isinstance(data, dict) else []
  57. for widget in widgets:
  58. if widget.get("id") == "RELATED_QUERIES":
  59. return widget
  60. return None
  61. def related_topics(self, keyword: str, *, time_window: str = "now 7-d", limit: int = 10) -> list[dict[str, Any]]:
  62. widget = self._fetch_widget(keyword, time_window)
  63. if not widget:
  64. return []
  65. request_payload = widget.get("request") or {}
  66. token = widget.get("token")
  67. if not request_payload or not token:
  68. return []
  69. params = {
  70. "hl": self.hl,
  71. "tz": str(self.tz),
  72. "req": json.dumps(request_payload, separators=(",", ":")),
  73. "token": token,
  74. }
  75. data = self._request(self._RELATED_URL, params)
  76. ranked = []
  77. ranked_lists = data.get("default", {}).get("rankedList", []) if isinstance(data, dict) else []
  78. for ranked_list in ranked_lists:
  79. for item in ranked_list.get("rankedKeyword", []):
  80. topic = item.get("topic") or {}
  81. title = topic.get("title") or item.get("query")
  82. if not title:
  83. continue
  84. ranked.append(
  85. {
  86. "canonical_label": title,
  87. "normalized": normalize_entity(title),
  88. "mid": topic.get("mid"),
  89. "type": topic.get("type"),
  90. "value": item.get("value"),
  91. }
  92. )
  93. if len(ranked) >= limit:
  94. return ranked
  95. return ranked
  96. @lru_cache(maxsize=256)
  97. def get_related_topics(keyword: str, *, time_window: str = "now 7-d", limit: int = 10) -> list[dict[str, Any]]:
  98. normalized = normalize_entity(keyword)
  99. if not normalized:
  100. return []
  101. provider = GoogleTrendsRelatedProvider()
  102. try:
  103. return provider.related_topics(normalized, time_window=time_window, limit=limit)
  104. except Exception:
  105. return []