
Refactor trends-mcp into MID-first entity context server with SQLite snapshot history

Lukas Goldschmidt 1 month ago
commit
40b0e41d7f
19 changed files with 1397 additions and 0 deletions
  1. 7 0
      .gitignore
  2. 359 0
      OUTLOOK.md
  3. 81 0
      PROJECT.md
  4. 128 0
      README.md
  5. 39 0
      killserver.sh
  6. 20 0
      live_tests.sh
  7. 20 0
      pyproject.toml
  8. 7 0
      requirements.txt
  9. 5 0
      restart.sh
  10. 33 0
      run.sh
  11. 31 0
      test_trends_mcp.py
  12. 16 0
      tests.sh
  13. 1 0
      trends_mcp/__init__.py
  14. 16 0
      trends_mcp/aliases.py
  15. 25 0
      trends_mcp/cache.py
  16. 7 0
      trends_mcp/entity_map.json
  17. 208 0
      trends_mcp/ledger.py
  18. 292 0
      trends_mcp/mcp_server_fastmcp.py
  19. 102 0
      trends_mcp/providers/google_trends.py

+ 7 - 0
.gitignore

@@ -0,0 +1,7 @@
+__pycache__/
+*.pyc
+.venv/
+logs/
+data/
+server.pid
+.env

+ 359 - 0
OUTLOOK.md

@@ -0,0 +1,359 @@
+# 📈 Trends MCP Server — Requirements Spec
+
+## 🎯 Goal
+
+Provide **free, normalized entity intelligence**
+for:
+
+* keywords
+* entities (BTC, AI, ETFs, etc.)
+* topics
+
+👉 This is not a paid trend terminal
+👉 It’s an **entity resolution + attention context engine**
+
+## First-order priority
+
+The most reliable value in this project is no longer historical trend curves.
+It is **entity normalization**, **topic disambiguation**, and **related-query discovery**.
+
+If Google refuses time-series requests, the MCP should still remain useful by:
+
+* returning Knowledge Graph MID candidates
+* resolving canonical entity labels
+* surfacing related queries and topics
+* providing topic-aware context for downstream MCPs
+* keeping a SQLite snapshot history for temporal inspection and later diffing
+
+---
+
+# 🧠 Core Insight
+
+> Markets often move *after* attention increases.
+
+So this MCP helps answer:
+
+* “Is something gaining attention?”
+* “Is this narrative heating up?”
+* “Is attention diverging from price?”
+
+---
+
+# 🏗️ 1. Internal Architecture
+
+## 📊 Data Sources (`providers/`)
+
+Primary:
+
+* Google Trends (via unofficial APIs like `pytrends`)
+
+Optional later:
+
+* Twitter/X trends
+* Reddit mentions
+* YouTube search trends
+
+---
+
+## 🔄 Normalization Layer (CRITICAL)
+
+### Problem:
+
+Google Trends data is:
+
+* relative (0–100)
+* inconsistent across queries
+
+### Solution:
+
+Normalize across:
+
+* timeframes
+* keywords
+
+Techniques:
+
+* anchor keywords (e.g. always compare vs “bitcoin”)
+* rescaling across batches
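+
+A minimal sketch of the anchor technique, assuming every batch of keywords is fetched
+together with one shared anchor term (the helper name and data shapes are illustrative,
+not part of this commit):
+
+```python
+# Hypothetical helper: put several relative (0-100) batches on one scale by
+# dividing each series by the mean of the shared anchor within its own batch.
+def rescale_batches(batches: list[dict[str, list[int]]], anchor: str = "bitcoin") -> dict[str, list[float]]:
+    combined: dict[str, list[float]] = {}
+    for batch in batches:
+        anchor_series = batch.get(anchor)
+        if not anchor_series or sum(anchor_series) == 0:
+            continue  # skip batches where the anchor returned no usable signal
+        anchor_mean = sum(anchor_series) / len(anchor_series)
+        for keyword, series in batch.items():
+            if keyword != anchor:
+                combined[keyword] = [round(v / anchor_mean, 3) for v in series]
+    return combined
+```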
+
+---
+
+## 🧠 Entity Mapping Layer
+
+Map:
+
+* “btc”, “bitcoin”, “bitcoin price” → **BTC**
+
+👉 This is essential for consistency with your other MCPs
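+
+With the alias table added in this commit, this is a lookup, not a model:
+
+```python
+from trends_mcp.aliases import normalize_entity
+
+assert normalize_entity("bitcoin price") == "BTC"
+assert normalize_entity("Solana") == "Solana"  # unmapped names pass through unchanged
+```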
+
+---
+
+## 🗃️ Storage / Cache
+
+Cache trend series:
+
+Key:
+
+```
+trends:{keyword}:{timeframe}
+```
+
+TTL:
+
+* 15–60 minutes (trends don’t change second-by-second)
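+
+A minimal usage sketch against the `trends_mcp.cache` helpers shipped in this commit
+(the `fetch` callable is a stand-in for a real provider call):
+
+```python
+from trends_mcp.cache import get_cache, set_cache
+
+def cached_series(keyword: str, timeframe: str, fetch) -> list[int]:
+    key = f"trends:{keyword}:{timeframe}"  # key shape from the spec above
+    series = get_cache(key)
+    if series is None:
+        series = fetch(keyword, timeframe)
+        set_cache(key, series, ttl_seconds=30 * 60)  # mid-range of the 15-60 minute window
+    return series
+```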
+
+---
+
+# 🧰 2. Agent-Facing Tools
+
+Keep them **interpretive, not raw**
+
+---
+
+## 1. `get_interest_over_time`
+
+> “Show attention trend”
+
+```json
+{
+  "keyword": "bitcoin",
+  "timeframe": "7d"
+}
+```
+
+Output:
+
+```json
+{
+  "series": [12, 18, 25, 40, 65, 80],
+  "trend": "rising"
+}
+```
+
+👉 Include simple interpretation (“rising”, “falling”, “flat”)
+
+---
+
+## 2. `compare_interest`
+
+> “Which topic is hotter?”
+
+```json
+{
+  "keywords": ["bitcoin", "ethereum"],
+  "timeframe": "7d"
+}
+```
+
+Output:
+
+```json
+{
+  "winner": "bitcoin",
+  "ratios": {
+    "bitcoin": 1.0,
+    "ethereum": 0.72
+  }
+}
+```
+
+---
+
+## 3. `detect_trending_entities`
+
+> “What is gaining attention?”
+
+```json
+{
+  "category": "crypto"
+}
+```
+
+Output:
+
+```json
+[
+  {
+    "entity": "Solana",
+    "trend_score": 0.91,
+    "velocity": "high"
+  }
+]
+```
+
+---
+
+## 4. `get_related_queries`
+
+> “What are people searching around this?”
+
+```json
+{
+  "keyword": "bitcoin"
+}
+```
+
+Output:
+
+```json
+[
+  "bitcoin etf",
+  "bitcoin price prediction",
+  "btc news"
+]
+```
+
+---
+
+## 5. `get_attention_score` (VERY useful)
+
+> “How much attention does X have right now?”
+
+```json
+{
+  "entity": "BTC"
+}
+```
+
+Output:
+
+```json
+{
+  "score": 0.78,
+  "relative_to_baseline": 1.4
+}
+```
+
+---
+
+# ⚠️ 3. What NOT to expose
+
+Avoid:
+
+* raw Google Trends responses
+* overly granular time series
+* unnormalized keyword data
+
+❌ Bad:
+
+```
+get_raw_trends(keyword)
+```
+
+---
+
+# 🧠 4. Key Challenges
+
+## 1. Normalization (hardest problem)
+
+Trends are:
+
+* relative per query
+* not directly comparable
+
+👉 If you skip normalization:
+your MCP becomes misleading
+
+---
+
+## 2. Keyword ambiguity
+
+Example:
+
+* “apple” → company or fruit?
+
+👉 You need:
+
+* context-aware mapping
+* or restrict to known entities (better for v1)
+
+---
+
+## 3. Sparse data
+
+Some queries:
+
+* have low volume
+* return noisy signals
+
+👉 filter aggressively
+
+---
+
+# ⚡ 5. Signal Engineering (where value comes from)
+
+Raw trend data is weak.
+
+Value comes from:
+
+---
+
+## 📈 Trend direction
+
+* rising / falling / flat
+
+---
+
+## 🚀 Velocity
+
+* how fast attention increases
+
+---
+
+## 🔥 Spike detection
+
+* sudden jumps
+
+---
+
+## ⚖️ Relative strength
+
+* vs other entities
+
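+Taken together, a minimal sketch over one normalized 0-100 series (thresholds are
+illustrative assumptions, not tuned values):
+
+```python
+def summarize_signals(series: list[int]) -> dict:
+    if len(series) < 2:
+        return {"direction": "flat", "velocity": 0.0, "spike": False}
+    delta = series[-1] - series[0]
+    # Direction: endpoint comparison with a small dead zone.
+    direction = "rising" if delta >= 10 else "falling" if delta <= -10 else "flat"
+    # Velocity: average change per step.
+    velocity = round(delta / (len(series) - 1), 2)
+    # Spike: last point jumps well above the average of the earlier points.
+    baseline = sum(series[:-1]) / (len(series) - 1)
+    spike = series[-1] >= 2 * max(1.0, baseline)
+    return {"direction": direction, "velocity": velocity, "spike": spike}
+```
+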
+---
+
+# 🧩 6. Relationship to Other MCPs
+
+This is your **early warning system**
+
+Combined with:
+
+* Crypto MCP → confirms movement
+* News MCP → explains movement
+
+---
+
+## 🔥 Example synergy:
+
+Trends MCP:
+→ “Ethereum ETF” searches spiking
+
+News MCP:
+→ few articles yet
+
+Crypto MCP:
+→ price still flat
+
+👉 This is:
+
+> **pre-news signal**
+
+---
+
+# 🧭 7. Design Philosophy
+
+Each tool should answer:
+
+> “Where is attention moving?”
+
+---
+
+# 🚀 8. Suggested Build Order
+
+1. basic keyword trend fetch
+2. caching
+3. simple slope detection (rising/falling)
+4. entity mapping (BTC, ETH, etc.)
+5. comparison tool
+
+👉 normalization improvements can come later

+ 81 - 0
PROJECT.md

@@ -0,0 +1,81 @@
+# Trends MCP — Project Notes
+
+## Purpose
+
+Turn noisy search/trend data into **clean entity intelligence** that other agents can reason about.
+
+This is not a raw data API. It should answer questions like:
+- What entity is this mention most likely referring to?
+- What is the canonical Google MID?
+- What related queries/topics surround this entity?
+- When trend data is available, is interest rising or falling?
+
+The project now keeps a SQLite snapshot history so we can inspect how related-query/topic
+surfaces change over time, before wiring anything into news-mcp.
+
+## Core principles
+
+- Prefer **normalized** outputs over raw provider payloads.
+- Prefer **semantic tools** over low-level endpoints.
+- Keep the response shape compact and predictable.
+- Treat entity mapping as foundational, not optional.
+
+## Tech stack
+
+- Python
+- FastMCP
+- FastAPI
+- Uvicorn
+- `pytrends` for the first provider
+
+## Current server contract
+
+- host: `0.0.0.0`
+- port: `8507`
+- transport: FastMCP over SSE
+- mount: `/mcp`
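+
+Quick liveness check once it is up:
+
+```bash
+curl http://127.0.0.1:8507/health
+```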
+
+## Scripts
+
+- `run.sh` starts the server
+- `killserver.sh` stops the PID stored in `logs/server.pid`
+- `restart.sh` runs `killserver.sh` then `run.sh`, nothing else
+
+## Initial tool set
+
+- `get_interest_over_time(keyword, timeframe)`
+- `compare_interest(keywords, timeframe)`
+- `get_attention_score(entity, timeframe)`
+- `resolve_entity(keyword)`
+- `get_related_queries(keyword)`
+- `get_related_topics(keyword)`
+- `get_ledger_recent(limit)`
+- `get_ledger_summary(limit)`
+- `get_entity_history(entity, limit)`
+- `prune_history(retention_days)`
+
+## History storage
+
+History is stored in a single SQLite table (`snapshots`) under `data/trends_history.db`.
+Each row stores the full normalized tool payload so later analysis can diff changes over time,
+instead of merely counting that a lookup happened.
+
+Pruning is automatic: the history store checks once per day whether retention cleanup is due,
+and removes rows older than the configured retention window.
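+
+A sketch of the intended diffing, assuming related-query payload rows carry a `query`
+field (as produced by the pytrends record dump; the helper itself is hypothetical):
+
+```python
+from trends_mcp import ledger
+
+def diff_related_queries(entity: str) -> dict:
+    # Compare the two newest related-query snapshots stored for one entity.
+    rows = [r for r in ledger.entity_history(entity)["entries"]
+            if r["tool"] == "get_related_queries"]
+    if len(rows) < 2:
+        return {"entity": entity, "added": [], "removed": []}
+    def terms(row):
+        return {str(t.get("query")) for t in row["payload"].get("top", [])}
+    newest, previous = terms(rows[0]), terms(rows[1])
+    return {"entity": entity, "added": sorted(newest - previous), "removed": sorted(previous - newest)}
+```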
+
+## Recommended build order
+
+1. real provider adapter
+2. cache layer
+3. entity aliases / normalization
+4. trend direction heuristics
+5. comparison and spike detection
+6. broader trend discovery tools
+
+## Notes for future readers
+
+The right implementation style here is boring on purpose:
+- small modules
+- predictable data flow
+- no heavy abstraction until it earns its keep
+- interpretive output, not provider noise

+ 128 - 0
README.md

@@ -0,0 +1,128 @@
+# Trends MCP Server
+
+A FastMCP-based server for **free entity normalization and attention context**.
+
+This project is intentionally opinionated:
+- it exposes **semantic tools**, not raw provider dumps
+- it treats Google Knowledge Graph MIDs as canonical entity hints
+- it is meant to pair with news and crypto MCPs
+- it should remain useful even when historical trend requests are flaky
+
+## What it does
+
+Current tools:
+- `resolve_entity` — return candidate entity matches and the best Knowledge Graph MID
+- `get_related_queries` — show the search terms surrounding an entity
+- `get_related_topics` — show the topic neighborhood around an entity
+- `get_ledger_recent` — inspect the newest history entries
+- `get_ledger_summary` — summarize what the ledger is saying
+- `get_entity_history` — inspect history for one entity or MID
+- `prune_history` — delete stored snapshots older than the retention window
+- `get_interest_over_time` — show attention over time for a keyword or entity
+- `compare_interest` — compare attention between multiple keywords or entities
+- `get_attention_score` — compact attention score for a known entity
+
+## Run
+
+### With uv
+```bash
+uv sync
+./run.sh
+```
+
+### With pip
+```bash
+pip install -r requirements.txt
+./run.sh
+```
+
+## mcporter
+
+If you use `mcporter`, point it at:
+
+```bash
+--config ~/.openclaw/workspace/config/mcporter.json
+```
+
+Example smoke test:
+
+```bash
+mcporter --config ~/.openclaw/workspace/config/mcporter.json call trends.get_interest_over_time keyword=bitcoin timeframe=7d
+```
+
+Entity resolution example:
+
+```bash
+mcporter --config ~/.openclaw/workspace/config/mcporter.json call trends.resolve_entity keyword=bitcoin
+```
+
+Related queries example:
+
+```bash
+mcporter --config ~/.openclaw/workspace/config/mcporter.json call trends.get_related_queries keyword=bitcoin
+```
+
+Related topics example:
+
+```bash
+mcporter --config ~/.openclaw/workspace/config/mcporter.json call trends.get_related_topics keyword=bitcoin
+```
+
+History example:
+
+```bash
+mcporter --config ~/.openclaw/workspace/config/mcporter.json call trends.get_entity_history entity=BTC
+```
+
+Suggested smoke tests:
+
+```bash
+python -m pytest
+mcporter --config ~/.openclaw/workspace/config/mcporter.json call trends.resolve_entity keyword=bitcoin
+mcporter --config ~/.openclaw/workspace/config/mcporter.json call trends.get_related_queries keyword=bitcoin
+mcporter --config ~/.openclaw/workspace/config/mcporter.json call trends.get_related_topics keyword=bitcoin
+```
+
+Inspect the ledger directly:
+
+```bash
+sqlite3 data/trends_history.db 'select id, ts, tool, keyword, normalized_keyword, mid from snapshots order by id desc limit 20;'
+```
+
+Prune history example:
+
+```bash
+mcporter --config ~/.openclaw/workspace/config/mcporter.json call trends.prune_history retention_days=30
+```
+
+History pruning runs automatically once per day when snapshots are being written.
+
+The server listens on:
+- `http://0.0.0.0:8507`
+- health: `http://127.0.0.1:8507/health`
+- MCP SSE mount: `http://127.0.0.1:8507/mcp`
+
+## Scripts
+
+- `run.sh` — start the server in the background
+- `killserver.sh` — stop a PID recorded in `logs/server.pid`
+- `restart.sh` — kill, then run again
+
+Logs are written to `logs/server.log`.
+
+## Design notes
+
+The long-term shape of this MCP should be:
+1. provider adapters in `providers/`
+2. entity normalization and aliasing
+3. cache / TTL for recent trend series
+4. simple signal heuristics
+5. MCP tools that return compact, interpretable outputs
+
+## Next obvious work
+
+- harden the Google Trends adapter against rate limiting and flaky responses
+- make the in-memory cache persistent
+- add `detect_trending_entities`
+- add spike detection over the stored snapshot history

+ 39 - 0
killserver.sh

@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+cd "$(dirname "$0")"
+mkdir -p logs
+
+PID_FILE="logs/server.pid"
+TERM_TARGETS=()
+
+if [ -f "$PID_FILE" ]; then
+  pid=$(cat "$PID_FILE" 2>/dev/null || true)
+  if [ -n "${pid:-}" ]; then
+    TERM_TARGETS+=("$pid")
+  fi
+fi
+
+# Catch stale instances that lost their PID file or were started manually.
+while IFS= read -r pid; do
+  [ -n "$pid" ] && TERM_TARGETS+=("$pid")
+done < <(pgrep -f 'uvicorn trends_mcp\.mcp_server_fastmcp:app|trends_mcp\.mcp_server_fastmcp:app' || true)
+
+# De-duplicate and terminate gently first, then force if needed.
+if [ "${#TERM_TARGETS[@]}" -gt 0 ]; then
+  printf '%s\n' "${TERM_TARGETS[@]}" | awk 'NF && !seen[$0]++' | while read -r pid; do
+    if kill -0 "$pid" 2>/dev/null; then
+      kill "$pid" 2>/dev/null || true
+    fi
+  done
+
+  sleep 1
+
+  printf '%s\n' "${TERM_TARGETS[@]}" | awk 'NF && !seen[$0]++' | while read -r pid; do
+    if kill -0 "$pid" 2>/dev/null; then
+      kill -9 "$pid" 2>/dev/null || true
+    fi
+  done
+fi
+
+rm -f "$PID_FILE"

+ 20 - 0
live_tests.sh

@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+cd "$(dirname "$0")"
+
+if [ -f .env ]; then
+  # shellcheck disable=SC1091
+  source .env
+fi
+
+if [ -f .venv/bin/activate ]; then
+  # shellcheck disable=SC1091
+  source .venv/bin/activate
+fi
+
+CONF="${MCPORTER_CONF:-$HOME/.openclaw/workspace/config/mcporter.json}"
+
+mcporter --config "$CONF" call trends.resolve_entity keyword=bitcoin
+mcporter --config "$CONF" call trends.get_related_queries keyword=bitcoin
+mcporter --config "$CONF" call trends.get_related_topics keyword=bitcoin

+ 20 - 0
pyproject.toml

@@ -0,0 +1,20 @@
+[project]
+name = "trends-mcp"
+version = "0.1.0"
+description = "MCP server for normalized attention signals"
+requires-python = ">=3.11"
+dependencies = [
+  "fastapi>=0.115,<1.0",
+  "mcp>=1.2.0",
+  "pytrends>=4.9.2",
+  "uvicorn>=0.30",
+  "urllib3<2",
+  "numpy<2",
+  "pytest>=8",
+]
+
+[tool.uv]
+package = true
+
+[project.scripts]
+trends-mcp = "trends_mcp.mcp_server_fastmcp:main"

+ 7 - 0
requirements.txt

@@ -0,0 +1,7 @@
+fastapi>=0.115,<1.0
+mcp>=1.2.0
+pytrends>=4.9.2
+uvicorn>=0.30
+urllib3<2
+numpy<2
+pytest>=8

+ 5 - 0
restart.sh

@@ -0,0 +1,5 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+"$(dirname "$0")/killserver.sh"
+"$(dirname "$0")/run.sh"

+ 33 - 0
run.sh

@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+cd "$(dirname "$0")"
+mkdir -p logs
+
+if [ -f .venv/bin/activate ]; then
+  # Activate the local environment when present so uvicorn and deps resolve consistently.
+  # shellcheck disable=SC1091
+  source .venv/bin/activate
+fi
+
+PID_FILE="logs/server.pid"
+
+if [ -f "$PID_FILE" ] && kill -0 "$(cat "$PID_FILE")" 2>/dev/null; then
+  echo "server already running with pid $(cat "$PID_FILE")" >&2
+  exit 0
+fi
+
+if command -v uv >/dev/null 2>&1; then
+  nohup uv run uvicorn trends_mcp.mcp_server_fastmcp:app --host 0.0.0.0 --port 8507 > logs/server.log 2>&1 &
+  echo $! > "$PID_FILE"
+  exit 0
+fi
+
+if [ -x .venv/bin/python ]; then
+  PYTHON=.venv/bin/python
+else
+  PYTHON=python3
+fi
+
+nohup "$PYTHON" -m uvicorn trends_mcp.mcp_server_fastmcp:app --host 0.0.0.0 --port 8507 > logs/server.log 2>&1 &
+echo $! > "$PID_FILE"

+ 31 - 0
test_trends_mcp.py

@@ -0,0 +1,31 @@
+from trends_mcp.aliases import normalize_entity
+from trends_mcp import ledger
+
+
+def test_normalize_entity_maps_bitcoin_to_btc():
+    assert normalize_entity("bitcoin") == "BTC"
+
+
+def test_store_snapshot_persists_payload(tmp_path, monkeypatch):
+    monkeypatch.setattr(ledger, "DB_PATH", tmp_path / "trends_history.db")
+    ledger.store_snapshot(
+        tool="resolve_entity",
+        keyword="bitcoin",
+        normalized_keyword="BTC",
+        mid="/m/05p0rrx",
+        canonical_label="Bitcoin",
+        payload={"keyword": "bitcoin", "canonical_label": "Bitcoin"},
+    )
+    rows = ledger.read_recent(10)
+    assert rows[0]["tool"] == "resolve_entity"
+    assert rows[0]["payload"]["canonical_label"] == "Bitcoin"
+
+
+def test_entity_history_matches_keyword(tmp_path, monkeypatch):
+    monkeypatch.setattr(ledger, "DB_PATH", tmp_path / "trends_history.db")
+    ledger.store_snapshot(tool="resolve_entity", keyword="bitcoin", normalized_keyword="BTC", mid="/m/05p0rrx", canonical_label="Bitcoin", payload={"keyword": "bitcoin"})
+    ledger.store_snapshot(tool="get_related_queries", keyword="bitcoin", normalized_keyword="BTC", mid=None, canonical_label=None, payload={"keyword": "bitcoin"})
+    history = ledger.entity_history("BTC")
+    assert history["count"] == 2

+ 16 - 0
tests.sh

@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+cd "$(dirname "$0")"
+
+if [ -f .env ]; then
+  # shellcheck disable=SC1091
+  source .env
+fi
+
+if [ -f .venv/bin/activate ]; then
+  # shellcheck disable=SC1091
+  source .venv/bin/activate
+fi
+
+python -m pytest

+ 1 - 0
trends_mcp/__init__.py

@@ -0,0 +1 @@
+"""Trends MCP package."""

+ 16 - 0
trends_mcp/aliases.py

@@ -0,0 +1,16 @@
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+_ALIASES_PATH = Path(__file__).with_name("entity_map.json")
+
+
+def load_aliases() -> dict[str, str]:
+    return {k.lower(): v for k, v in json.loads(_ALIASES_PATH.read_text()).items()}
+
+
+def normalize_entity(name: str) -> str:
+    aliases = load_aliases()
+    key = " ".join(str(name).strip().lower().split())
+    return aliases.get(key, str(name).strip())

+ 25 - 0
trends_mcp/cache.py

@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+from time import time
+from typing import Any
+
+_CACHE: dict[str, tuple[float, Any]] = {}
+
+
+def get_cache(key: str):
+    item = _CACHE.get(key)
+    if not item:
+        return None
+    expires_at, value = item
+    if time() >= expires_at:
+        _CACHE.pop(key, None)
+        return None
+    return value
+
+
+def set_cache(key: str, value: Any, ttl_seconds: int):
+    _CACHE[key] = (time() + ttl_seconds, value)
+
+
+def cache_stats() -> dict[str, int]:
+    return {"entries": len(_CACHE)}

+ 7 - 0
trends_mcp/entity_map.json

@@ -0,0 +1,7 @@
+{
+  "btc": "BTC",
+  "bitcoin": "BTC",
+  "bitcoin price": "BTC",
+  "eth": "ETH",
+  "ethereum": "ETH"
+}

+ 208 - 0
trends_mcp/ledger.py

@@ -0,0 +1,208 @@
+from __future__ import annotations
+
+import json
+import sqlite3
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from threading import Lock
+from typing import Any
+
+DB_PATH = Path(__file__).resolve().parent.parent / "data" / "trends_history.db"
+DEFAULT_RETENTION_DAYS = 30
+AUTO_PRUNE_INTERVAL_HOURS = 24
+_LOCK = Lock()
+
+
+def _connect() -> sqlite3.Connection:
+    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+    conn = sqlite3.connect(DB_PATH)
+    conn.row_factory = sqlite3.Row
+    return conn
+
+
+def init_db() -> None:
+    with _LOCK:
+        conn = _connect()
+        try:
+            conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS snapshots (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    ts TEXT NOT NULL,
+                    tool TEXT NOT NULL,
+                    keyword TEXT,
+                    normalized_keyword TEXT,
+                    mid TEXT,
+                    canonical_label TEXT,
+                    payload_json TEXT NOT NULL
+                )
+                """
+            )
+            conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS meta (
+                    key TEXT PRIMARY KEY,
+                    value TEXT NOT NULL
+                )
+                """
+            )
+            conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON snapshots(ts)")
+            conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_tool ON snapshots(tool)")
+            conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_keyword ON snapshots(keyword)")
+            conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_norm_keyword ON snapshots(normalized_keyword)")
+            conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_mid ON snapshots(mid)")
+            conn.commit()
+        finally:
+            conn.close()
+
+
+def _get_meta(conn: sqlite3.Connection, key: str) -> str | None:
+    row = conn.execute("SELECT value FROM meta WHERE key = ?", (key,)).fetchone()
+    return row[0] if row else None
+
+
+def _set_meta(conn: sqlite3.Connection, key: str, value: str) -> None:
+    conn.execute(
+        "INSERT INTO meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value",
+        (key, value),
+    )
+
+
+def prune_snapshots(retention_days: int = DEFAULT_RETENTION_DAYS) -> int:
+    cutoff = datetime.now(timezone.utc) - timedelta(days=max(1, int(retention_days)))
+    with _LOCK:
+        conn = _connect()
+        try:
+            cur = conn.execute("DELETE FROM snapshots WHERE ts < ?", (cutoff.isoformat(),))
+            _set_meta(conn, "last_prune_ts", datetime.now(timezone.utc).isoformat())
+            conn.commit()
+            return int(cur.rowcount or 0)
+        finally:
+            conn.close()
+
+
+def auto_prune_if_due(retention_days: int = DEFAULT_RETENTION_DAYS) -> int:
+    init_db()
+    with _LOCK:
+        conn = _connect()
+        try:
+            last_prune_ts = _get_meta(conn, "last_prune_ts")
+            if last_prune_ts:
+                try:
+                    last_prune = datetime.fromisoformat(last_prune_ts)
+                    if datetime.now(timezone.utc) - last_prune < timedelta(hours=AUTO_PRUNE_INTERVAL_HOURS):
+                        return 0
+                except ValueError:
+                    pass
+        finally:
+            conn.close()
+    return prune_snapshots(retention_days)
+
+
+def store_snapshot(
+    *,
+    tool: str,
+    keyword: str | None,
+    normalized_keyword: str | None,
+    mid: str | None,
+    canonical_label: str | None,
+    payload: dict[str, Any],
+) -> None:
+    init_db()
+    auto_prune_if_due()
+    ts = datetime.now(timezone.utc).isoformat()
+    with _LOCK:
+        conn = _connect()
+        try:
+            conn.execute(
+                """
+                INSERT INTO snapshots (ts, tool, keyword, normalized_keyword, mid, canonical_label, payload_json)
+                VALUES (?, ?, ?, ?, ?, ?, ?)
+                """,
+                (
+                    ts,
+                    tool,
+                    keyword,
+                    normalized_keyword,
+                    mid,
+                    canonical_label,
+                    json.dumps(payload, ensure_ascii=False),
+                ),
+            )
+            conn.commit()
+        finally:
+            conn.close()
+
+
+def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
+    payload = json.loads(row["payload_json"])
+    return {
+        "id": row["id"],
+        "ts": row["ts"],
+        "tool": row["tool"],
+        "keyword": row["keyword"],
+        "normalized_keyword": row["normalized_keyword"],
+        "mid": row["mid"],
+        "canonical_label": row["canonical_label"],
+        "payload": payload,
+    }
+
+
+def read_recent(limit: int = 50) -> list[dict[str, Any]]:
+    init_db()
+    conn = _connect()
+    try:
+        rows = conn.execute(
+            "SELECT * FROM snapshots ORDER BY ts DESC LIMIT ?",
+            (max(1, min(int(limit), 200)),),
+        ).fetchall()
+        return [_row_to_dict(row) for row in rows]
+    finally:
+        conn.close()
+
+
+def summarize(limit: int = 500) -> dict[str, Any]:
+    rows = read_recent(limit)
+    tools: dict[str, int] = {}
+    keywords: dict[str, int] = {}
+    mids: dict[str, int] = {}
+    for row in rows:
+        if row["tool"]:
+            tools[row["tool"]] = tools.get(row["tool"], 0) + 1
+        keyword = row["normalized_keyword"] or row["keyword"]
+        if keyword:
+            keywords[keyword] = keywords.get(keyword, 0) + 1
+        if row["mid"]:
+            mids[row["mid"]] = mids.get(row["mid"], 0) + 1
+    return {
+        "entries": len(rows),
+        "top_tools": sorted(tools.items(), key=lambda x: x[1], reverse=True)[:10],
+        "top_keywords": sorted(keywords.items(), key=lambda x: x[1], reverse=True)[:10],
+        "top_mids": sorted(mids.items(), key=lambda x: x[1], reverse=True)[:10],
+    }
+
+
+def entity_history(entity: str, limit: int = 500) -> dict[str, Any]:
+    init_db()
+    entity_key = entity.strip().lower()
+    conn = _connect()
+    try:
+        rows = conn.execute(
+            """
+            SELECT * FROM snapshots
+            WHERE lower(coalesce(normalized_keyword, '')) = ?
+               OR lower(coalesce(keyword, '')) = ?
+               OR lower(coalesce(mid, '')) = ?
+               OR lower(coalesce(canonical_label, '')) = ?
+            ORDER BY ts DESC
+            LIMIT ?
+            """,
+            (entity_key, entity_key, entity_key, entity_key, max(1, min(int(limit), 2000))),
+        ).fetchall()
+        return {
+            "entity": entity,
+            "count": len(rows),
+            "entries": [_row_to_dict(row) for row in rows],
+        }
+    finally:
+        conn.close()

+ 292 - 0
trends_mcp/mcp_server_fastmcp.py

@@ -0,0 +1,292 @@
+from __future__ import annotations
+
+from fastapi import FastAPI
+from mcp.server.fastmcp import FastMCP
+from mcp.server.transport_security import TransportSecuritySettings
+
+from trends_mcp.aliases import normalize_entity
+from trends_mcp.cache import cache_stats, get_cache, set_cache
+from trends_mcp.ledger import entity_history, prune_snapshots, read_recent, store_snapshot, summarize
+from trends_mcp.providers.google_trends import GoogleTrendsError, GoogleTrendsProvider
+
+
+mcp = FastMCP(
+    "trends-mcp",
+    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
+)
+
+provider = GoogleTrendsProvider()
+CACHE_TTL_SECONDS = 30 * 60
+
+
+def _trend_label(series: list[int]) -> str:
+    if not series:
+        return "flat"
+    first, last = series[0], series[-1]
+    if last - first >= 10:
+        return "rising"
+    if first - last >= 10:
+        return "falling"
+    return "flat"
+
+
+@mcp.tool(description="Experimental: show attention trend for a keyword or entity over time.")
+async def get_interest_over_time(keyword: str, timeframe: str = "7d"):
+    keyword_norm = normalize_entity(keyword)
+    cache_key = f"interest:{keyword_norm}:{timeframe}"
+    cached = get_cache(cache_key)
+    if cached:
+        return cached
+
+    try:
+        result = provider.interest_over_time(keyword, timeframe)
+    except GoogleTrendsError as exc:
+        return {
+            "keyword": keyword,
+            "normalized_keyword": keyword_norm,
+            "timeframe": timeframe,
+            "error": str(exc),
+        }
+
+    payload = {
+        "keyword": keyword,
+        "normalized_keyword": keyword_norm,
+        "timeframe": timeframe,
+        "series": result.series,
+        "trend": _trend_label(result.series),
+        "fetched_at": result.fetched_at,
+    }
+    set_cache(cache_key, payload, CACHE_TTL_SECONDS)
+    return payload
+
+
+@mcp.tool(description="Resolve an entity to Knowledge Graph MID candidates and a best canonical label.")
+async def resolve_entity(keyword: str):
+    cache_key = f"resolve:{normalize_entity(keyword)}"
+    cached = get_cache(cache_key)
+    if cached:
+        return cached
+    suggestions = provider.suggestions(keyword)
+    best = suggestions[0] if suggestions else None
+    payload = {
+        "keyword": keyword,
+        "canonical_label": best.get("title") if best else normalize_entity(keyword),
+        "mid": best.get("mid") if best else None,
+        "type": best.get("type") if best else None,
+        "candidates": suggestions,
+    }
+    set_cache(cache_key, payload, 24 * 60 * 60)
+    store_snapshot(
+        tool="resolve_entity",
+        keyword=keyword,
+        normalized_keyword=normalize_entity(keyword),
+        mid=payload["mid"],
+        canonical_label=payload["canonical_label"],
+        payload=payload,
+    )
+    return payload
+
+
+@mcp.tool(description="Get related search queries for an entity.")
+async def get_related_queries(keyword: str):
+    cache_key = f"related:{normalize_entity(keyword)}"
+    cached = get_cache(cache_key)
+    if cached:
+        return cached
+    related = provider.related_queries(keyword)
+    out = related.get(keyword) or related.get(normalize_entity(keyword)) or {}
+
+    def _rows(df):
+        if df is None:
+            return []
+        try:
+            return df.reset_index().to_dict(orient="records")
+        except Exception:
+            return []
+
+    payload = {
+        "keyword": keyword,
+        "top": _rows(out.get("top") if isinstance(out, dict) else None),
+        "rising": _rows(out.get("rising") if isinstance(out, dict) else None),
+    }
+    set_cache(cache_key, payload, 24 * 60 * 60)
+    store_snapshot(
+        tool="get_related_queries",
+        keyword=keyword,
+        normalized_keyword=normalize_entity(keyword),
+        mid=None,
+        canonical_label=None,
+        payload=payload,
+    )
+    return payload
+
+
+@mcp.tool(description="Get related topics for an entity.")
+async def get_related_topics(keyword: str):
+    cache_key = f"topics:{normalize_entity(keyword)}"
+    cached = get_cache(cache_key)
+    if cached:
+        return cached
+    try:
+        related = provider.related_topics(keyword)
+        out = related.get(keyword) or related.get(normalize_entity(keyword)) or {}
+    except GoogleTrendsError:
+        # pytrends' related_topics is flaky; fall back to related_queries so the tool stays useful.
+        related = provider.related_queries(keyword)
+        out = related.get(keyword) or related.get(normalize_entity(keyword)) or {}
+
+    def _rows(df):
+        if df is None:
+            return []
+        try:
+            return df.reset_index().to_dict(orient="records")
+        except Exception:
+            return []
+
+    payload = {
+        "keyword": keyword,
+        "top": _rows(out.get("top") if isinstance(out, dict) else None),
+        "rising": _rows(out.get("rising") if isinstance(out, dict) else None),
+    }
+    set_cache(cache_key, payload, 24 * 60 * 60)
+    store_snapshot(
+        tool="get_related_topics",
+        keyword=keyword,
+        normalized_keyword=normalize_entity(keyword),
+        mid=None,
+        canonical_label=None,
+        payload=payload,
+    )
+    return payload
+
+
+@mcp.tool(description="Read the most recent ledger events.")
+async def get_ledger_recent(limit: int = 50):
+    return read_recent(limit=max(1, min(int(limit), 200)))
+
+
+@mcp.tool(description="Summarize what the ledger is saying.")
+async def get_ledger_summary(limit: int = 500):
+    return summarize(limit=max(1, min(int(limit), 2000)))
+
+
+@mcp.tool(description="Show the ledger history for one entity or MID.")
+async def get_entity_history(entity: str, limit: int = 500):
+    return entity_history(entity, limit=max(1, min(int(limit), 2000)))
+
+
+@mcp.tool(description="Prune stored snapshots older than the configured retention window.")
+async def prune_history(retention_days: int = 30):
+    deleted = prune_snapshots(retention_days=max(1, min(int(retention_days), 3650)))
+    return {"deleted": deleted, "retention_days": retention_days}
+
+
+@mcp.tool(description="Compare attention between multiple keywords or entities.")
+async def compare_interest(keywords: list[str], timeframe: str = "7d"):
+    if not keywords:
+        return {"winner": None, "ratios": {}}
+
+    normalized = [normalize_entity(k) for k in keywords]
+    series_map = {}
+    for original, keyword in zip(keywords, normalized):
+        cache_key = f"interest:{keyword}:{timeframe}"
+        cached = get_cache(cache_key)
+        if cached:
+            series = cached["series"]
+        else:
+            series = provider.interest_over_time(original, timeframe).series
+            set_cache(
+                cache_key,
+                {
+                    "keyword": original,
+                    "normalized_keyword": keyword,
+                    "timeframe": timeframe,
+                    "series": series,
+                    "trend": _trend_label(series),
+                },
+                CACHE_TTL_SECONDS,
+            )
+        series_map[keyword] = series
+
+    scores = {k: sum(v) for k, v in series_map.items()}
+    winner = max(scores, key=scores.get)
+    top_score = float(scores[winner]) or 1.0
+    ratios = {k: round(v / top_score, 3) for k, v in scores.items()}
+    return {"winner": winner, "ratios": ratios, "timeframe": timeframe}
+
+
+@mcp.tool(description="Get a compact attention score for a known entity.")
+async def get_attention_score(entity: str, timeframe: str = "24h"):
+    normalized = normalize_entity(entity)
+    try:
+        series = provider.interest_over_time(entity, timeframe).series
+    except GoogleTrendsError as exc:
+        return {"entity": entity, "normalized_entity": normalized, "error": str(exc), "timeframe": timeframe}
+    score = round(sum(series) / (len(series) * 100), 3)
+    baseline = round(series[-1] / max(1, series[0]), 3) if series[0] else float(series[-1])
+    return {
+        "entity": entity,
+        "normalized_entity": normalized,
+        "score": score,
+        "relative_to_baseline": baseline,
+        "timeframe": timeframe,
+    }
+
+
+app = FastAPI(title="Trends MCP Server")
+app.mount("/mcp", mcp.sse_app())
+
+
+@app.get("/")
+def root():
+    return {
+        "status": "ok",
+        "transport": "fastmcp+sse",
+        "mount": "/mcp",
+        "tools": ["resolve_entity", "get_related_queries", "get_related_topics", "get_ledger_recent", "get_ledger_summary", "get_entity_history", "prune_history", "get_interest_over_time", "compare_interest", "get_attention_score"],
+    }
+
+
+@mcp.tool(description="Debug Google Trends connectivity, suggestions, and timeframe handling.")
+async def debug_google_trends(keyword: str, timeframe: str = "7d"):
+    keyword_norm = normalize_entity(keyword)
+    try:
+        suggestions = provider.suggestions(keyword)
+    except GoogleTrendsError as exc:
+        suggestions = {"error": str(exc)}
+
+    try:
+        result = provider.interest_over_time(keyword, timeframe)
+        payload = {
+            "keyword": keyword,
+            "normalized_keyword": keyword_norm,
+            "timeframe": timeframe,
+            "suggestions": suggestions,
+            "series": result.series,
+            "trend": _trend_label(result.series),
+            "fetched_at": result.fetched_at,
+        }
+    except GoogleTrendsError as exc:
+        payload = {
+            "keyword": keyword,
+            "normalized_keyword": keyword_norm,
+            "timeframe": timeframe,
+            "suggestions": suggestions,
+            "error": str(exc),
+        }
+    return payload
+
+
+@app.get("/health")
+def health():
+    return {"status": "ok", "service": "trends-mcp", "cache": cache_stats()}
+
+
+def main():
+    import uvicorn
+
+    uvicorn.run("trends_mcp.mcp_server_fastmcp:app", host="0.0.0.0", port=8507, reload=False)
+
+
+if __name__ == "__main__":
+    main()

+ 102 - 0
trends_mcp/providers/google_trends.py

@@ -0,0 +1,102 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime, timezone
+
+from pytrends.request import TrendReq
+
+
+class GoogleTrendsError(RuntimeError):
+    pass
+
+
+@dataclass(frozen=True)
+class TrendSeries:
+    keyword: str
+    timeframe: str
+    series: list[int]
+    fetched_at: str
+
+
+def _normalize_timeframe(timeframe: str) -> str:
+    tf = str(timeframe).strip().lower()
+    # Google Trends only accepts specific tokens: short windows use "now", longer ones "today".
+    if tf in {"24h", "1d", "1day"}:
+        return "now 1-d"
+    if tf in {"7d", "7day", "7days"}:
+        return "now 7-d"
+    if tf in {"30d", "30day", "30days"}:
+        return "today 1-m"
+    if tf in {"90d", "90day", "90days"}:
+        return "today 3-m"
+    if tf in {"12m", "1y", "365d"}:
+        return "today 12-m"
+    return timeframe
+
+
+class GoogleTrendsProvider:
+    def __init__(self):
+        self._client = TrendReq(hl="en-US", tz=120, retries=2, backoff_factor=0.2)
+
+    def suggestions(self, keyword: str) -> list[dict]:
+        try:
+            return self._client.suggestions(keyword)
+        except Exception as exc:
+            raise GoogleTrendsError(f"suggestions failed for {keyword!r}: {exc}") from exc
+
+    def related_queries(self, keyword: str) -> dict:
+        try:
+            self._client.build_payload([keyword], timeframe="today 12-m")
+            return self._client.related_queries() or {}
+        except Exception as first_exc:
+            try:
+                suggestions = self.suggestions(keyword)
+                topic = next((s["mid"] for s in suggestions if s.get("mid")), None)
+                if not topic:
+                    raise first_exc
+                self._client.build_payload([topic], timeframe="today 12-m")
+                return self._client.related_queries() or {}
+            except Exception as exc:
+                raise GoogleTrendsError(f"related_queries failed for {keyword!r}: {exc}") from exc
+
+    def related_topics(self, keyword: str) -> dict:
+        try:
+            self._client.build_payload([keyword], timeframe="today 12-m")
+            return self._client.related_topics() or {}
+        except Exception as first_exc:
+            try:
+                suggestions = self.suggestions(keyword)
+                topic = next((s["mid"] for s in suggestions if s.get("mid")), None)
+                if not topic:
+                    raise first_exc
+                self._client.build_payload([topic], timeframe="today 12-m")
+                return self._client.related_topics() or {}
+            except Exception as exc:
+                raise GoogleTrendsError(f"related_topics failed for {keyword!r}: {exc}") from exc
+
+    def interest_over_time(self, keyword: str, timeframe: str = "7d") -> TrendSeries:
+        timeframe = _normalize_timeframe(timeframe)
+        try:
+            self._client.build_payload([keyword], timeframe=timeframe)
+            frame = self._client.interest_over_time()
+        except Exception as first_exc:
+            try:
+                suggestions = self.suggestions(keyword)
+                topic = next((s["mid"] for s in suggestions if s.get("mid")), None)
+                if not topic:
+                    raise first_exc
+                self._client.build_payload([topic], timeframe=timeframe)
+                frame = self._client.interest_over_time()
+                keyword = topic
+            except Exception as exc:
+                raise GoogleTrendsError(f"interest_over_time failed for {keyword!r} timeframe={timeframe!r}: {exc}") from exc
+        if frame is None or frame.empty:
+            series = [0, 0, 0, 0, 0, 0]
+        else:
+            col = keyword if keyword in frame.columns else frame.columns[0]
+            series = [int(v) for v in frame[col].tail(6).tolist()]
+            if len(series) < 6:
+                series = ([series[0]] * (6 - len(series)) + series) if series else [0] * 6
+        return TrendSeries(
+            keyword=keyword,
+            timeframe=timeframe,
+            series=series,
+            fetched_at=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
+        )