@@ -1,19 +1,30 @@
 from __future__ import annotations
 
+import asyncio
+import logging
+from collections import Counter
+from datetime import datetime, timezone
+from email.utils import parsedate_to_datetime
+
 from fastapi import FastAPI
 from mcp.server.fastmcp import FastMCP
 from mcp.server.transport_security import TransportSecuritySettings
 
-from news_mcp.config import CLUSTERS_TTL_HOURS, DEFAULT_TOPICS, DB_PATH
-from news_mcp.config import NEWS_REFRESH_INTERVAL_SECONDS, NEWS_BACKGROUND_REFRESH_ENABLED, NEWS_BACKGROUND_REFRESH_ON_START
+from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
+from news_mcp.config import (
+    NEWS_PRUNE_INTERVAL_HOURS,
+    NEWS_PRUNING_ENABLED,
+    NEWS_REFRESH_INTERVAL_SECONDS,
+    NEWS_BACKGROUND_REFRESH_ENABLED,
+    NEWS_BACKGROUND_REFRESH_ON_START,
+    NEWS_RETENTION_DAYS,
+)
 from news_mcp.jobs.poller import refresh_clusters
 from news_mcp.storage.sqlite_store import SQLiteClusterStore
-from news_mcp.enrichment.llm_enrich import summarize_cluster_groq
+from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
 from news_mcp.trends_resolution import resolve_entity_via_trends
 from news_mcp.llm import active_llm_config
 from news_mcp.entity_normalize import normalize_query
-from collections import Counter
-import logging
 
 
 mcp = FastMCP(
@@ -37,7 +48,40 @@ def _cluster_entity_haystack(cluster: dict) -> list[str]:
     return [v for v in values if v]
 
 
-@mcp.tool(description="What is happening right now? Return the latest deduplicated news clusters for a topic.")
+def _parse_cluster_timestamp(value) -> datetime:
+    if not value:
+        return datetime.min.replace(tzinfo=timezone.utc)
+    text = str(value).strip()
+    if not text:
+        return datetime.min.replace(tzinfo=timezone.utc)
+    try:
+        dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
+        if dt.tzinfo is None:
+            dt = dt.replace(tzinfo=timezone.utc)
+        return dt.astimezone(timezone.utc)
+    except Exception:
+        pass
+    try:
+        dt = parsedate_to_datetime(text)
+        if dt.tzinfo is None:
+            dt = dt.replace(tzinfo=timezone.utc)
+        return dt.astimezone(timezone.utc)
+    except Exception:
+        return datetime.min.replace(tzinfo=timezone.utc)
+
+
+def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
+    return sorted(
+        clusters,
+        key=lambda c: (
+            _parse_cluster_timestamp(c.get("timestamp")),
+            float(c.get("importance", 0.0) or 0.0),
+        ),
+        reverse=True,
+    )
+
+
+@mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.")
 async def get_latest_events(topic: str = "crypto", limit: int = 5, include_articles: bool = False):
     limit = max(1, min(int(limit), 20))
     # If the caller passes an entity-like value, resolve it and use the canonical
@@ -58,14 +102,14 @@ async def get_latest_events(topic: str = "crypto", limit: int = 5, include_artic
 
     if is_topic:
         # Cache-first: only refresh if we currently have no fresh clusters for this topic.
-        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
+        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
         if not clusters:
             await refresh_clusters(topic=topic_norm, limit=200)
-            clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
+            clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
     else:
         # Entity-aware mode: search recent clusters across all topics and match by
         # raw entity, canonical label, or MID.
-        clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 8)
+        clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8)
         filtered = []
         for c in clusters:
             haystack = _cluster_entity_haystack(c)
@@ -75,11 +119,8 @@ async def get_latest_events(topic: str = "crypto", limit: int = 5, include_artic
                     break
         clusters = filtered
 
-    # Ensure the response is compact and agent-friendly.
-    clusters_sorted = sorted(clusters, key=lambda x: float(x.get("importance", 0.0)), reverse=True)
-
     out = []
-    for c in clusters_sorted:
+    for c in _sort_clusters_by_recency(clusters):
         item = {
             "cluster_id": c.get("cluster_id"),
             "headline": c.get("headline"),
@@ -108,7 +149,7 @@ async def get_latest_events(topic: str = "crypto", limit: int = 5, include_artic
     return out
 
 
-@mcp.tool(description="What's happening with X? Filter clusters by extracted entity substring (case-insensitive) within a timeframe.")
+@mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.")
 async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
     limit = max(1, min(int(limit), 30))
     query = normalize_query(entity).strip().lower()
@@ -128,7 +169,7 @@ async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "
 
     def _match_clusters(clusters: list[dict]) -> list[dict]:
         hits: list[dict] = []
-        for c in clusters:
+        for c in _sort_clusters_by_recency(clusters):
             haystack = _cluster_entity_haystack(c)
             if any(any(term in item for item in haystack) for term in query_terms):
                 hits.append(c)
@@ -136,14 +177,10 @@ async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "
                     break
         return hits
 
-    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 5)
-    hits = _match_clusters(clusters)
-
     hours = _parse_timeframe_to_hours(timeframe)
     clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10))
     hits = _match_clusters(clusters)
 
