|
|
@@ -2,6 +2,7 @@ from __future__ import annotations
|
|
|
|
|
|
import asyncio
|
|
|
import hashlib
|
|
|
+import json
|
|
|
import logging
|
|
|
|
|
|
import math
|
|
|
@@ -175,10 +176,10 @@ NEWS_TOOL_CARDS = [
|
|
|
"Produce a concise LLM-written explanation for one cluster and key facts.",
|
|
|
[
|
|
|
{"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"},
|
|
|
- {"name": "include_articles", "type": "boolean", "default": False},
|
|
|
+ {"name": "include_articles", "type": "boolean", "default": True},
|
|
|
],
|
|
|
- ["headline", "mergedSummary", "keyFacts", "sources", "articles?"],
|
|
|
- ["Prefer this after you have already chosen a specific cluster to explain."],
|
|
|
+ ["headline", "mergedSummary", "keyFacts", "sources", "entities", "keywords", "related_entities", "related_keywords", "topic", "sentiment", "importance", "articles"],
|
|
|
+ ["Rich cluster drill-down. Returns LLM summary + cluster metadata + articles. Defaults to include articles."],
|
|
|
),
|
|
|
_tool_card(
|
|
|
"detect_emerging_topics",
|
|
|
@@ -189,7 +190,7 @@ NEWS_TOOL_CARDS = [
|
|
|
{"name": "topic", "type": "string", "default": "all topics", "examples": ["crypto", "macro", "regulation", "ai", "other"]},
|
|
|
{"name": "around", "type": "string", "default": "none", "meaning": "entity to scope results to its neighborhood (e.g. \"Bitcoin\")"},
|
|
|
],
|
|
|
- ["topic", "trend_score", "velocity", "recent_count", "prior_count", "source_count", "related_entities", "signal_type"],
|
|
|
+ ["topic", "trend_score", "velocity", "recent_count", "prior_count", "source_count", "related_entities", "related_keywords", "signal_type"],
|
|
|
["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity. Signal types: entity (named entity), keyword (thematic descriptor), phrase (headline bigram). Check velocity and source_count to distinguish real spikes from noise."],
|
|
|
),
|
|
|
_tool_card(
|
|
|
@@ -263,8 +264,10 @@ NEWS_AGENT_TIPS = [
|
|
|
"When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.",
|
|
|
"Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.",
|
|
|
"For tricky names, rely on the server's resolver instead of inventing alias rules in the client.",
|
|
|
- "Use detect_emerging_topics with timeframe=\"4h\" for what's hot right now, timeframe=\"3d\" for weekly trends. Use topic= to scope to a category, around= to find what's emerging near a specific entity. Check velocity to distinguish accelerating signals from steady-state ones. Filter by signal_type to focus on entities, keywords, or phrases.",
|
|
|
+ "Use detect_emerging_topics with timeframe=\"4h\" for what's hot right now, timeframe=\"3d\" for weekly trends. Use topic= to scope to a category, around= to find what's emerging near a specific entity. Check velocity to distinguish accelerating signals from steady-state ones. Filter by signal_type to focus on entities, keywords, or phrases. Each result also includes related_keywords for thematic context.",
|
|
|
+ "get_event_summary returns a rich result: headline, mergedSummary, keyFacts, entities, keywords, related_entities, related_keywords, topic, sentiment, importance, and articles (included by default). Use it for full cluster drill-down.",
|
|
|
"Each cluster contains both entities (named entities with identity resolution) and keywords (thematic descriptors). Use keywords to understand what a story is about beyond the named entities.",
|
|
|
+ "Use detect_emerging_topics with multiple timeframes (e.g. 4h vs 3d) and compare results to distinguish what's hot right now vs what's persistently trending. related_keywords help identify thematic neighborhoods.",
|
|
|
]
|
|
|
|
|
|
|
|
|
@@ -305,6 +308,16 @@ NEWS_EXAMPLE_CHAINS = [
|
|
|
"get_events_for_entity(entity=...) on the top emerging neighbor",
|
|
|
],
|
|
|
},
|
|
|
+ {
|
|
|
+ "task": "Full investigation pipeline",
|
|
|
+ "chain": [
|
|
|
+ "detect_emerging_topics(limit=20, timeframe=\"3d\")",
|
|
|
+ "pick an emerging entity/keyword and note its related_entities and related_keywords",
|
|
|
+ "get_event_summary(event_id=...) on the top cluster for full context including articles",
|
|
|
+ "get_news_sentiment(entity=...) to gauge tone around the emerging topic",
|
|
|
+ "detect_emerging_topics(around=<entity>, timeframe=\"4h\") to scout its neighborhood",
|
|
|
+ ],
|
|
|
+ },
|
|
|
]
|
|
|
|
|
|
|
|
|
@@ -470,28 +483,48 @@ async def get_related_recent_entities(subject: str, timeframe: str = "72h", limi
|
|
|
return result
|
|
|
|
|
|
|
|
|
-@mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.")
|
|
|
-async def get_event_summary(event_id: str, include_articles: bool = False):
|
|
|
+@mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts, "
|
|
|
+ "entities, keywords, related entities and keywords, sentiment, importance, and articles.")
|
|
|
+async def get_event_summary(event_id: str, include_articles: bool = True):
|
|
|
store = SQLiteClusterStore(DB_PATH)
|
|
|
|
|
|
# Summary cache: reuse if present within TTL.
|
|
|
+ cluster = store.get_cluster_by_id(event_id)
|
|
|
+ if not cluster:
|
|
|
+ return {
|
|
|
+ "event_id": event_id,
|
|
|
+ "error": "NOT_FOUND",
|
|
|
+ }
|
|
|
+
|
|
|
cached_summary = store.get_cluster_summary(
|
|
|
cluster_id=event_id,
|
|
|
ttl_hours=DEFAULT_LOOKBACK_HOURS,
|
|
|
)
|
|
|
- if cached_summary:
|
|
|
- out = {
|
|
|
- "event_id": event_id,
|
|
|
- "headline": cached_summary.get("headline"),
|
|
|
- "mergedSummary": cached_summary.get("mergedSummary"),
|
|
|
- "keyFacts": cached_summary.get("keyFacts", []),
|
|
|
- "sources": cached_summary.get("sources", []),
|
|
|
- }
|
|
|
|
|
|
+ def _enrich(base: dict, src_cluster: dict) -> dict:
|
|
|
+ base["entities"] = src_cluster.get("entities", [])
|
|
|
+ base["keywords"] = src_cluster.get("keywords", [])
|
|
|
+ base["topic"] = src_cluster.get("topic", "other")
|
|
|
+ base["sentiment"] = src_cluster.get("sentiment", "neutral")
|
|
|
+ base["sentimentScore"] = src_cluster.get("sentimentScore")
|
|
|
+ base["importance"] = src_cluster.get("importance", 0.0)
|
|
|
+ # Related entities: from co-occurrence in this cluster's article set
|
|
|
+ resolved = src_cluster.get("entityResolutions", []) or []
|
|
|
+ related_ents = []
|
|
|
+ seen_ents = {str(e).strip().lower() for e in (src_cluster.get("entities", []) or [])}
|
|
|
+ for res in resolved:
|
|
|
+ if isinstance(res, dict):
|
|
|
+ label = str(res.get("canonical_label") or res.get("normalized") or "").strip()
|
|
|
+ if label and label.lower() not in seen_ents:
|
|
|
+ related_ents.append(label)
|
|
|
+ seen_ents.add(label.lower())
|
|
|
+ base["related_entities"] = related_ents[:10]
|
|
|
+ # Related keywords: from the cluster's own keywords (thematic descriptors)
|
|
|
+ # plus any co-occurring keywords from recent related clusters
|
|
|
+ base["related_keywords"] = _fetch_related_keywords(store, src_cluster, event_id)
|
|
|
if include_articles:
|
|
|
- cluster = store.get_cluster_by_id(event_id)
|
|
|
- arts = (cluster or {}).get("articles", []) or []
|
|
|
- out["articles"] = [
|
|
|
+ arts = src_cluster.get("articles", []) or []
|
|
|
+ base["articles"] = [
|
|
|
{
|
|
|
"title": a.get("title"),
|
|
|
"url": a.get("url"),
|
|
|
@@ -501,31 +534,20 @@ async def get_event_summary(event_id: str, include_articles: bool = False):
|
|
|
for a in arts
|
|
|
if isinstance(a, dict)
|
|
|
]
|
|
|
- return out
|
|
|
+ return base
|
|
|
|
|
|
- cluster = store.get_cluster_by_id(event_id)
|
|
|
- if not cluster:
|
|
|
- return {
|
|
|
+ if cached_summary:
|
|
|
+ out = {
|
|
|
"event_id": event_id,
|
|
|
- "error": "NOT_FOUND",
|
|
|
+ "headline": cached_summary.get("headline"),
|
|
|
+ "mergedSummary": cached_summary.get("mergedSummary"),
|
|
|
+ "keyFacts": cached_summary.get("keyFacts", []),
|
|
|
+ "sources": cached_summary.get("sources", []),
|
|
|
}
|
|
|
-
|
|
|
- articles_out = None
|
|
|
- if include_articles:
|
|
|
- arts = cluster.get("articles", []) or []
|
|
|
- articles_out = [
|
|
|
- {
|
|
|
- "title": a.get("title"),
|
|
|
- "url": a.get("url"),
|
|
|
- "source": a.get("source"),
|
|
|
- "timestamp": a.get("timestamp"),
|
|
|
- }
|
|
|
- for a in arts
|
|
|
- if isinstance(a, dict)
|
|
|
- ]
|
|
|
+ out = _enrich(out, cluster)
|
|
|
+ return out
|
|
|
|
|
|
summary = await summarize_cluster_llm(cluster)
|
|
|
-
|
|
|
store.upsert_cluster_summary(event_id, summary)
|
|
|
out = {
|
|
|
"event_id": event_id,
|
|
|
@@ -534,12 +556,75 @@ async def get_event_summary(event_id: str, include_articles: bool = False):
|
|
|
"keyFacts": summary.get("keyFacts", []),
|
|
|
"sources": summary.get("sources", []),
|
|
|
}
|
|
|
-
|
|
|
- if include_articles:
|
|
|
- out["articles"] = articles_out or []
|
|
|
+ out = _enrich(out, cluster)
|
|
|
return out
|
|
|
|
|
|
|
|
|
+def _fetch_related_keywords(store: SQLiteClusterStore, cluster: dict, event_id: str) -> list[str]:
|
|
|
+ """Find keywords that co-occur with this cluster's entities in recent clusters.
|
|
|
+
|
|
|
+ This gives agents thematic context: what else was being discussed alongside
|
|
|
+ the entities in this cluster during the same window.
|
|
|
+ """
|
|
|
+ entities = cluster.get("entities", []) or []
|
|
|
+ if not entities:
|
|
|
+ return []
|
|
|
+
|
|
|
+ # Build a set of entity terms to search with
|
|
|
+ entity_terms = set()
|
|
|
+ for e in entities:
|
|
|
+ entity_terms.add(str(e).strip().lower())
|
|
|
+ for res in (cluster.get("entityResolutions", []) or []):
|
|
|
+ if isinstance(res, dict):
|
|
|
+ for key in ("normalized", "canonical_label"):
|
|
|
+ val = res.get(key)
|
|
|
+ if val:
|
|
|
+ entity_terms.add(str(val).strip().lower())
|
|
|
+ entity_terms.discard("")
|
|
|
+
|
|
|
+ if not entity_terms:
|
|
|
+ return []
|
|
|
+
|
|
|
+ # Find recent clusters that share any entity, collect their keywords
|
|
|
+ # Use payload_ts lookback of 48h for co-occurrence window
|
|
|
+ from datetime import timedelta
|
|
|
+ cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
|
|
|
+ placeholders = ",".join("?" for _ in entity_terms)
|
|
|
+
|
|
|
+ try:
|
|
|
+ rows = store._conn().execute(
|
|
|
+ f"SELECT DISTINCT c.payload FROM clusters c "
|
|
|
+ f"JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
|
|
|
+ f"WHERE c.payload_ts >= ? AND c.cluster_id != ? "
|
|
|
+ f"AND ce.entity IN ({placeholders}) "
|
|
|
+ f"ORDER BY c.payload_ts DESC LIMIT 20",
|
|
|
+ (cutoff, event_id, *entity_terms),
|
|
|
+ ).fetchall()
|
|
|
+ except Exception:
|
|
|
+ return []
|
|
|
+
|
|
|
+ kw_counter: dict[str, int] = {}
|
|
|
+ cluster_kws = {str(k).strip().lower() for k in (cluster.get("keywords", []) or []) if str(k).strip()}
|
|
|
+ for (payload_text,) in rows:
|
|
|
+ try:
|
|
|
+ c = json.loads(payload_text)
|
|
|
+ except Exception:
|
|
|
+ continue
|
|
|
+ for kw in (c.get("keywords", []) or []):
|
|
|
+ kw_norm = str(kw).strip()
|
|
|
+ if not kw_norm:
|
|
|
+ continue
|
|
|
+ kw_key = kw_norm.lower()
|
|
|
+ # Skip keywords that already appear in this cluster
|
|
|
+ if kw_key in cluster_kws:
|
|
|
+ continue
|
|
|
+ kw_counter[kw_norm] = kw_counter.get(kw_norm, 0) + 1
|
|
|
+
|
|
|
+ # Return top keywords by co-occurrence count
|
|
|
+ sorted_kws = sorted(kw_counter.items(), key=lambda x: -x[1])
|
|
|
+ return [kw for kw, _ in sorted_kws[:10]]
|
|
|
+
|
|
|
+
|
|
|
@mcp.tool(description="Explore what is starting to matter: surface emerging entities, thematic keywords, and phrases from recent clusters. "
|
|
|
"Use timeframe to control the lookback window, topic to scope to a category, and around to find what's emerging near a specific entity. "
|
|
|
"Results include signal_type (entity / keyword / phrase) for downstream filtering.")
|
|
|
@@ -625,6 +710,7 @@ async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic:
|
|
|
kw_importance_recent = Counter()
|
|
|
kw_sources: dict[str, set] = {}
|
|
|
kw_buckets: dict[str, set] = {}
|
|
|
+ kw_cooccur: dict[str, Counter] = {}
|
|
|
|
|
|
bucket_size_hours = max(1.0, hours / 6.0) # split window into ~6 buckets
|
|
|
|
|
|
@@ -711,6 +797,22 @@ async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic:
|
|
|
continue
|
|
|
entity_cooccur[a][b] += 1
|
|
|
|
|
|
+ # keyword co-occurrence: which keywords appear together in the same clusters
|
|
|
+ for i in range(len(kws_in_cluster)):
|
|
|
+ ka = kws_in_cluster[i]
|
|
|
+ if ka not in kw_cooccur:
|
|
|
+ kw_cooccur[ka] = Counter()
|
|
|
+ for j in range(len(kws_in_cluster)):
|
|
|
+ if i == j:
|
|
|
+ continue
|
|
|
+ kb = kws_in_cluster[j]
|
|
|
+ kw_cooccur[ka][kb] += 1
|
|
|
+ # also track entity↔keyword co-occurrence
|
|
|
+ for ent in ents_norm:
|
|
|
+ if _is_generic_entity(ent):
|
|
|
+ continue
|
|
|
+ kw_cooccur[ka][ent] += 1
|
|
|
+
|
|
|
# bigram phrases (recent only)
|
|
|
if is_recent:
|
|
|
text = f"{c.get('headline', '')} {c.get('summary', '')}"
|
|
|
@@ -763,10 +865,17 @@ async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic:
|
|
|
if other != ent:
|
|
|
related.append(other)
|
|
|
|
|
|
+ related_kws = []
|
|
|
+ if ent in kw_cooccur:
|
|
|
+ for kw, _cnt in kw_cooccur[ent].most_common(5):
|
|
|
+ if kw != ent:
|
|
|
+ related_kws.append(kw)
|
|
|
+
|
|
|
scored.append({
|
|
|
"topic": ent,
|
|
|
"trend_score": min(0.99, round(composed_score, 3)),
|
|
|
"related_entities": related[:3] if related else [ent],
|
|
|
+ "related_keywords": related_kws[:5],
|
|
|
"velocity": round(velocity, 2),
|
|
|
"recent_count": recent_n,
|
|
|
"prior_count": prior_n,
|
|
|
@@ -806,10 +915,17 @@ async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic:
|
|
|
0.15 * min(1.0, avg_imp)
|
|
|
)
|
|
|
|
|
|
+ kw_related_kws = []
|
|
|
+ if kw in kw_cooccur:
|
|
|
+ for other, _cnt in kw_cooccur[kw].most_common(5):
|
|
|
+ if other != kw:
|
|
|
+ kw_related_kws.append(other)
|
|
|
+
|
|
|
kw_scored.append({
|
|
|
"topic": kw,
|
|
|
"trend_score": min(0.99, round(composed_score, 3)),
|
|
|
"related_entities": [],
|
|
|
+ "related_keywords": kw_related_kws[:5],
|
|
|
"velocity": round(velocity, 2),
|
|
|
"recent_count": recent_n,
|
|
|
"prior_count": prior_n,
|
|
|
@@ -841,6 +957,7 @@ async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic:
|
|
|
"topic": phrase.title(),
|
|
|
"trend_score": min(0.99, round(0.30 + 0.15 * min(count, 5), 2)),
|
|
|
"related_entities": [],
|
|
|
+ "related_keywords": [],
|
|
|
"velocity": None,
|
|
|
"recent_count": count,
|
|
|
"prior_count": 0,
|