Prechádzať zdrojové kódy

feat: detect_emerging_topics rewrite — timeframe, velocity, topic/around scoping

- New params: timeframe (lookback window), topic (category filter),
  around (entity neighborhood scoping)
- Velocity scoring: splits window into recent vs prior half,
  computes acceleration ratio with smoothing
- Composed trend_score: velocity 35%, recency 25%, source_diversity 15%,
  sustained_buckets 10%, importance 15%
- Output: velocity, recent_count, prior_count, source_count per result
- Fetches 500 clusters (was 200) for meaningful velocity stats
- Tool card and composition recipe updated
- All 38 tests pass
Lukas Goldschmidt 1 týždeň pred
rodič
commit
763a686ed0
2 zmenil súbory, kde vykonal 210 pridanie a 63 odobranie
  1. 8 0
      RELEASE_NOTES.md
  2. 202 63
      news_mcp/mcp_server_fastmcp.py

+ 8 - 0
RELEASE_NOTES.md

@@ -3,6 +3,13 @@
 ## v0.3.1 — stable cluster IDs, cross-cycle merge, orphan dedup, multi-article signals
 
 ### Highlights
+- **Emerging topics rewrite** (`detect_emerging_topics`): complete rewrite with 5 new capabilities:
+  - **Timeframe parameter** (`"4h"`, `"24h"`, `"3d"`, etc.) — controls lookback window instead of always using `DEFAULT_LOOKBACK_HOURS`
+  - **Velocity scoring** — splits the window into recent vs prior half, computes `velocity = (recent + 0.5) / (prior + 0.5)`. Entities accelerating now vs before score much higher than steady-state ones
+  - **Composed trend score** — replaces the flat `0.25 + 0.40*imp + 0.08*count` formula with a weighted combination of: velocity (35%), recency concentration (25%), source diversity (15%), sustained presence across time buckets (10%), importance (15%)
+  - **Topic scoping** — optional `topic` parameter filters to a specific category before scoring
+  - **Entity neighborhood scoping** — optional `around` parameter only returns entities co-occurring with the specified entity (e.g. `around="Bitcoin"` finds what's emerging in Bitcoin's neighborhood)
+  - **Richer output** — each result now includes `velocity`, `recent_count`, `prior_count`, `source_count` alongside `trend_score` and `related_entities`
 - **Multi-article signal comparison**: `_signals()` now compares a new article against ALL articles in a candidate cluster (not just the seed). The best title and jaccard scores across all cluster members are used for matching.
 - **Stable cluster IDs**: `cluster_id = sha1(topic | min_article_key)` instead of `sha1(topic | seed_title)`. The same set of articles always maps to the same ID regardless of processing order. This eliminates duplicate clusters for the same event.
 - **Cross-cycle merge**: the poller loads recent clusters from the DB (controlled by `NEWS_CLUSTER_MAX_AGE_HOURS`, default 4h) and seeds them as merge targets before clustering. New articles in poll N+1 can merge into clusters created in poll N.
@@ -15,6 +22,7 @@
 - No database schema changes.
 - Existing cluster IDs will change format on the next polling cycle (old rows are updated in-place via `ON CONFLICT(cluster_id)` once the new ID is computed). Transient enrichment cache misses may occur for one cycle.
 - Old duplicate clusters (same event, different IDs) will age out via pruning. To clean them immediately, run the article dedup cleanup script.
+- `detect_emerging_topics` output shape changed: `count` replaced by `recent_count` + `prior_count`, new fields `velocity` and `source_count`. Clients using the old `count` field need to switch to `recent_count`.
 
 ## v0.3.0 — concurrent polling, enrichment retry, all-topics default
 

+ 202 - 63
news_mcp/mcp_server_fastmcp.py

@@ -2,6 +2,8 @@ from __future__ import annotations
 
 import asyncio
 import logging
+import math
+import re
 import time
 from collections import Counter
 from datetime import datetime, timezone
