from __future__ import annotations

import asyncio
import re
from collections import Counter

from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings

from news_mcp.config import (
    CLUSTERS_TTL_HOURS,
    DB_PATH,
    DEFAULT_TOPICS,
    NEWS_BACKGROUND_REFRESH_ENABLED,
    NEWS_BACKGROUND_REFRESH_ON_START,
    NEWS_REFRESH_INTERVAL_SECONDS,
)
from news_mcp.jobs.poller import refresh_clusters
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.enrichment.groq_enrich import summarize_cluster_groq

mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)
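
# The tools below are registered on this FastMCP instance; agents reach them over
# SSE through the FastAPI app mounted at /mcp near the bottom of this module.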
@mcp.tool(description="What is happening right now? Return the latest deduplicated news clusters for a topic.")
async def get_latest_events(topic: str = "crypto", limit: int = 5):
    limit = max(1, min(int(limit), 20))
    # In v1, `topic` is a coarse category. If the caller passes an entity name
    # (e.g. "trump"/"iran"), gracefully fall back to `other`.
    topic_norm = str(topic).strip().lower()
    allowed = {t.lower() for t in DEFAULT_TOPICS}
    if topic_norm not in allowed:
        topic_norm = "other"
    store = SQLiteClusterStore(DB_PATH)
    # Cache-first: only refresh if we currently have no fresh clusters for this topic.
    clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
    if not clusters:
        await refresh_clusters(topic=topic_norm, limit=200)
        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
    # Ensure the response is compact and agent-friendly.
    clusters_sorted = sorted(clusters, key=lambda x: float(x.get("importance", 0.0)), reverse=True)
    out = []
    for c in clusters_sorted:
        out.append(
            {
                "cluster_id": c.get("cluster_id"),
                "headline": c.get("headline"),
                "summary": c.get("summary"),
                "entities": c.get("entities", []),
                "sentiment": c.get("sentiment", "neutral"),
                "importance": c.get("importance", 0.0),
                "sources": c.get("sources", []),
                "timestamp": c.get("timestamp"),
            }
        )
    return out
@mcp.tool(description="What's happening with X? Filter latest clusters by extracted entity substring (case-insensitive).")
async def get_events_for_entity(entity: str, limit: int = 10):
    limit = max(1, min(int(limit), 30))
    query = str(entity).strip().lower()
    if not query:
        return []
    # Cache-first: search recent clusters across all topics.
    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 5)
    hits = []
    for c in clusters:
        ents = c.get("entities") or []
        if any(query in str(e).lower() for e in ents):
            hits.append(c)
            if len(hits) >= limit:
                break
    # Compress to the same tool response shape as get_latest_events.
    out = []
    for c in hits:
        out.append(
            {
                "cluster_id": c.get("cluster_id"),
                "headline": c.get("headline"),
                "summary": c.get("summary"),
                "entities": c.get("entities", []),
                "sentiment": c.get("sentiment", "neutral"),
                "importance": c.get("importance", 0.0),
                "sources": c.get("sources", []),
                "timestamp": c.get("timestamp"),
            }
        )
    return out
@mcp.tool(description="Explain an event clearly by cluster_id (Groq summary).")
async def get_event_summary(event_id: str):
    store = SQLiteClusterStore(DB_PATH)
    # Summary cache: reuse if present within TTL.
    cached_summary = store.get_cluster_summary(
        cluster_id=event_id,
        ttl_hours=CLUSTERS_TTL_HOURS,
    )
    if cached_summary:
        return {
            "event_id": event_id,
            "headline": cached_summary.get("headline"),
            "mergedSummary": cached_summary.get("mergedSummary"),
            "keyFacts": cached_summary.get("keyFacts", []),
            "sources": cached_summary.get("sources", []),
        }
    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        return {
            "event_id": event_id,
            "error": "NOT_FOUND",
        }
    summary = await summarize_cluster_groq(cluster)
    store.upsert_cluster_summary(event_id, summary)
    return {
        "event_id": event_id,
        "headline": summary.get("headline"),
        "mergedSummary": summary.get("mergedSummary"),
        "keyFacts": summary.get("keyFacts", []),
        "sources": summary.get("sources", []),
    }
@mcp.tool(description="Detect emerging topics/entities from recent cached news clusters.")
async def detect_emerging_topics(limit: int = 10):
    limit = max(1, min(int(limit), 20))
    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=200)
    # Count entity mentions, two-word phrases, and per-topic cluster frequencies
    # across the recent cache.
    entity_counts = Counter()
    phrase_counts = Counter()
    topic_counts = Counter()
    for c in clusters:
        topic_counts[c.get("topic", "other")] += 1
        for ent in c.get("entities", []) or []:
            key = str(ent).strip().lower()
            if key:
                entity_counts[key] += 1
        text = f"{c.get('headline', '')} {c.get('summary', '')}"
        words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
        for i in range(len(words) - 1):
            phrase = f"{words[i]} {words[i + 1]}"
            if len(phrase) > 6:
                phrase_counts[phrase] += 1
    # Heuristic trend score: a fixed base plus a small bump per occurrence, capped at 0.99.
    emerging = []
    for ent, count in entity_counts.most_common(limit):
        emerging.append({
            "topic": ent,
            "trend_score": min(0.99, round(0.25 + 0.15 * count, 2)),
            "related_entities": [ent],
            "signal_type": "entity",
            "count": count,
        })
    for phrase, count in phrase_counts.most_common(limit * 2):
        if any(item["topic"] == phrase for item in emerging):
            continue
        emerging.append({
            "topic": phrase.title(),
            "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
            "related_entities": [],
            "signal_type": "phrase",
            "count": count,
        })
        if len(emerging) >= limit:
            break
    return emerging[:limit]
app = FastAPI(title="News MCP Server")
app.mount("/mcp", mcp.sse_app())

_background_task_started = False
@app.on_event("startup")
async def _start_background_refresh():
    global _background_task_started
    if _background_task_started:
        return
    if not NEWS_BACKGROUND_REFRESH_ENABLED:
        return
    _background_task_started = True

    async def _loop():
        if not NEWS_BACKGROUND_REFRESH_ON_START:
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
        while True:
            try:
                # Refresh all topics by passing topic=None.
                await refresh_clusters(topic=None, limit=200)
            except Exception:
                # Avoid crashing the server on network errors.
                pass
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))

    asyncio.create_task(_loop())
@app.get("/")
def root():
    return {
        "status": "ok",
        "transport": "fastmcp+sse",
        "mount": "/mcp",
        "tools": [
            "get_latest_events",
            "get_events_for_entity",
            "get_event_summary",
            "detect_emerging_topics",
        ],
    }
@app.get("/health")
def health():
    store = SQLiteClusterStore(DB_PATH)
    return {
        "status": "ok",
        "ttl_hours": CLUSTERS_TTL_HOURS,
        "db": str(DB_PATH),
        "refresh": store.get_feed_state("breakingthenews"),
    }
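
# Local run helper (sketch, not part of the original module): assumes uvicorn is
# installed and that port 8000 is free; adjust host/port for your deployment.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)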