-    # Compress to tool response shape.
     out = []
     for c in hits:
         item = {
@@ -172,14 +209,14 @@ async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "
     return out
 
 
-@mcp.tool(description="Explain an event clearly by cluster_id (Groq summary).")
+@mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
 async def get_event_summary(event_id: str, include_articles: bool = False):
     store = SQLiteClusterStore(DB_PATH)
 
     # Summary cache: reuse if present within TTL.
     cached_summary = store.get_cluster_summary(
         cluster_id=event_id,
-        ttl_hours=CLUSTERS_TTL_HOURS,
+        ttl_hours=DEFAULT_LOOKBACK_HOURS,
     )
     if cached_summary:
         out = {
@@ -226,7 +263,7 @@ async def get_event_summary(event_id: str, include_articles: bool = False):
         if isinstance(a, dict)
     ]
 
-    summary = await summarize_cluster_groq(cluster)
+    summary = await summarize_cluster_llm(cluster)
 
     store.upsert_cluster_summary(event_id, summary)
     out = {
@@ -242,13 +279,12 @@ async def get_event_summary(event_id: str, include_articles: bool = False):
     return out
 
 
-@mcp.tool(description="Detect emerging topics/entities from recent cached news clusters.")
+@mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters.")
 async def detect_emerging_topics(limit: int = 10):
     limit = max(1, min(int(limit), 20))
     store = SQLiteClusterStore(DB_PATH)
-    clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=200)
+    clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=200)
 
-    from collections import Counter
     import re
 
     entity_counts = Counter()
@@ -280,9 +316,9 @@ async def detect_emerging_topics(limit: int = 10):
                 continue
             entity_counts[ent] += 1
             try:
-                    entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
+                entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
             except Exception:
-                    pass
+                pass
 
             # update co-occurrence counts
             for i in range(len(ents_in_cluster_norm)):
@@ -342,7 +378,7 @@ async def detect_emerging_topics(limit: int = 10):
     return emerging[:limit]
 
 
-@mcp.tool(description="What's the overall sentiment around an entity within a timeframe?")
+@mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.")
 async def get_news_sentiment(entity: str, timeframe: str = "24h"):
     store = SQLiteClusterStore(DB_PATH)
 
@@ -428,7 +464,7 @@ def _parse_timeframe_to_hours(timeframe: str) -> int:
 
 
 @mcp.tool(
-    description="Given a subject entity, find related entities via co-occurrence inside recent clusters (entity-only, no topic fallback)."
+    description="Investigate which entities tend to appear alongside a subject entity in recent clusters, based on co-occurrence."
 )
 async def get_related_entities(subject: str, timeframe: str = "24h", limit: int = 10):
     store = SQLiteClusterStore(DB_PATH)
@@ -529,6 +565,14 @@ async def _start_background_refresh():
     _background_task_started = True
     logger.info("news-mcp llm config: %s", active_llm_config())
 
+    store = SQLiteClusterStore(DB_PATH)
+    prune_result = store.prune_if_due(
+        pruning_enabled=NEWS_PRUNING_ENABLED,
+        retention_days=NEWS_RETENTION_DAYS,
+        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
+    )
+    logger.info("startup prune_result=%s", prune_result)
+
     async def _loop():
         if not NEWS_BACKGROUND_REFRESH_ON_START:
             await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
@@ -541,8 +585,6 @@ async def _start_background_refresh():
                 pass
         await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
 
-    import asyncio
-
    asyncio.create_task(_loop())
 
 
@@ -557,6 +599,14 @@ def root():
             "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
             "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
         },
+        "retention": {
+            "ttl_hours": DEFAULT_LOOKBACK_HOURS,
+            "retention_days": NEWS_RETENTION_DAYS,
+        },
+        "pruning": {
+            "enabled": NEWS_PRUNING_ENABLED,
+            "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
+        },
     }
 
 
@@ -565,7 +615,12 @@ def health():
     store = SQLiteClusterStore(DB_PATH)
     return {
         "status": "ok",
-        "ttl_hours": CLUSTERS_TTL_HOURS,
+        "ttl_hours": DEFAULT_LOOKBACK_HOURS,
         "db": str(DB_PATH),
         "refresh": store.get_feed_state("breakingthenews"),
+        "pruning": store.get_prune_state(
+            pruning_enabled=NEWS_PRUNING_ENABLED,
+            retention_days=NEWS_RETENTION_DAYS,
+            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
+        ),
     }