@@ -157,9 +159,14 @@ NEWS_TOOL_CARDS = [
     _tool_card(
         "detect_emerging_topics",
         "Surface entities and phrases starting to matter in the recent window.",
-        [{"name": "limit", "type": "integer", "default": 10, "range": "1-20"}],
-        ["topic", "trend_score", "related_entities", "signal_type", "count", "avg_importance"],
-        ["Good for 'what is heating up?' style questions."],
+        [
+            {"name": "limit", "type": "integer", "default": 10, "range": "1-20"},
+            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["4h", "24h", "3d"]},
+            {"name": "topic", "type": "string", "default": "all topics", "examples": ["crypto", "macro", "regulation", "ai", "other"]},
+            {"name": "around", "type": "string", "default": "none", "meaning": "entity to scope results to its neighborhood (e.g. \"Bitcoin\")"},
+        ],
+        ["topic", "trend_score", "velocity", "recent_count", "prior_count", "source_count", "related_entities", "signal_type"],
+        ["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity."],
     ),
     _tool_card(
         "get_news_sentiment",
@@ -220,7 +227,7 @@ NEWS_COMPOSITION_RECIPES = [
             "get_events_for_entity(entity=...)",
             "get_news_sentiment(entity=...)",
         ],
-        "notes": ["Good for trend scouting and risk mapping."],
+        "notes": ["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity. Good for trend scouting and risk mapping."],
     },
 ]
 
@@ -515,98 +522,230 @@ async def get_event_summary(event_id: str, include_articles: bool = False):
     return out
 
 
-@mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters.")
-async def detect_emerging_topics(limit: int = 10):
+@mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters. "
+           "Use timeframe to control the lookback window, topic to scope to a category, and around to find what's emerging near a specific entity.")
+async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic: str | None = None, around: str | None = None):
+    """Surface entities and phrases that are accelerating in recent clusters.
+
+    Args:
+        limit: max results to return (1-20, default 10).
+        timeframe: lookback window like "4h", "24h", "3d" (default "24h").
+        topic: optional coarse topic filter ("crypto", "macro", "regulation", "ai", "other").
+        around: optional entity — only return entities that co-occur with this entity
+                 in the recent window (e.g. "Bitcoin" to find what's emerging in Bitcoin's neighborhood).
+    """
     limit = max(1, min(int(limit), 20))
+    hours = _parse_timeframe_to_hours(timeframe)
+    half_hours = hours / 2.0
+
     store = SQLiteClusterStore(DB_PATH)
-    clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=200)
+    # Fetch more clusters than needed so velocity stats are meaningful even for short windows.
+    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
 
-    import re
+    # --- optional topic filter ---
+    if topic:
+        topic_norm = normalize_query(topic).strip().lower()
+        if topic_norm:
+            clusters = [c for c in clusters if (c.get("topic") or "other").strip().lower() == topic_norm]
+
+    # --- resolve the 'around' entity ---
+    around_terms: set[str] = set()
+    if around:
+        around_norm = normalize_query(around).strip().lower()
+        if around_norm:
+            resolved = resolve_entity_via_trends(around_norm)
+            around_terms = {
+                around_norm,
+                str(resolved.get("normalized") or "").strip().lower(),
+                str(resolved.get("canonical_label") or "").strip().lower(),
+            }
+            around_terms.discard("")
+
+    # split clusters into first-half vs second-half by timestamp
+    # clusters are already sorted most-recent-first from the store
+    now = datetime.now(timezone.utc)
+
+    def _cluster_age_hours(c: dict) -> float:
+        """Return the cluster's age in hours (approximate, from now)."""
+        ts = c.get("timestamp") or c.get("last_updated")
+        if not ts:
+            return 0.0  # treat un-dated as fresh
+        try:
+            s = str(ts).replace("Z", "+00:00")
+            dt = datetime.fromisoformat(s)
+            if dt.tzinfo is None:
+                dt = dt.replace(tzinfo=timezone.utc)
+            return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0)
+        except Exception:
+            try:
+                dt = parsedate_to_datetime(str(ts))
+                if dt.tzinfo is None:
+                    dt = dt.replace(tzinfo=timezone.utc)
+                return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0)
+            except Exception:
+                return 0.0
 
