|
|
@@ -9,6 +9,7 @@ from news_mcp.config import NEWS_REFRESH_INTERVAL_SECONDS, NEWS_BACKGROUND_REFRE
|
|
|
from news_mcp.jobs.poller import refresh_clusters
|
|
|
from news_mcp.storage.sqlite_store import SQLiteClusterStore
|
|
|
from news_mcp.enrichment.llm_enrich import summarize_cluster_groq
|
|
|
+from news_mcp.trends_resolution import resolve_entity_via_trends
|
|
|
from news_mcp.llm import active_llm_config
|
|
|
from news_mcp.entity_normalize import normalize_query
|
|
|
from collections import Counter
|
|
|
@@ -21,22 +22,58 @@ mcp = FastMCP(
|
|
|
)
|
|
|
|
|
|
|
|
|
+def _cluster_entity_haystack(cluster: dict) -> list[str]:
|
|
|
+ """Collect the normalized entity clues attached to a cluster."""
|
|
|
+ values: list[str] = []
|
|
|
+ for ent in cluster.get("entities", []) or []:
|
|
|
+ values.append(str(ent).strip().lower())
|
|
|
+ for res in cluster.get("entityResolutions", []) or []:
|
|
|
+ if not isinstance(res, dict):
|
|
|
+ continue
|
|
|
+ for key in ("normalized", "canonical_label", "mid"):
|
|
|
+ val = res.get(key)
|
|
|
+ if val:
|
|
|
+ values.append(str(val).strip().lower())
|
|
|
+ return [v for v in values if v]
|
|
|
+
|
|
|
+
|
|
|
@mcp.tool(description="What is happening right now? Return the latest deduplicated news clusters for a topic.")
|
|
|
async def get_latest_events(topic: str = "crypto", limit: int = 5):
|
|
|
limit = max(1, min(int(limit), 20))
|
|
|
- # In v1, `topic` is a coarse category. If the caller passes an entity name
|
|
|
- # (e.g. "trump"/"iran"), gracefully fall back to `other`.
|
|
|
+ # If the caller passes an entity-like value, resolve it and use the canonical
|
|
|
+ # entity as the query lens. Otherwise keep the original topic path.
|
|
|
topic_norm = normalize_query(topic).lower()
|
|
|
+ resolved = resolve_entity_via_trends(topic_norm)
|
|
|
allowed = {t.lower() for t in DEFAULT_TOPICS}
|
|
|
- if topic_norm not in allowed:
|
|
|
- topic_norm = "other"
|
|
|
+ is_topic = topic_norm in allowed
|
|
|
+ query_terms = {
|
|
|
+ topic_norm,
|
|
|
+ str(resolved.get("normalized") or "").strip().lower(),
|
|
|
+ str(resolved.get("canonical_label") or "").strip().lower(),
|
|
|
+ str(resolved.get("mid") or "").strip().lower(),
|
|
|
+ }
|
|
|
+ query_terms = {q for q in query_terms if q}
|
|
|
+
|
|
|
store = SQLiteClusterStore(DB_PATH)
|
|
|
|
|
|
- # Cache-first: only refresh if we currently have no fresh clusters for this topic.
|
|
|
- clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
|
|
|
- if not clusters:
|
|
|
- await refresh_clusters(topic=topic_norm, limit=200)
|
|
|
+ if is_topic:
|
|
|
+ # Cache-first: only refresh if we currently have no fresh clusters for this topic.
|
|
|
clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
|
|
|
+ if not clusters:
|
|
|
+ await refresh_clusters(topic=topic_norm, limit=200)
|
|
|
+ clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=CLUSTERS_TTL_HOURS, limit=limit)
|
|
|
+ else:
|
|
|
+ # Entity-aware mode: search recent clusters across all topics and match by
|
|
|
+ # raw entity, canonical label, or MID.
|
|
|
+ clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 8)
|
|
|
+ filtered = []
|
|
|
+ for c in clusters:
|
|
|
+ haystack = _cluster_entity_haystack(c)
|
|
|
+ if any(any(term in item for item in haystack) for term in query_terms):
|
|
|
+ filtered.append(c)
|
|
|
+ if len(filtered) >= limit:
|
|
|
+ break
|
|
|
+ clusters = filtered
|
|
|
|
|
|
# Ensure the response is compact and agent-friendly.
|
|
|
clusters_sorted = sorted(clusters, key=lambda x: float(x.get("importance", 0.0)), reverse=True)
|
|
|
@@ -66,14 +103,23 @@ async def get_events_for_entity(entity: str, limit: int = 10):
|
|
|
if not query:
|
|
|
return []
|
|
|
|
|
|
+ resolved = resolve_entity_via_trends(query)
|
|
|
+ query_terms = {
|
|
|
+ query,
|
|
|
+ str(resolved.get("normalized") or "").strip().lower(),
|
|
|
+ str(resolved.get("canonical_label") or "").strip().lower(),
|
|
|
+ str(resolved.get("mid") or "").strip().lower(),
|
|
|
+ }
|
|
|
+ query_terms = {q for q in query_terms if q}
|
|
|
+
|
|
|
# Cache-first: search recent clusters across all topics.
|
|
|
store = SQLiteClusterStore(DB_PATH)
|
|
|
clusters = store.get_latest_clusters_all_topics(ttl_hours=CLUSTERS_TTL_HOURS, limit=limit * 5)
|
|
|
|
|
|
hits = []
|
|
|
for c in clusters:
|
|
|
- ents = c.get("entities") or []
|
|
|
- if any(query in str(e).lower() for e in ents):
|
|
|
+ haystack = _cluster_entity_haystack(c)
|
|
|
+ if any(any(term in item for item in haystack) for term in query_terms):
|
|
|
hits.append(c)
|
|
|
if len(hits) >= limit:
|
|
|
break
|
|
|
@@ -191,6 +237,14 @@ async def get_news_sentiment(entity: str, timeframe: str = "24h"):
|
|
|
store = SQLiteClusterStore(DB_PATH)
|
|
|
|
|
|
ent = normalize_query(entity).strip().lower()
|
|
|
+ resolved = resolve_entity_via_trends(ent)
|
|
|
+ query_terms = {
|
|
|
+ ent,
|
|
|
+ str(resolved.get("normalized") or "").strip().lower(),
|
|
|
+ str(resolved.get("canonical_label") or "").strip().lower(),
|
|
|
+ str(resolved.get("mid") or "").strip().lower(),
|
|
|
+ }
|
|
|
+ query_terms = {q for q in query_terms if q}
|
|
|
if not ent:
|
|
|
return {
|
|
|
"entity": entity,
|
|
|
@@ -210,8 +264,8 @@ async def get_news_sentiment(entity: str, timeframe: str = "24h"):
|
|
|
clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
|
|
|
matched = []
|
|
|
for c in clusters:
|
|
|
- ents = c.get("entities") or []
|
|
|
- if any(ent in str(e).lower() for e in ents):
|
|
|
+ haystack = _cluster_entity_haystack(c)
|
|
|
+ if any(any(term in item for item in haystack) for term in query_terms):
|
|
|
matched.append(c)
|
|
|
|
|
|
if not matched:
|