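"""FastMCP news server: investigation tools over clustered news, backed by SQLite.

Exposes the MCP tools over SSE via FastAPI and keeps the cluster store fresh
with a background refresh loop.
"""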
from __future__ import annotations

import asyncio
import logging
import re
from collections import Counter
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings

from news_mcp.config import (
    DB_PATH,
    DEFAULT_LOOKBACK_HOURS,
    DEFAULT_TOPICS,
    NEWS_BACKGROUND_REFRESH_ENABLED,
    NEWS_BACKGROUND_REFRESH_ON_START,
    NEWS_PRUNE_INTERVAL_HOURS,
    NEWS_PRUNING_ENABLED,
    NEWS_REFRESH_INTERVAL_SECONDS,
    NEWS_RETENTION_DAYS,
)
from news_mcp.entity_normalize import normalize_query
from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
from news_mcp.jobs.poller import refresh_clusters
from news_mcp.llm import active_llm_config
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.trends_resolution import resolve_entity_via_trends

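# DNS-rebinding protection is disabled for the SSE transport. This assumes the
# server sits behind a trusted proxy or on a private network; re-enable it
# before exposing the endpoint directly.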
mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)


def _cluster_entity_haystack(cluster: dict) -> list[str]:
    """Collect the normalized entity clues attached to a cluster."""
    values: list[str] = []
    for ent in cluster.get("entities", []) or []:
        values.append(str(ent).strip().lower())
    for res in cluster.get("entityResolutions", []) or []:
        if not isinstance(res, dict):
            continue
        for key in ("normalized", "canonical_label", "mid"):
            val = res.get(key)
            if val:
                values.append(str(val).strip().lower())
    return [v for v in values if v]


def _parse_cluster_timestamp(value) -> datetime:
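    """Parse an ISO-8601 or RFC 2822 timestamp into an aware UTC datetime.

    Missing or unparseable values map to ``datetime.min`` so they sort last.
    """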
    if not value:
        return datetime.min.replace(tzinfo=timezone.utc)
    text = str(value).strip()
    if not text:
        return datetime.min.replace(tzinfo=timezone.utc)
    try:
        dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except Exception:
        pass
    try:
        dt = parsedate_to_datetime(text)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except Exception:
        return datetime.min.replace(tzinfo=timezone.utc)


def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
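    """Order clusters newest-first, breaking timestamp ties by importance."""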
    return sorted(
        clusters,
        key=lambda c: (
            _parse_cluster_timestamp(c.get("timestamp")),
            float(c.get("importance", 0.0) or 0.0),
        ),
        reverse=True,
    )
- @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.")
- async def get_latest_events(topic: str = "crypto", limit: int = 5, include_articles: bool = False):
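    """Return up to ``limit`` fresh clusters for a topic or entity-like query.

    Known topics are served cache-first (refreshing only on a miss); anything
    else is matched against extracted entities across all topics.
    """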
    limit = max(1, min(int(limit), 20))
    # If the caller passes an entity-like value, resolve it and use the canonical
    # entity as the query lens. Otherwise keep the original topic path.
    topic_norm = normalize_query(topic).lower()
    resolved = resolve_entity_via_trends(topic_norm)
    allowed = {t.lower() for t in DEFAULT_TOPICS}
    is_topic = topic_norm in allowed
    query_terms = {
        topic_norm,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    store = SQLiteClusterStore(DB_PATH)
    if is_topic:
        # Cache-first: only refresh if we currently have no fresh clusters for this topic.
        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
        if not clusters:
            await refresh_clusters(topic=topic_norm, limit=200)
            clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
    else:
        # Entity-aware mode: search recent clusters across all topics and match by
        # raw entity, canonical label, or MID.
        clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
        filtered = []
        for c in clusters:
            haystack = _cluster_entity_haystack(c)
            if any(any(term in item for item in haystack) for term in query_terms):
                filtered.append(c)
            if len(filtered) >= limit:
                break
        clusters = filtered
    out = []
    for c in _sort_clusters_by_recency(clusters):
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            # Return minimal article fields to keep responses compact.
            arts = c.get("articles", []) or []
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        out.append(item)
    return out
- @mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.")
- async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
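    """Find clusters whose extracted entities match ``entity`` within ``timeframe``.

    Matching is substring-based against raw entities, canonical labels, and MIDs.
    """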
    limit = max(1, min(int(limit), 30))
    query = normalize_query(entity).strip().lower()
    if not query:
        return []
    resolved = resolve_entity_via_trends(query)
    query_terms = {
        query,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    store = SQLiteClusterStore(DB_PATH)

    def _match_clusters(clusters: list[dict]) -> list[dict]:
        hits: list[dict] = []
        for c in _sort_clusters_by_recency(clusters):
            haystack = _cluster_entity_haystack(c)
            if any(any(term in item for item in haystack) for term in query_terms):
                hits.append(c)
            if len(hits) >= limit:
                break
        return hits

    hours = _parse_timeframe_to_hours(timeframe)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))
    hits = _match_clusters(clusters)
    out = []
    for c in hits:
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            arts = c.get("articles", []) or []
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        out.append(item)
    return out
- @mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
- async def get_event_summary(event_id: str, include_articles: bool = False):
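    """Return an LLM-written summary for one cluster, reusing a cached copy when fresh."""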
    store = SQLiteClusterStore(DB_PATH)
    # Summary cache: reuse if present within TTL.
    cached_summary = store.get_cluster_summary(
        cluster_id=event_id,
        ttl_hours=DEFAULT_LOOKBACK_HOURS,
    )
    if cached_summary:
        out = {
            "event_id": event_id,
            "headline": cached_summary.get("headline"),
            "mergedSummary": cached_summary.get("mergedSummary"),
            "keyFacts": cached_summary.get("keyFacts", []),
            "sources": cached_summary.get("sources", []),
        }
        if include_articles:
            cluster = store.get_cluster_by_id(event_id)
            arts = (cluster or {}).get("articles", []) or []
            out["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in arts
                if isinstance(a, dict)
            ]
        return out
    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        return {
            "event_id": event_id,
            "error": "NOT_FOUND",
        }
    articles_out = None
    if include_articles:
        arts = cluster.get("articles", []) or []
        articles_out = [
            {
                "title": a.get("title"),
                "url": a.get("url"),
                "source": a.get("source"),
                "timestamp": a.get("timestamp"),
            }
            for a in arts
            if isinstance(a, dict)
        ]
    summary = await summarize_cluster_llm(cluster)
    store.upsert_cluster_summary(event_id, summary)
    out = {
        "event_id": event_id,
        "headline": summary.get("headline"),
        "mergedSummary": summary.get("mergedSummary"),
        "keyFacts": summary.get("keyFacts", []),
        "sources": summary.get("sources", []),
    }
    if include_articles:
        out["articles"] = articles_out or []
    return out
- @mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters.")
- async def detect_emerging_topics(limit: int = 10):
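    """Surface emerging entities and bigram phrases from recent clusters.

    Entities are scored by frequency blended with average importance; phrases
    fill any remaining slots.
    """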
    limit = max(1, min(int(limit), 20))
    store = SQLiteClusterStore(DB_PATH)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=200)
    entity_counts = Counter()
    entity_importance_sum = Counter()
    # co-occurrence: ent -> other_ent -> count
    entity_cooccur: dict[str, Counter] = {}
    phrase_counts = Counter()
    topic_counts = Counter()

    # Very light heuristics to reduce "meta entities" dominating emerging topics.
    # Keep it conservative: only skip obvious boilerplate.
    def _is_generic_entity(ent: str) -> bool:
        e = str(ent).strip().lower()
        if not e:
            return True
        if len(e) < 4:
            return True
        # common outlet-ish / meta-ish tokens
        if e in {"news", "latest", "breaking"}:
            return True
        return False

    for c in clusters:
        topic_counts[c.get("topic", "other")] += 1
        ents_in_cluster_norm = [
            str(e).strip().lower()
            for e in (c.get("entities", []) or [])
            if not _is_generic_entity(e)
        ]
        for ent in ents_in_cluster_norm:
            entity_counts[ent] += 1
            try:
                entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
            except Exception:
                pass
        # update co-occurrence counts
        for i, a in enumerate(ents_in_cluster_norm):
            entity_cooccur.setdefault(a, Counter())
            for j, b in enumerate(ents_in_cluster_norm):
                if i != j:
                    entity_cooccur[a][b] += 1
        text = f"{c.get('headline', '')} {c.get('summary', '')}"
        words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
        for i in range(len(words) - 1):
            phrase = f"{words[i]} {words[i + 1]}"
            if len(phrase) > 6:
                phrase_counts[phrase] += 1

    emerging = []
    # Combine frequency with average importance so "big signal" rises over pure repetition.
    for ent, count in entity_counts.most_common(limit):
        avg_imp = entity_importance_sum[ent] / max(1, count)
        # avg_imp is typically 0..~1; keep score bounded.
        trend_score = 0.25 + 0.40 * min(1.0, avg_imp) + 0.08 * min(6.0, float(count))
        related = []
        for other, _cnt in (entity_cooccur.get(ent) or Counter()).most_common(3):
            # avoid returning the entity itself (shouldn't happen, but be safe)
            if other != ent:
                related.append(other)
        emerging.append({
            "topic": ent,
            "trend_score": min(0.99, round(trend_score, 2)),
            "related_entities": related if related else [ent],
            "signal_type": "entity",
            "count": count,
            "avg_importance": round(avg_imp, 3),
        })
    for phrase, count in phrase_counts.most_common(limit * 2):
        if any(item["topic"] == phrase for item in emerging):
            continue
        emerging.append({
            "topic": phrase.title(),
            "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
            "related_entities": [],
            "signal_type": "phrase",
            "count": count,
        })
        if len(emerging) >= limit:
            break
    return emerging[:limit]
- @mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.")
- async def get_news_sentiment(entity: str, timeframe: str = "24h"):
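    """Aggregate per-cluster sentiment scores for an entity over a lookback window."""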
    store = SQLiteClusterStore(DB_PATH)
    ent = normalize_query(entity).strip().lower()
    if not ent:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }
    resolved = resolve_entity_via_trends(ent)
    query_terms = {
        ent,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    # timeframe: accept '24h', '7d', or a bare '24'; clamp the window to one week.
    hours = max(1, min(_parse_timeframe_to_hours(timeframe), 168))
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
    matched = []
    for c in clusters:
        haystack = _cluster_entity_haystack(c)
        if any(any(term in item for item in haystack) for term in query_terms):
            matched.append(c)
    if not matched:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }
    scores = []
    for c in matched:
        s = c.get("sentimentScore")
        if s is not None:
            try:
                scores.append(float(s))
            except Exception:
                pass
    avg_score = sum(scores) / len(scores) if scores else 0.0
    # Keep the label aligned with the numeric score.
    # Small magnitudes are treated as neutral to avoid noisy label flips.
    if avg_score >= 0.15:
        sentiment = "positive"
    elif avg_score <= -0.15:
        sentiment = "negative"
    else:
        sentiment = "neutral"
    return {
        "entity": entity,
        "sentiment": sentiment,
        "score": round(avg_score, 3),
        "cluster_count": len(matched),
    }


def _parse_timeframe_to_hours(timeframe: str) -> int:
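    """Convert a timeframe like '36h', '7d', or '24' into hours (default 24)."""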
    tf = str(timeframe).strip().lower()
    try:
        if tf.endswith("d"):
            days = int(tf[:-1])
            return max(1, days * 24)
        if tf.endswith("h"):
            return max(1, int(tf[:-1]))
        return max(1, int(tf))
    except Exception:
        return 24


@mcp.tool(
    description="Investigate which entities tend to appear alongside a subject entity in recent clusters, based on co-occurrence."
)
async def get_related_entities(subject: str, timeframe: str = "24h", limit: int = 10):
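    """Rank entities that co-occur with ``subject`` in recent clusters."""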
    store = SQLiteClusterStore(DB_PATH)
    limit = max(1, min(int(limit), 30))
    subj = normalize_query(subject).strip().lower()
    if not subj:
        return []
    resolved = resolve_entity_via_trends(subj)
    query_terms = {
        subj,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}
    hours = _parse_timeframe_to_hours(timeframe)
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
    # Aggregate related metrics per entity.
    rel_count = Counter()
    rel_imp_sum = Counter()
    rel_sent_sum = Counter()
    rel_sent_n = Counter()
    for c in clusters:
        haystack = _cluster_entity_haystack(c)
        if not any(any(term in item for item in haystack) for term in query_terms):
            continue
        ents = [str(e).strip().lower() for e in (c.get("entities", []) or []) if str(e).strip()]
        # remove generic/meta-ish short tokens conservatively
        ents = [e for e in ents if len(e) >= 4]
        for e in ents:
            if e in query_terms:
                continue
            rel_count[e] += 1
            try:
                rel_imp_sum[e] += float(c.get("importance", 0.0) or 0.0)
            except Exception:
                pass
            # sentiment aggregation based on sentimentScore if available.
            s = c.get("sentimentScore")
            if s is not None:
                try:
                    rel_sent_sum[e] += float(s)
                    rel_sent_n[e] += 1
                except Exception:
                    pass
    # Rank by co-occurrence count; avg importance is reported but not a sort key.
    items = []
    for ent, cnt in rel_count.most_common():
        avg_imp = rel_imp_sum[ent] / max(1, cnt)
        avg_score = rel_sent_sum[ent] / max(1, rel_sent_n[ent]) if rel_sent_n[ent] else 0.0
        if avg_score >= 0.15:
            sentiment = "positive"
        elif avg_score <= -0.15:
            sentiment = "negative"
        else:
            sentiment = "neutral"
        items.append(
            {
                "entity": ent,
                "count": cnt,
                "avg_importance": round(avg_imp, 3),
                "sentiment": sentiment,
                "score": round(avg_score, 3),
            }
        )
        if len(items) >= limit:
            break
    return items


app = FastAPI(title="News MCP Server")
logger = logging.getLogger("news_mcp.startup")
app.mount("/mcp", mcp.sse_app())
_background_task_started = False
# Hold a reference to the refresh task so it is not garbage-collected mid-run.
_background_task: asyncio.Task | None = None
- @app.on_event("startup")
- async def _start_background_refresh():
- global _background_task_started
- if _background_task_started:
- return
- if not NEWS_BACKGROUND_REFRESH_ENABLED:
- return
- _background_task_started = True
- logger.info("news-mcp llm config: %s", active_llm_config())
- store = SQLiteClusterStore(DB_PATH)
- prune_result = store.prune_if_due(
- pruning_enabled=NEWS_PRUNING_ENABLED,
- retention_days=NEWS_RETENTION_DAYS,
- interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
- )
- logger.info("startup prune_result=%s", prune_result)
- async def _loop():
- if not NEWS_BACKGROUND_REFRESH_ON_START:
- await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
- while True:
- try:
- # Refresh all topics by passing topic=None
- await refresh_clusters(topic=None, limit=200)
- except Exception:
- # Avoid crashing the server on network errors.
- pass
- await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
- asyncio.create_task(_loop())
- @app.get("/")
- def root():
- return {
- "status": "ok",
- "transport": "fastmcp+sse",
- "mount": "/mcp",
- "tools": ["get_latest_events", "get_events_for_entity", "get_event_summary", "detect_emerging_topics"],
- "refresh": {
- "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
- "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
- },
- "retention": {
- "lookback_hours": DEFAULT_LOOKBACK_HOURS,
- "retention_days": NEWS_RETENTION_DAYS,
- },
- "pruning": {
- "enabled": NEWS_PRUNING_ENABLED,
- "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
- },
- }
- @app.get("/health")
- def health():
- store = SQLiteClusterStore(DB_PATH)
- return {
- "status": "ok",
- "lookback_hours": DEFAULT_LOOKBACK_HOURS,
- "db": str(DB_PATH),
- "refresh": store.get_feed_state("breakingthenews"),
- "pruning": store.get_prune_state(
- pruning_enabled=NEWS_PRUNING_ENABLED,
- retention_days=NEWS_RETENTION_DAYS,
- interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
- ),
- }