-    entity_counts = Counter()
-    entity_importance_sum = Counter()
-    # co-occurrence: ent -> other_ent -> count
-    entity_cooccur = {}
-    phrase_counts = Counter()
-    topic_counts = Counter()
+    # Generic entity filter
+    _generic_tokens = {"news", "latest", "breaking", "update", "updates", "report", "reports"}
 
-    # Very light heuristics to reduce “meta entities” dominating emerging topics.
-    # Keep it conservative: only skip obvious boilerplate.
     def _is_generic_entity(ent: str) -> bool:
         e = str(ent).strip().lower()
-        if not e:
+        if not e or len(e) < 4:
             return True
-        if len(e) < 4:
-            return True
-        # common outlet-ish / meta-ish tokens
-        if e in {"news", "latest", "breaking"}:
+        if e in _generic_tokens:
             return True
         return False
 
+    # --- accumulate signals ---
+    # recent = second half of timeframe (newer), prior = first half (older)
+    entity_counts_recent = Counter()
+    entity_counts_prior = Counter()
+    entity_importance_recent = Counter()
+    entity_sources: dict[str, set] = {}  # ent -> set of source names
+    entity_buckets: dict[str, set] = {}  # ent -> set of time-bucket indices (for sustained-spike detection)
+    entity_cooccur: dict[str, Counter] = {}
+    phrase_counts_recent = Counter()
+
+    bucket_size_hours = max(1.0, hours / 6.0)  # split window into ~6 buckets
+
     for c in clusters:
-        topic_counts[c.get("topic", "other")] += 1
         ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
-        ents_in_cluster_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
-        for ent in ents_in_cluster_norm:
-            if _is_generic_entity(ent):
+        ents_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
+
+        age_h = _cluster_age_hours(c)
+        is_recent = age_h <= half_hours
+        bucket_idx = int(age_h / bucket_size_hours)
+
+        # --- around filter: only count clusters that mention the target entity ---
+        if around_terms:
+            haystack = set(ents_norm)
+            for res in c.get("entityResolutions", []) or []:
+                if isinstance(res, dict):
+                    for key in ("normalized", "canonical_label"):
+                        val = res.get(key)
+                        if val:
+                            haystack.add(str(val).strip().lower())
+            if not (haystack & around_terms):
                 continue
-            entity_counts[ent] += 1
-            try:
-                entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0)
-            except Exception:
-                pass
 
-        # update co-occurrence counts
-        for i in range(len(ents_in_cluster_norm)):
-            a = ents_in_cluster_norm[i]
-            if not a:
+        counts = entity_counts_recent if is_recent else entity_counts_prior
+        imp_acc = entity_importance_recent if is_recent else None  # only importance from recent window
+
+        for ent in ents_norm:
+            if _is_generic_entity(ent):
+                continue
+            counts[ent] += 1
+            if ent not in entity_sources:
+                entity_sources[ent] = set()
+            src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else ""
+            if src:
+                entity_sources[ent].add(str(src))
+            if ent not in entity_buckets:
+                entity_buckets[ent] = set()
+            entity_buckets[ent].add(bucket_idx)
+            if imp_acc is not None:
+                try:
+                    imp_acc[ent] += float(c.get("importance", 0.0) or 0.0)
+                except Exception:
+                    pass
+
+        # co-occurrence (only for clusters matching the around filter, if any)
+        for i in range(len(ents_norm)):
+            a = ents_norm[i]
+            if _is_generic_entity(a):
                 continue
-            entity_cooccur.setdefault(a, Counter())
-            for j in range(len(ents_in_cluster_norm)):
+            if a not in entity_cooccur:
+                entity_cooccur[a] = Counter()
+            for j in range(len(ents_norm)):
                 if i == j:
                     continue
-                b = ents_in_cluster_norm[j]
-                if not b:
+                b = ents_norm[j]
+                if _is_generic_entity(b):
                     continue
                 entity_cooccur[a][b] += 1
 
-        text = f"{c.get('headline','')} {c.get('summary','')}"
-        words = [w for w in re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())]
-        for i in range(len(words) - 1):
-            phrase = f"{words[i]} {words[i+1]}"
-            if len(phrase) > 6:
-                phrase_counts[phrase] += 1
-
-    emerging = []
-    # Combine frequency with average importance so “big signal” rises over pure repetition.
-    for ent, count in entity_counts.most_common(limit):
-        avg_imp = entity_importance_sum[ent] / max(1, count)
-        # avg_imp is typically 0..~1; keep score bounded.
-        trend_score = 0.25 + 0.40 * min(1.0, avg_imp) + 0.08 * min(6.0, float(count))
+        # bigram phrases (recent only)
+        if is_recent:
+            text = f"{c.get('headline', '')} {c.get('summary', '')}"
+            words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
+            for i in range(len(words) - 1):
+                phrase = f"{words[i]} {words[i+1]}"
+                if len(phrase) > 6:
+                    phrase_counts_recent[phrase] += 1
+
+    # --- score entities ---
+    all_entities = set(entity_counts_recent.keys()) | set(entity_counts_prior.keys())
+    scored = []
+
+    for ent in all_entities:
+        recent_n = entity_counts_recent.get(ent, 0)
+        prior_n = entity_counts_prior.get(ent, 0)
+        total_n = recent_n + prior_n
+
+        if total_n < 1:
+            continue
+
+        # velocity: ratio of recent vs prior (smoothed to avoid division noise)
+        # 0 prior → velocity = recent_n (pure emergence)
+        # equal → velocity = 1.0 (steady)
+        velocity = (recent_n + 0.5) / (prior_n + 0.5)
+
+        # recency weight: what fraction of total hits are in the recent window
+        recency_ratio = recent_n / total_n
+
+        # source diversity: how many distinct outlets
+        n_sources = len(entity_sources.get(ent, set()))
+
+        # sustained: how many distinct time buckets did it appear in (max ~6)
+        n_buckets = len(entity_buckets.get(ent, set()))
+
+        # average importance (recent window only)
+        avg_imp = (entity_importance_recent.get(ent, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0
+
+        composed_score = (
+            0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) +   # velocity (0..1, 4x = max)
+            0.25 * recency_ratio +                                        # recency concentration
+            0.15 * min(1.0, n_sources / 5.0) +                            # source diversity
+            0.10 * min(1.0, n_buckets / 4.0) +                            # sustained (>1 bucket)
+            0.15 * min(1.0, avg_imp)                                      # importance
+        )
+
         related = []
-        for other, _cnt in (entity_cooccur.get(ent) or Counter()).most_common(3):
-            # avoid returning the entity itself (shouldn't happen, but be safe)
-            if other != ent:
-                related.append(other)
+        if ent in entity_cooccur:
+            for other, _cnt in entity_cooccur[ent].most_common(5):
+                if other != ent:
+                    related.append(other)
 
-        emerging.append({
+        scored.append({
             "topic": ent,
-            "trend_score": min(0.99, round(trend_score, 2)),
-            "related_entities": related if related else [ent],
-            "signal_type": "entity",
-            "count": count,
+            "trend_score": min(0.99, round(composed_score, 3)),
+            "related_entities": related[:3] if related else [ent],
+            "velocity": round(velocity, 2),
+            "recent_count": recent_n,
+            "prior_count": prior_n,
+            "source_count": n_sources,
             "avg_importance": round(avg_imp, 3),
+            "signal_type": "entity",
         })
 
-    for phrase, count in phrase_counts.most_common(limit * 2):
+    # sort by composed score descending
+    scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"]))
+
+    # --- add phrase signals (only from recent window) ---
+    emerging = list(scored)  # start with entities
+    for phrase, count in phrase_counts_recent.most_common(limit * 2):
         if any(item["topic"] == phrase for item in emerging):
             continue
         emerging.append({
             "topic": phrase.title(),
-            "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)),
+            "trend_score": min(0.99, round(0.30 + 0.15 * min(count, 5), 2)),
             "related_entities": [],
+            "velocity": None,
+            "recent_count": count,
+            "prior_count": 0,
+            "source_count": 0,
+            "avg_importance": 0.0,
             "signal_type": "phrase",
-            "count": count,
         })
         if len(emerging) >= limit:
             break