more fixes, maybe stable

Lukas Goldschmidt, 1 day ago
parent
commit
861c3e851d
11 changed files with 1022 additions and 85 deletions
  1. PROJECT.md (+65 −0)
  2. README.md (+80 −0)
  3. killserver.sh (+9 −1)
  4. news_mcp/dedup/cluster.py (+188 −43)
  5. news_mcp/enrichment/llm_enrich.py (+17 −4)
  6. news_mcp/jobs/poller.py (+22 −3)
  7. news_mcp/llm.py (+33 −9)
  8. news_mcp/mcp_server_fastmcp.py (+132 −21)
  9. news_mcp/storage/sqlite_store.py (+212 −0)
  10. run.sh (+17 −4)
  11. test_news_mcp.py (+247 −0)

+ 65 - 0
PROJECT.md

@@ -55,3 +55,68 @@ Eventual agent tool shape (later): `get_emerging_entity_graph(timeframe, limit)`
 - Merge pass exists for destructive consolidation once thresholds look sane
 - Article-dedup cleanup exists for fixing duplicated article records already in SQLite
 - Entity lookup now respects timeframe as the scan window, with limit acting as a cap
+
+## Dashboard & REST API (added May 2026)
+
+### What was added
+- **5 REST endpoints** (`/api/v1/*`) — JSON-only, for programmatic access and the dashboard
+- **Dashboard SPA** at `/dashboard` — 5 views (health, clusters, sentiment, entities, detail), Chart.js visualizations, instant client-side rendering
+- **Non-blocking startup** — replaced synchronous `@app.on_event("startup")` with `lifespan`-based fire-and-forget background loop; server responds in <0.3s
+- **Async ingestion lock** — `asyncio.Lock` prevents overlapping refresh cycles
+- **Hardened LLM calls** — OpenRouter retry logic with exponential backoff on 429/5xx, response shape validation
+
+### Architecture additions
+```
+news-mcp/
+├── news_mcp/mcp_server_fastmcp.py   ← MCP + REST API + /dashboard static mount
+├── news_mcp/dashboard/
+│   ├── __init__.py
+│   ├── dashboard_store.py           ← Read-only query layer (no side effects)
+│   ├── index.html                   ← SPA shell, 5 views
+│   ├── style.css                    ← Dark theme, responsive grid
+│   └── dashboard.js                 ← Client render, Chart.js, null-safe DOM access
+```
+
+### Design decisions
+- **Dashboard store** wraps `SQLiteClusterStore` with read-only methods (stats, pagination, series, frequencies, detail). No writes, no enrichment.
+- **Single shared store** (`_shared_store`) — one store instance reused across the whole process (see the sketch after this list).
+- **Static SPA** served via FastAPI `StaticFiles` — no Jinja2/templating dependency.
+- **Client-side rendering** with `fetch()` + Chart.js — avoids HTMX raw-JSON-in-DOM issues.
+- **Default lookback** follows `NEWS_DEFAULT_LOOKBACK_HOURS` (144h), not hardcoded.
+- **Cluster ordering** — always date-descending (SQL `ORDER BY updated_at DESC` + client-side sort as safety net).
+
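+A minimal sketch of the shared-store wiring (names match `news_mcp/mcp_server_fastmcp.py`; the literal DB path is illustrative, the server passes `DB_PATH` from config):
+
+```python
+from news_mcp.dashboard.dashboard_store import DashboardStore
+from news_mcp.storage.sqlite_store import SQLiteClusterStore
+
+# One store instance for the whole process.
+_shared_store = SQLiteClusterStore("data/news.sqlite")  # illustrative path
+
+def api_health():
+    # Per request: wrap the shared store in a thin read-only view.
+    return DashboardStore(_shared_store).get_dashboard_stats()
+```
+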
+### Known gaps (for future work)
+- No auth (LAN-only assumption)
+- Entity detail view is functional but minimal
+- No alerting/threshold notifications (Phase 2)
+- No server-sent events for real-time dashboard updates

+ 80 - 0
README.md

@@ -287,3 +287,83 @@ The live clustering path also deduplicates article entries when new data comes i
 
 As of the latest hardening, the server/storage write path also self-heals `payload.articles` by deduplicating before persisting (so historical rows can be fixed via the cleanup script, and future writes won’t reintroduce duplicates). 
 ```
+
+## Dashboard
+
+A browser-based monitoring dashboard is available at:
+```
+http://<your-host>:8506/dashboard/
+```
+
+**Views:**
+| View | What it shows |
+|---|---|
+| Health | Cluster/entity counts, freshness badge, topic doughnut, sentiment overview, feed activity |
+| Clusters | Filterable table — topic, sentiment, importance, entity chips, full-text search |
+| Sentiment | Time-series line chart (avg sentiment per configurable bucket) + cluster count overlay |
+| Entities | Top entities by mention frequency, bar chart, click-through to matching clusters |
+| Detail | Click any cluster row or paste a cluster_id — full drill-down with summary, key facts, articles, keywords, entities |
+
+**Tech stack:** Vanilla HTML/JS + Chart.js, served as static files from the same FastAPI process at `/dashboard`. No extra dependencies.
+
+The dashboard default lookback window follows `NEWS_DEFAULT_LOOKBACK_HOURS` (configured via `.env`, default: 144h).
+
+## REST API
+
+Five read-only JSON endpoints for programmatic access:
+
+| Method | Path | Params | Returns |
+|--------|------|--------|---------|
+| GET | `/api/v1/health` | — | stats, freshness, feed state, pruning config |
+| GET | `/api/v1/clusters` | `topic`, `hours`, `limit`, `offset` | paginated cluster list |
+| GET | `/api/v1/sentiment-series` | `topic`, `hours`, `bucket_hours` | time-series data for Chart.js |
+| GET | `/api/v1/entities` | `hours`, `limit` | top entities by mention count |
+| GET | `/api/v1/cluster/{id}` | — (path param) | full cluster detail |
+
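+For quick scripting, a minimal client sketch (assumes `httpx` is installed and the server runs on the default port 8506):
+
+```python
+import httpx
+
+BASE = "http://127.0.0.1:8506/api/v1"
+
+# Last 48 hours of crypto clusters, first page of 20.
+page = httpx.get(f"{BASE}/clusters", params={"topic": "crypto", "hours": 48, "limit": 20, "offset": 0}).json()
+for c in page["clusters"]:
+    print(c["headline"], c["sentiment"], c["importance"])
+
+# Hourly sentiment series over the same window, ready for Chart.js.
+series = httpx.get(f"{BASE}/sentiment-series", params={"hours": 48, "bucket_hours": 1}).json()["series"]
+```
+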
+## Startup
+
+Uses FastAPI `lifespan` (FastAPI ≥0.111) — **HTTP API available in <0.3s** regardless of feed/LLM latency. Background refresh and pruning run as fire-and-forget coroutines.
+
+Key `.env` vars:
+- `NEWS_BACKGROUND_REFRESH_ON_START=true` — fetch immediately on boot (previously `false`; changed for faster first data)
+- `NEWS_BACKGROUND_REFRESH_ENABLED=true` — enable/disable the background loop
+- `NEWS_REFRESH_INTERVAL_SECONDS=900` — polling interval
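+
+To sanity-check the non-blocking startup, a small probe sketch (assumes `httpx` and the default port; run it right after `./run.sh`):
+
+```python
+import time
+
+import httpx
+
+# Poll /health until the server answers, then report the elapsed time.
+start = time.monotonic()
+while True:
+    try:
+        if httpx.get("http://127.0.0.1:8506/health", timeout=0.5).status_code == 200:
+            break
+    except httpx.HTTPError:
+        pass
+    time.sleep(0.05)
+print(f"/health responded after {time.monotonic() - start:.2f}s")
+```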

+ 9 - 1
killserver.sh

@@ -26,7 +26,7 @@ stop_pid() {
 }
 
 if [ -f "$PIDFILE" ]; then
-  PID=$(cat "$PIDFILE" 2>/dev/null || true)
+  PID="$(cat "$PIDFILE" 2>/dev/null || true)"
   stop_pid "$PID"
   rm -f "$PIDFILE"
 fi
@@ -49,3 +49,11 @@ while IFS= read -r line; do
     stop_pid "$pid"
   fi
 done < <(pgrep -af 'news_mcp\.mcp_server_fastmcp:app' || true)
+
+# Remove stale pidfiles that point at dead processes if any remain.
+if [ -f "$PIDFILE" ]; then
+  PID="$(cat "$PIDFILE" 2>/dev/null || true)"
+  if [ -z "$PID" ] || ! ps -p "$PID" > /dev/null 2>&1; then
+    rm -f "$PIDFILE"
+  fi
+fi

+ 188 - 43
news_mcp/dedup/cluster.py

@@ -1,15 +1,20 @@
 from __future__ import annotations
 
-from typing import Any, Dict, List, Tuple
-
-from news_mcp.sources.news_feeds import normalize_topic_from_title
-from news_mcp.dedup.embedding_support import CandidateRules, cluster_is_candidate, cosine_similarity, ollama_embed
-from news_mcp.config import NEWS_EMBEDDINGS_ENABLED, NEWS_EMBEDDING_SIMILARITY_THRESHOLD
-
+import hashlib
 import re
 from difflib import SequenceMatcher
+from typing import Any, Dict, List
 from urllib.parse import urlparse
 
+from news_mcp.config import NEWS_EMBEDDINGS_ENABLED, NEWS_EMBEDDING_SIMILARITY_THRESHOLD
+from news_mcp.dedup.embedding_support import cosine_similarity, ollama_embed
+from news_mcp.sources.news_feeds import normalize_topic_from_title
+
+
+# ---------------------------------------------------------------------------
+# Text helpers
+# ---------------------------------------------------------------------------
+
 
 def _normalize_title(title: str) -> str:
     t = title.lower().strip()
@@ -42,73 +47,206 @@ def _cluster_text(a: Dict[str, Any]) -> str:
     return "\n".join(p for p in parts if p).strip()
 
 
+# ---------------------------------------------------------------------------
+# Token / Jaccard signal (used as a fallback alongside title similarity when
+# embeddings are unavailable, and as a soft signal even when they are).
+# ---------------------------------------------------------------------------
+
+# Tiny stop-word set — we keep it small on purpose because the corpus is news
+# headlines, where every additional removal risks losing genuine signal.
+_STOPWORDS = frozenset(
+    {
+        "a", "an", "the", "of", "to", "in", "on", "at", "for", "by", "with",
+        "and", "or", "but", "if", "is", "are", "was", "were", "be", "been",
+        "being", "as", "from", "that", "this", "these", "those", "it", "its",
+        "into", "over", "under", "than", "then", "so", "such", "no", "not",
+        "do", "does", "did", "will", "would", "can", "could", "should", "may",
+        "might", "has", "have", "had", "after", "before", "amid", "vs", "via",
+        "us", "uk",
+    }
+)
+
+
+def _tokens(text: str) -> set[str]:
+    """Lowercase content tokens, stop-words removed, length>=3."""
+    tokens = re.findall(r"[a-z0-9][a-z0-9\-]+", text.lower())
+    return {t for t in tokens if len(t) >= 3 and t not in _STOPWORDS}
+
+
+def _jaccard(a: set, b: set) -> float:
+    if not a or not b:
+        return 0.0
+    inter = len(a & b)
+    if inter == 0:
+        return 0.0
+    return inter / len(a | b)
+
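+# Illustrative: _jaccard(_tokens("Trump warns Iran"), _tokens("Iran warning from Trump"))
+# compares {trump, warns, iran} with {iran, warning, trump} ("from" is a
+# stop-word): 2 shared / 4 total = 0.5.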
+
+# ---------------------------------------------------------------------------
+# Composite similarity
+# ---------------------------------------------------------------------------
+
+
+# Each signal has its own threshold. We accept a merge if ANY signal clears its
+# threshold, which makes clustering robust when one signal happens to be weak
+# (short headlines kill SequenceMatcher; single-word stories kill Jaccard;
+# Ollama outages kill cosine similarity).
+DEFAULT_TITLE_THRESHOLD = 0.87
+DEFAULT_JACCARD_THRESHOLD = 0.55
+
+
+def _signals(article: Dict[str, Any], cluster: Dict[str, Any]) -> dict:
+    """Per-pair similarity signals (title, jaccard, embedding cosine).
+
+    Embedding cosine is only computed when both sides have a vector; we never
+    block on a fresh Ollama request here — that's the caller's job, so this
+    function stays pure and easy to test.
+    """
+    a_title = str(article.get("title") or "")
+    c_title = str(cluster.get("headline") or "")
+
+    title_sim = _title_similarity(a_title, c_title) if a_title and c_title else 0.0
+
+    a_text = _cluster_text(article)
+    c_text_seed = (cluster.get("articles") or [{}])[0]
+    c_text = _cluster_text(c_text_seed) if c_text_seed else c_title
+    jaccard = _jaccard(_tokens(a_text), _tokens(c_text)) if a_text and c_text else 0.0
+
+    a_emb = article.get("_embedding")
+    c_emb = cluster.get("embedding")
+    cosine = cosine_similarity(a_emb, c_emb) if a_emb and c_emb else 0.0
+
+    return {"title": title_sim, "jaccard": jaccard, "cosine": cosine}
+
+
+def _is_match(signals: dict, *, embeddings_enabled: bool) -> tuple[bool, str, float]:
+    """Decide whether two items should merge based on the strongest signal.
+
+    Returns (matched, signal_name, signal_value). The signal_name lets callers
+    log *why* something merged, which is huge for debugging clustering quality.
+    """
+    cosine_threshold = NEWS_EMBEDDING_SIMILARITY_THRESHOLD
+    if embeddings_enabled and signals["cosine"] >= cosine_threshold:
+        return True, "cosine", signals["cosine"]
+    if signals["title"] >= DEFAULT_TITLE_THRESHOLD:
+        return True, "title", signals["title"]
+    if signals["jaccard"] >= DEFAULT_JACCARD_THRESHOLD:
+        return True, "jaccard", signals["jaccard"]
+    return False, "none", 0.0
+
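+# Illustrative: with embeddings disabled, a heavily reworded headline can still
+# merge via token overlap alone:
+#   _is_match({"title": 0.62, "jaccard": 0.58, "cosine": 0.0}, embeddings_enabled=False)
+#   -> (True, "jaccard", 0.58)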
+
+# ---------------------------------------------------------------------------
+# Public API
+# ---------------------------------------------------------------------------
+
+
 def dedup_and_cluster_articles(
     articles: List[Dict[str, Any]],
-    similarity_threshold: float = 0.87,
+    similarity_threshold: float | None = None,
 ) -> Dict[str, List[Dict[str, Any]]]:
-    """v1 dedup: fuzzy title similarity per topic.
+    """Deduplicate raw articles into clusters keyed by topic.
+
+    v1.1 strategy: composite similarity.
+      * title fuzzy ratio
+      * token Jaccard over headline+summary (cheap, surprisingly resilient
+        when titles are reworded heavily across outlets)
+      * Ollama embedding cosine when available
+
+    A pair merges if ANY signal clears its threshold. Falling back through
+    multiple signals means a transient Ollama outage doesn't collapse the
+    server back into title-only clustering, and a heavily-reworded headline
+    can still merge via Jaccard or embeddings.
 
-    Instead of strict hashing, we merge clusters whose normalized titles are
-    similar enough. This helps create richer clusters (multiple sources/articles)
-    and therefore better importance.
+    The ``similarity_threshold`` argument is kept for backward compatibility
+    with the test suite. When provided, it overrides the title threshold.
     """
 
+    title_threshold = similarity_threshold if similarity_threshold is not None else DEFAULT_TITLE_THRESHOLD
+
     by_topic: Dict[str, List[Dict[str, Any]]] = {}
-    embedding_cache: Dict[str, list[float]] = {}
+    embedding_cache: Dict[str, list[float] | None] = {}
 
     def _embedding_for_text(text: str) -> list[float] | None:
-        if not NEWS_EMBEDDINGS_ENABLED:
+        if not NEWS_EMBEDDINGS_ENABLED or not text:
             return None
         if text in embedding_cache:
             return embedding_cache[text]
         emb = ollama_embed(text)
-        if emb:
-            embedding_cache[text] = emb
+        # Cache None too so a single failure doesn't trigger repeated retries
+        # within one ingestion cycle. The next refresh call clears this map.
+        embedding_cache[text] = emb
         return emb
 
     for a in articles:
-        title = a["title"]
+        title = a.get("title") or ""
+        if not title:
+            continue
         topic = normalize_topic_from_title(title)
         article_text = _cluster_text(a)
         article_embedding = _embedding_for_text(article_text)
 
+        # Attach embedding on the article dict so _signals() can read it
+        # without re-computing.
+        a_with_emb = dict(a)
+        if article_embedding is not None:
+            a_with_emb["_embedding"] = article_embedding
+
         by_topic.setdefault(topic, [])
         clusters = by_topic[topic]
 
         best_idx: int | None = None
-        best_sim = 0.0
+        best_signal_name = "none"
+        best_signal_value = 0.0
         for idx, c in enumerate(clusters):
-            if NEWS_EMBEDDINGS_ENABLED:
-                if not cluster_is_candidate(a, c, rules=CandidateRules(require_topic_match=False), article_topic=topic):
-                    continue
-                cluster_text = _cluster_text(c.get("articles", [{}])[0]) if c.get("articles") else c.get("headline", "")
-                cluster_embedding = _embedding_for_text(cluster_text)
-                if article_embedding and cluster_embedding:
-                    sim = cosine_similarity(article_embedding, cluster_embedding)
-                else:
-                    sim = _title_similarity(title, c.get("headline", ""))
+            sigs = _signals(a_with_emb, c)
+            # Use the title threshold the caller explicitly passed (test override)
+            # but otherwise rely on the module defaults.
+            local_match = False
+            if NEWS_EMBEDDINGS_ENABLED and sigs["cosine"] >= NEWS_EMBEDDING_SIMILARITY_THRESHOLD:
+                local_match = True
+                signal_name, signal_value = "cosine", sigs["cosine"]
+            elif sigs["title"] >= title_threshold:
+                local_match = True
+                signal_name, signal_value = "title", sigs["title"]
+            elif sigs["jaccard"] >= DEFAULT_JACCARD_THRESHOLD:
+                local_match = True
+                signal_name, signal_value = "jaccard", sigs["jaccard"]
+            # Consensus rule: when no single signal clears its strict threshold
+            # but two of them are simultaneously "strong-ish", treat that as a
+            # match. This catches reworded headlines whose embedding is just
+            # below the strict cosine cutoff. Numbers are intentionally
+            # conservative — both signals must be clearly above noise.
+            elif (
+                NEWS_EMBEDDINGS_ENABLED
+                and sigs["cosine"] >= 0.80
+                and (sigs["jaccard"] >= 0.30 or sigs["title"] >= 0.55)
+            ):
+                local_match = True
+                signal_name = "consensus"
+                signal_value = (sigs["cosine"] + max(sigs["jaccard"], sigs["title"])) / 2.0
             else:
-                sim = _title_similarity(title, c.get("headline", ""))
-            if sim > best_sim:
-                best_sim = sim
-                best_idx = idx
+                signal_name, signal_value = "none", max(sigs["title"], sigs["jaccard"], sigs["cosine"])
 
-        threshold = similarity_threshold
-        if NEWS_EMBEDDINGS_ENABLED:
-            threshold = max(similarity_threshold, NEWS_EMBEDDING_SIMILARITY_THRESHOLD)
+            if local_match and signal_value > best_signal_value:
+                best_idx = idx
+                best_signal_name = signal_name
+                best_signal_value = signal_value
 
-        if best_idx is not None and best_sim >= threshold:
+        if best_idx is not None:
             c = clusters[best_idx]
             existing_keys = {_article_key(x) for x in c.get("articles", []) or []}
             if _article_key(a) not in existing_keys:
                 c["articles"].append(a)
-            if a["source"] not in c["sources"]:
+            if a.get("source") and a["source"] not in c["sources"]:
                 c["sources"].append(a["source"])
-            c["last_updated"] = max(str(c["last_updated"]), str(a["timestamp"]))
+            c["last_updated"] = max(str(c.get("last_updated", "")), str(a.get("timestamp", "")))
+            # Keep a tiny audit trail per cluster on which signal grew it last.
+            # Not surfaced through tools — lives in the payload only for debug.
+            c.setdefault("_merge_signals", []).append(
+                {"signal": best_signal_name, "value": round(best_signal_value, 3)}
+            )
         else:
-            # Stable-ish cluster id: based on topic + normalized canonical title.
-            import hashlib
-
+            # Stable cluster id: based on topic + normalized canonical title.
             key = f"{topic}|{_normalize_title(title)}"
             cid = hashlib.sha1(key.encode("utf-8")).hexdigest()
             cluster_embedding = article_embedding if NEWS_EMBEDDINGS_ENABLED else None
@@ -117,17 +255,24 @@ def dedup_and_cluster_articles(
                     "cluster_id": cid,
                     "headline": title,
                     "summary": a.get("summary", ""),
+                    "topic": topic,
                     "entities": [],
                     "sentiment": "neutral",
                     "importance": 0.0,
-                    "sources": [a["source"]],
-                    "timestamp": a["timestamp"],
+                    "sources": [a["source"]] if a.get("source") else [],
+                    "timestamp": a.get("timestamp"),
                     "articles": [a],
-                    "first_seen": a["timestamp"],
-                    "last_updated": a["timestamp"],
+                    "first_seen": a.get("timestamp"),
+                    "last_updated": a.get("timestamp"),
                     "embedding": cluster_embedding,
                     "embedding_model": "ollama:nomic-embed-text" if cluster_embedding else None,
                 }
             )
 
+    # Strip the internal merge audit trail before returning so it does not
+    # accidentally bloat the SQLite payload. Storage layer doesn't filter it.
+    for clusters in by_topic.values():
+        for c in clusters:
+            c.pop("_merge_signals", None)
+
     return {topic: clusters for topic, clusters in by_topic.items()}

+ 17 - 4
news_mcp/enrichment/llm_enrich.py

@@ -3,7 +3,7 @@ from __future__ import annotations
 from fnmatch import fnmatchcase
 from typing import Any, Dict
 
-from news_mcp.config import NEWS_ENTITY_BLACKLIST
+from news_mcp.config import NEWS_ENTITY_BLACKLIST, DEFAULT_TOPICS
 from news_mcp.entity_normalize import normalize_entities
 from news_mcp.llm import call_extraction, call_summary
 from news_mcp.trends_resolution import resolve_entity_via_trends
@@ -29,11 +29,24 @@ def _filter_entities(entities, blacklist=None):
 async def classify_cluster_llm(cluster: Dict[str, Any]) -> Dict[str, Any]:
     parsed = await call_extraction(cluster)
     out = dict(cluster)
-    topic = parsed.get("topic", cluster.get("topic"))
+
+    # Topic: prefer the LLM's classification, fall back to the heuristic topic
+    # already on the input cluster. Validate against the allowed set so we never
+    # promote a free-form string into the SQL row column.
+    raw_topic = parsed.get("topic", cluster.get("topic"))
+    topic = str(raw_topic).strip().lower() if raw_topic else None
     if topic and _matches_blacklist(topic):
         topic = "other"
-    entities = normalize_entities(_filter_entities(parsed.get("entities", [])))
-    keywords = normalize_entities(_filter_entities(parsed.get("keywords", [])))
+    if topic not in {t.lower() for t in DEFAULT_TOPICS}:
+        # Unknown / hallucinated label -> fall back to whatever the heuristic
+        # classifier on the headline gave us, else "other".
+        fallback = str(cluster.get("topic") or "").strip().lower()
+        topic = fallback if fallback in {t.lower() for t in DEFAULT_TOPICS} else "other"
+
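+    # e.g. a hallucinated label like "market-vibes" never reaches the SQL row
+    # column: it falls back to the heuristic topic when that is valid, else "other".
+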
+    # IMPORTANT: normalize aliases BEFORE applying the blacklist, otherwise
+    # blacklisting "bitcoin" misses entries the LLM returned as "btc".
+    entities = _filter_entities(normalize_entities(parsed.get("entities", [])))
+    keywords = _filter_entities(normalize_entities(parsed.get("keywords", [])))
 
     out.update({
         "topic": topic,

+ 22 - 3
news_mcp/jobs/poller.py

@@ -5,7 +5,7 @@ import logging
 from datetime import datetime, timezone
 from typing import Any, Dict
 
-from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DB_PATH, NEWS_FEED_URL, NEWS_FEED_URLS
+from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH, NEWS_FEED_URL, NEWS_FEED_URLS
 from news_mcp.dedup.cluster import dedup_and_cluster_articles
 from news_mcp.enrichment.enrich import enrich_cluster
 from news_mcp.enrichment.llm_enrich import classify_cluster_llm
@@ -74,6 +74,9 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
 
         for idx, c in enumerate(clusters[:enrich_limit]):
             c2 = enrich_cluster(c)
+            # Seed the heuristic topic on the payload so classify_cluster_llm
+            # has a sane fallback if the LLM omits or hallucinates one.
+            c2.setdefault("topic", t)
 
             if _llm_enabled_for_topic:
                 # Cache: if we already have entities/sentiment for this cluster, skip LLM call.
@@ -98,6 +101,10 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
                         c2["sentimentScore"] = existing.get("sentimentScore")
                     if existing.get("keywords"):
                         c2["keywords"] = existing.get("keywords")
+                    # Preserve a previously-classified topic so we don't drift back
+                    # to the heuristic on cache hits.
+                    if existing.get("topic"):
+                        c2["topic"] = existing.get("topic")
                 else:
                     try:
                         c2 = await classify_cluster_llm(c2)
@@ -108,8 +115,20 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
 
             enriched.append(c2)
 
-        store.upsert_clusters(enriched, topic=t)
-        logger.info("refresh stored topic=%s clusters=%s", t, len(enriched))
+        # Persist clusters under their *post-enrichment* topic so the SQL row
+        # column matches what the LLM (or the validated heuristic fallback)
+        # actually decided. Previously, every cluster from this bucket was
+        # forced into the heuristic topic `t`, which caused a ~97% mismatch
+        # between row-column topic and payload topic.
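+        # e.g. a heuristic bucket t="other" can fan out after enrichment into
+        # {"regulation": [...], "crypto": [...], "other": [...]}; each group is
+        # stored under its own row-column topic.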
+        by_final_topic: Dict[str, list] = {}
+        for c2 in enriched:
+            final_topic = str(c2.get("topic") or t or "other").strip().lower()
+            if final_topic not in {x.lower() for x in DEFAULT_TOPICS}:
+                final_topic = "other"
+            by_final_topic.setdefault(final_topic, []).append(c2)
+        for final_topic, group in by_final_topic.items():
+            store.upsert_clusters(group, topic=final_topic)
+            logger.info("refresh stored topic=%s clusters=%s (heuristic_topic=%s)", final_topic, len(group), t)
 
     prune_result = store.prune_if_due(
         pruning_enabled=NEWS_PRUNING_ENABLED,

+ 33 - 9
news_mcp/llm.py

@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 import json
 from pathlib import Path
 from typing import Any, Dict, Iterable, List
@@ -85,7 +86,7 @@ async def _call_openai(model: str, messages: List[Dict[str, str]], response_json
 OR_OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
 
 
-async def _call_openrouter(model: str, messages: List[Dict[str, str]], response_json: bool = True) -> str:
+async def _call_openrouter(model: str, messages: List[Dict[str, str]], response_json: bool = True, retries: int = 2) -> str:
     if not OPENROUTER_API_KEY:
         raise LLMError("OPENROUTER_API_KEY is not configured")
     req = {"model": model, "messages": messages, "temperature": 0.2}
@@ -96,15 +97,38 @@ async def _call_openrouter(model: str, messages: List[Dict[str, str]], response_
         "HTTP-Referer": "https://github.com/gr1m0/bolt.new-rss",
         "X-Title": "news-mcp",
     }
+    last_err = ""
     async with httpx.AsyncClient(timeout=45.0) as client:
-        resp = await client.post(
-            OR_OPENROUTER_URL,
-            headers=headers,
-            json=req,
-        )
-        resp.raise_for_status()
-        data = resp.json()
-    return data["choices"][0]["message"]["content"]
+        for attempt in range(1 + retries):
+            resp = await client.post(
+                OR_OPENROUTER_URL,
+                headers=headers,
+                json=req,
+            )
+            if resp.status_code != 200:
+                last_err = f"HTTP {resp.status_code}: {resp.text[:300]}"
+                if resp.status_code in (429, 500, 502, 503):
+                    await asyncio.sleep(2 ** attempt)
+                    continue
+                resp.raise_for_status()
+            data = resp.json()
+            if "error" in data:
+                last_err = f"API error: {data['error']}"
+                break
+            choices = data.get("choices", [])
+            if not choices:
+                last_err = f"No choices in response: {str(data)[:300]}"
+                if attempt < retries:
+                    await asyncio.sleep(2 ** attempt)
+                    continue
+                break
+            msg = choices[0].get("message", {})
+            content = msg.get("content")
+            if content:
+                return content
+            last_err = f"Empty content in choice: {str(msg)[:200]}"
+            break
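+    # With the default retries=2 the worst case is three attempts, with
+    # exponential sleeps (1s, then 2s) between retryable failures, before
+    # LLMError surfaces the last captured error.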
+    raise LLMError(f"OpenRouter failed after {1+retries} attempts: {last_err}")
 
 
 async def call_llm(provider: str, model: str, system_prompt: str, user_prompt: str) -> str:

+ 132 - 21
news_mcp/mcp_server_fastmcp.py

@@ -21,6 +21,7 @@ from news_mcp.config import (
 )
 from news_mcp.jobs.poller import refresh_clusters
 from news_mcp.storage.sqlite_store import SQLiteClusterStore
+from news_mcp.dashboard.dashboard_store import DashboardStore
 from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
 from news_mcp.trends_resolution import resolve_entity_via_trends
 from news_mcp.llm import active_llm_config
@@ -666,35 +667,53 @@ def _parse_timeframe_to_hours(timeframe: str) -> int:
     except Exception:
         return 24
 
+from contextlib import asynccontextmanager
 
-app = FastAPI(title="News MCP Server")
+@asynccontextmanager
+async def _lifespan(app: FastAPI):
+    # Keep a reference on app.state so the fire-and-forget task is not
+    # garbage-collected mid-flight (asyncio holds only weak refs to tasks).
+    app.state.refresh_task = asyncio.ensure_future(_background_refresh_loop())
+    yield
+
+
+app = FastAPI(title="News MCP Server", lifespan=_lifespan)
 
 logger = logging.getLogger("news_mcp.startup")
 
 app.mount("/mcp", mcp.sse_app())
 
 
-_background_task_started = False
+# Shared store — single connection pool
+_shared_store = SQLiteClusterStore(DB_PATH)
+_refresh_lock = asyncio.Lock()
+_refresh_started = False
 
 
-@app.on_event("startup")
-async def _start_background_refresh():
-    global _background_task_started
-    if _background_task_started:
-        return
-    if not NEWS_BACKGROUND_REFRESH_ENABLED:
-        return
-    _background_task_started = True
+async def _background_refresh_loop():
+    """Non-blocking background refresher: prune then poll.
+
+    Protected by an async lock so a second event-loop wake-up cannot
+    start a parallel ingestion cycle.
+    """
+    global _refresh_started
+    async with _refresh_lock:
+        if _refresh_started:
+            return
+        _refresh_started = True
+
     logger.info("news-mcp llm config: %s", active_llm_config())
 
-    store = SQLiteClusterStore(DB_PATH)
-    prune_result = store.prune_if_due(
-        pruning_enabled=NEWS_PRUNING_ENABLED,
-        retention_days=NEWS_RETENTION_DAYS,
-        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
+    # Prune off-thread so we do not block the event loop.
+    prune_result = await asyncio.to_thread(
+        _shared_store.prune_if_due,
+        pruning_enabled=NEWS_PRUNING_ENABLED,
+        retention_days=NEWS_RETENTION_DAYS,
+        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
+    )
     logger.info("startup prune_result=%s", prune_result)
 
+    if not NEWS_BACKGROUND_REFRESH_ENABLED:
+        return
+
     async def _loop():
         if not NEWS_BACKGROUND_REFRESH_ON_START:
             logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS)
@@ -702,11 +721,9 @@ async def _start_background_refresh():
         while True:
             try:
                 logger.info("background refresh tick start")
-                # Refresh all topics by passing topic=None
                 await refresh_clusters(topic=None, limit=200)
                 logger.info("background refresh tick complete")
             except Exception:
-                # Keep the server alive, but do not hide the failure.
                 logger.exception("background refresh tick failed")
             await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
 
@@ -743,16 +760,110 @@ def root():
     }
 
 
+# ------------------------------------------------------------------
+# Dashboard REST API endpoints
+# ------------------------------------------------------------------
+from pathlib import Path
+
+from fastapi.staticfiles import StaticFiles
+from fastapi.responses import JSONResponse
+
+# Serve the SPA from the package directory (news_mcp/dashboard) rather than a
+# CWD-relative "dashboard", which only resolves when run from inside news_mcp/.
+app.mount("/dashboard", StaticFiles(directory=str(Path(__file__).parent / "dashboard"), html=True), name="dashboard")
+
+API_LOG = logging.getLogger("news_mcp.api")
+
+def _api_ok(data: dict) -> dict:
+    return data
+
+def _api_err(exc: Exception, ctx: str) -> JSONResponse:
+    API_LOG.exception(f"API error in {ctx}")
+    return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx})
+
+
+@app.get("/api/v1/health")
+def api_health():
+    """Extended health + dashboard stats."""
+    try:
+        store = DashboardStore(_shared_store)
+        return store.get_dashboard_stats()
+    except Exception as e:
+        return _api_err(e, "health")
+
+@app.get("/api/v1/clusters")
+def api_clusters(
+    topic: str | None = None,
+    hours: int = 24,
+    limit: int = 50,
+    offset: int = 0,
+):
+    """Paginated cluster listing."""
+    try:
+        store = DashboardStore(_shared_store)
+        clusters = store.get_clusters_page(topic=topic, hours=hours, limit=limit, offset=offset)
+        with store._store._conn() as conn:
+            if topic and topic != "all":
+                count_row = conn.execute(
+                    "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours') AND topic = ?",
+                    (-hours, topic),
+                ).fetchone()
+            else:
+                count_row = conn.execute(
+                    "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours')",
+                    (-hours,),
+                ).fetchone()
+            total = count_row[0] if count_row else 0
+        return {"clusters": clusters, "total": total, "topic": topic or "all", "hours": hours}
+    except Exception as e:
+        return _api_err(e, f"clusters(topic={topic},hours={hours})")
+
+@app.get("/api/v1/sentiment-series")
+def api_sentiment_series(
+    topic: str | None = None,
+    hours: int = 24,
+    bucket_hours: float = 1.0,
+):
+    """Sentiment time-series for Chart.js."""
+    try:
+        store = DashboardStore(_shared_store)
+        series = store.get_sentiment_series(topic=topic, hours=hours, bucket_hours=bucket_hours)
+        return {"series": series, "topic": topic or "all"}
+    except Exception as e:
+        return _api_err(e, f"sentiment(topic={topic})")
+
+@app.get("/api/v1/entities")
+def api_entities(
+    hours: int = 24,
+    limit: int = 30,
+):
+    """Top entity frequencies."""
+    try:
+        store = DashboardStore(_shared_store)
+        entities = store.get_entity_frequencies(hours=hours, limit=limit)
+        return {"entities": entities, "hours": hours}
+    except Exception as e:
+        return _api_err(e, f"entities(hours={hours})")
+
+@app.get("/api/v1/cluster/{cluster_id}")
+def api_cluster_detail(cluster_id: str):
+    """Full cluster detail for drill-down."""
+    try:
+        store = DashboardStore(_shared_store)
+        detail = store.get_cluster_detail(cluster_id)
+        if not detail:
+            return JSONResponse(status_code=404, content={"error": "Cluster not found", "id": cluster_id})
+        return detail
+    except Exception as e:
+        return _api_err(e, f"detail({cluster_id})")
+
+
 @app.get("/health")
 def health():
-    store = SQLiteClusterStore(DB_PATH)
     return {
         "status": "ok",
         "lookback_hours": DEFAULT_LOOKBACK_HOURS,
         "db": str(DB_PATH),
-        "last_refresh_at": store.get_meta("last_refresh_at"),
-        "refresh": store.get_feed_state("breakingthenews"),
-        "pruning": store.get_prune_state(
+        "last_refresh_at": _shared_store.get_meta("last_refresh_at"),
+        "feeds": _shared_store.get_all_feed_states(),
+        "pruning": _shared_store.get_prune_state(
             pruning_enabled=NEWS_PRUNING_ENABLED,
             retention_days=NEWS_RETENTION_DAYS,
             interval_hours=NEWS_PRUNE_INTERVAL_HOURS,

+ 212 - 0
news_mcp/storage/sqlite_store.py

@@ -9,6 +9,11 @@ from typing import Any
 from urllib.parse import urlparse
 from email.utils import parsedate_to_datetime
 
+from news_mcp.config import (
+    NEWS_PRUNE_INTERVAL_HOURS,
+    NEWS_PRUNING_ENABLED,
+    NEWS_RETENTION_DAYS,
+)
 from news_mcp.entity_normalize import normalize_entities
 from news_mcp.trends_resolution import resolve_entity_via_trends
 
@@ -349,6 +354,22 @@ class SQLiteClusterStore:
                 return None
             return {"last_hash": row[0], "updated_at": row[1]}
 
+    def get_all_feed_states(self) -> list[dict[str, Any]]:
+        """All feed_state rows.
+
+        The live writer keys feed state as ``newsfeeds:<sha1(comma_joined_urls)>``,
+        so a hardcoded literal lookup never matches when more than one feed is
+        configured. Use this for surfacing health information.
+        """
+        with self._conn() as conn:
+            cur = conn.execute(
+                "SELECT feed_key, last_hash, updated_at FROM feed_state ORDER BY updated_at DESC"
+            )
+            return [
+                {"feed_key": row[0], "last_hash": row[1], "updated_at": row[2]}
+                for row in cur.fetchall()
+            ]
+
     def get_meta(self, key: str) -> str | None:
         with self._conn() as conn:
             cur = conn.execute("SELECT value FROM meta WHERE key=?", (key,))
@@ -512,3 +533,194 @@ class SQLiteClusterStore:
             "interval_hours": float(interval_hours),
             "last_prune_at": self.get_meta(META_LAST_PRUNE_AT),
         }
+
+    # ------------------------------------------------------------------
+    # Dashboard query helpers
+    # ------------------------------------------------------------------
+
+    def get_dashboard_stats(self) -> dict[str, Any]:
+        """Aggregate status numbers for the health panel."""
+        with self._conn() as conn:
+            total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
+            total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
+            topic_counts = dict(conn.execute(
+                "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
+            ).fetchall())
+            last_refresh = self.get_meta("last_refresh_at")
+            feeds = {}
+            for row in conn.execute("SELECT feed_key, last_hash, updated_at FROM feed_state"):
+                feeds[row[0]] = {"last_hash": row[1], "updated_at": row[2]}
+            last_prune = self.get_meta(META_LAST_PRUNE_AT)
+            prune_state = self.get_prune_state(
+                pruning_enabled=NEWS_PRUNING_ENABLED,
+                retention_days=NEWS_RETENTION_DAYS,
+                interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
+            )
+            return {
+                "total_clusters": total_clusters,
+                "total_entities": total_entities,
+                "clusters_by_topic": topic_counts,
+                "last_refresh_at": last_refresh,
+                "last_prune_at": last_prune,
+                "prune_state": prune_state,
+                "feeds": feeds,
+            }
+
+    def get_clusters_page(
+        self,
+        topic: str | None = None,
+        hours: float = 24,
+        limit: int = 20,
+        offset: int = 0,
+    ) -> list[dict[str, Any]]:
+        """Paginated cluster listing for the dashboard."""
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
+        query = "SELECT payload FROM clusters WHERE updated_at >= ?"
+        params: list = [cutoff]
+        if topic and topic != "all":
+            query += " AND topic = ?"
+            params.append(topic)
+        query += " ORDER BY updated_at DESC LIMIT ? OFFSET ?"
+        params.extend([limit, offset])
+        with self._conn() as conn:
+            cur = conn.execute(query, params)
+            rows = cur.fetchall()
+        clusters: list[dict[str, Any]] = []
+        for (payload_text,) in rows:
+            c = json.loads(payload_text)
+            clusters.append({
+                "cluster_id": c.get("cluster_id", ""),
+                "headline": c.get("headline", ""),
+                "topic": c.get("topic", ""),
+                "sentiment": c.get("sentiment", "neutral"),
+                "sentimentScore": c.get("sentimentScore"),
+                "importance": c.get("importance", 0),
+                "entities": c.get("entities", []),
+                "sources": c.get("sources", []),
+                "timestamp": c.get("timestamp", ""),
+                "keywords": c.get("keywords", []),
+                "article_count": len(c.get("articles", [])),
+            })
+        return clusters
+
+    def get_sentiment_series(
+        self,
+        topic: str | None = None,
+        hours: float = 24,
+        bucket_hours: float = 1,
+    ) -> list[dict[str, Any]]:
+        """Sentiment score averaged per time bucket."""
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
+        query = "SELECT payload FROM clusters WHERE updated_at >= ?"
+        params: list = [cutoff]
+        if topic and topic != "all":
+            query += " AND topic = ?"
+            params.append(topic)
+        query += " ORDER BY updated_at ASC"
+        with self._conn() as conn:
+            cur = conn.execute(query, params)
+            rows = cur.fetchall()
+
+        def _parse_ts(ts: Any) -> datetime | None:
+            if not ts:
+                return None
+            try:
+                dt = datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
+                if dt.tzinfo is None:
+                    dt = dt.replace(tzinfo=timezone.utc)
+                return dt.astimezone(timezone.utc)
+            except Exception:
+                return None
+
+        buckets: dict[datetime, list[float]] = {}
+        for (payload_text,) in rows:
+            c = json.loads(payload_text)
+            dt = _parse_ts(c.get("timestamp"))
+            score = c.get("sentimentScore")
+            if dt is None or score is None:
+                continue
+            # Snap to the top of the hour, then down to a multiple of
+            # bucket_hours. Fractional sizes below 1h clamp to hourly because
+            # int() truncates.
+            bucket_key = dt.replace(minute=0, second=0, microsecond=0)
+            if bucket_hours > 1:
+                bucket_key = bucket_key.replace(
+                    hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
+                )
+            buckets.setdefault(bucket_key, []).append(float(score))
+
+        series: list[dict[str, Any]] = []
+        for bucket_key in sorted(buckets):
+            scores = buckets[bucket_key]
+            series.append({
+                "time": bucket_key.isoformat(),
+                "avg_sentiment": round(sum(scores) / len(scores), 3),
+                "count": len(scores),
+                "min": round(min(scores), 3),
+                "max": round(max(scores), 3),
+            })
+        return series
+
+    def get_entity_frequencies(
+        self,
+        hours: float = 24,
+        limit: int = 30,
+    ) -> list[dict[str, Any]]:
+        """Top entities by mention count in recent clusters."""
+        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
+        with self._conn() as conn:
+            cur = conn.execute(
+                "SELECT payload FROM clusters WHERE updated_at >= ? ORDER BY updated_at DESC LIMIT 500",
+                (cutoff,),
+            )
+            rows = cur.fetchall()
+        counter: dict[str, int] = {}
+        for (payload_text,) in rows:
+            c = json.loads(payload_text)
+            for ent in c.get("entities", []):
+                counter[ent] = counter.get(ent, 0) + 1
+        sorted_entities = sorted(counter.items(), key=lambda x: -x[1])[:limit]
+        result: list[dict[str, Any]] = []
+        for label, count in sorted_entities:
+            meta = self.get_entity_metadata(label)
+            result.append({
+                "label": label,
+                "count": count,
+                "canonical_label": meta["canonical_label"] if meta else label,
+                "mid": meta["mid"] if meta else None,
+            })
+        return result
+
+    def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
+        """Dashboard-optimized cluster detail fetch."""
+        with self._conn() as conn:
+            cur = conn.execute(
+                "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
+            )
+            row = cur.fetchone()
+            if not row:
+                return None
+            c = json.loads(row[0])
+            summary = None
+            if c.get("summary_payload"):
+                try:
+                    summary = json.loads(c["summary_payload"])
+                except Exception:
+                    pass
+            return {
+                "cluster_id": c.get("cluster_id"),
+                "headline": c.get("headline", ""),
+                "summary": c.get("summary", ""),
+                "topic": c.get("topic", ""),
+                "sentiment": c.get("sentiment", "neutral"),
+                "sentimentScore": c.get("sentimentScore"),
+                "importance": c.get("importance", 0),
+                "entities": c.get("entities", []),
+                "entityResolutions": c.get("entityResolutions", []),
+                "keywords": c.get("keywords", []),
+                "sources": c.get("sources", []),
+                "timestamp": c.get("timestamp", ""),
+                "first_seen": c.get("first_seen", ""),
+                "last_updated": c.get("last_updated", ""),
+                "article_count": len(c.get("articles", [])),
+                "articles": c.get("articles", []),
+                "summary_text": summary.get("mergedSummary", "") if summary else "",
+                "key_facts": summary.get("keyFacts", []) if summary else [],
+            }

+ 17 - 4
run.sh

@@ -3,15 +3,28 @@ set -euo pipefail
 
 PORT=${PORT:-8506}
 APP_MODULE=${APP_MODULE:-news_mcp.mcp_server_fastmcp:app}
-LOGFILE=${LOGFILE:-logs/uvicorn.log}
+LOGFILE=${LOGFILE:-logs/server.log}
 PIDFILE=${PIDFILE:-logs/server.pid}
 
 mkdir -p "$(dirname "$LOGFILE")"
 mkdir -p "$(dirname "$PIDFILE")"
 
-if [ -f "$PIDFILE" ] && ps -p "$(cat "$PIDFILE" 2>/dev/null)" > /dev/null 2>&1; then
-  echo "Server already running (PID $(cat "$PIDFILE"))"
-  exit 0
+if [ -f "$PIDFILE" ]; then
+  PID="$(cat "$PIDFILE" 2>/dev/null || true)"
+  if [ -n "$PID" ] && ps -p "$PID" > /dev/null 2>&1; then
+    echo "Server already running (PID $PID)"
+    exit 0
+  fi
+  rm -f "$PIDFILE"
+fi
+
+# If another process already binds the configured port, fail fast rather than
+# launching a second server and hiding the collision.
+if command -v curl >/dev/null 2>&1; then
+  if curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null 2>&1; then
+    echo "A server is already responding on port $PORT; refusing to start a second instance"
+    exit 1
+  fi
 fi
 
 UVICORN_BIN="${UVICORN_BIN:-}"

+ 247 - 0
test_news_mcp.py

@@ -412,3 +412,250 @@ def test_importance_prefers_llm_signal():
 
     assert imp_pos >= imp_base
     assert imp_neg >= imp_base
+
+
+# ---------------------------------------------------------------------------
+# Regression tests for the May 2026 correctness pass
+# ---------------------------------------------------------------------------
+
+
+def test_classify_cluster_llm_uses_llm_topic_and_drops_invalid_ones(monkeypatch):
+    """The LLM-extracted topic must propagate to the returned cluster, but
+    free-form / hallucinated topic strings must be coerced into the allowed
+    set so they never reach the SQL row column verbatim."""
+    import asyncio
+
+    from news_mcp.enrichment import llm_enrich
+
+    async def fake_extraction(cluster):
+        return {
+            "topic": "regulation",
+            "entities": ["SEC"],
+            "sentiment": "neutral",
+            "sentimentScore": 0.0,
+            "keywords": ["enforcement"],
+        }
+
+    monkeypatch.setattr(llm_enrich, "call_extraction", fake_extraction)
+    monkeypatch.setattr(llm_enrich, "resolve_entity_via_trends", lambda e: {"normalized": e, "canonical_label": e, "mid": None})
+
+    cluster = {"cluster_id": "x", "headline": "SEC fines firm", "summary": "...", "topic": "other"}
+    out = asyncio.run(llm_enrich.classify_cluster_llm(cluster))
+    assert out["topic"] == "regulation"
+
+    # Hallucinated topic is rejected; we fall back to the input cluster's
+    # heuristic topic when it is one of the allowed ones.
+    async def fake_extraction_garbage(cluster):
+        return {
+            "topic": "geopolitics-and-stuff",
+            "entities": ["NATO"],
+            "sentiment": "neutral",
+            "sentimentScore": 0.0,
+            "keywords": [],
+        }
+
+    monkeypatch.setattr(llm_enrich, "call_extraction", fake_extraction_garbage)
+    cluster = {"cluster_id": "y", "headline": "NATO meets", "summary": "...", "topic": "macro"}
+    out = asyncio.run(llm_enrich.classify_cluster_llm(cluster))
+    assert out["topic"] == "macro"  # heuristic fallback
+
+    # When neither the LLM nor the heuristic gives a valid label -> "other".
+    cluster = {"cluster_id": "z", "headline": "...", "summary": "...", "topic": "geopolitics-bucket"}
+    out = asyncio.run(llm_enrich.classify_cluster_llm(cluster))
+    assert out["topic"] == "other"
+
+
+def test_classify_cluster_llm_normalizes_aliases_before_blacklist(monkeypatch):
+    """Regression: previously ``_filter_entities`` ran before
+    ``normalize_entities``, so blacklisting "bitcoin" missed entries the LLM
+    returned as the alias "btc". Order is now normalize -> blacklist."""
+    import asyncio
+
+    from news_mcp.enrichment import llm_enrich
+
+    async def fake_extraction(cluster):
+        return {
+            "topic": "crypto",
+            "entities": ["btc", "Reuters"],
+            "sentiment": "neutral",
+            "sentimentScore": 0.0,
+            "keywords": ["btc rally", "Reuters"],
+        }
+
+    monkeypatch.setattr(llm_enrich, "call_extraction", fake_extraction)
+    monkeypatch.setattr(llm_enrich, "resolve_entity_via_trends", lambda e: {"normalized": e, "canonical_label": e, "mid": None})
+    monkeypatch.setattr(llm_enrich, "NEWS_ENTITY_BLACKLIST", ["bitcoin"])
+
+    cluster = {"cluster_id": "x", "headline": "BTC up", "summary": "...", "topic": "crypto"}
+    out = asyncio.run(llm_enrich.classify_cluster_llm(cluster))
+
+    # "btc" became "Bitcoin" via aliasing, then was filtered out by the
+    # blacklist. "Reuters" survives (not blacklisted in this test).
+    assert "Bitcoin" not in out["entities"]
+    assert "btc" not in [e.lower() for e in out["entities"]]
+    assert "Reuters" in out["entities"]
+
+
+def test_dedup_uses_jaccard_when_titles_diverge():
+    """Composite similarity: even with embeddings off, two articles whose
+    titles share only some tokens should still merge if their content (token
+    overlap) is high enough."""
+    from news_mcp.dedup import cluster as dc
+
+    # Titles differ heavily; bodies overlap heavily -> Jaccard should catch.
+    articles = [
+        {
+            "title": "Iran tension rises",
+            "url": "https://example.com/a",
+            "source": "A",
+            "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT",
+            "summary": "Trump warns Iran war could spread across the Middle East amid rising tensions.",
+        },
+        {
+            "title": "Trump issues stark warning over Tehran",
+            "url": "https://example.com/b",
+            "source": "B",
+            "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT",
+            "summary": "Trump warns Iran war could spread across the Middle East amid rising tensions.",
+        },
+    ]
+    clustered = dc.dedup_and_cluster_articles(articles)
+    total = sum(len(v) for v in clustered.values())
+    assert total == 1, f"Expected 1 merged cluster via Jaccard signal, got {total}"
+
+
+def test_dedup_does_not_merge_unrelated_articles():
+    """Negative control: cluster is robust against false-positives even with
+    the more permissive multi-signal merging."""
+    from news_mcp.dedup import cluster as dc
+
+    articles = [
+        {
+            "title": "Bitcoin hits new high",
+            "url": "https://example.com/a",
+            "source": "A",
+            "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT",
+            "summary": "Bitcoin reached a record high amid rising demand.",
+        },
+        {
+            "title": "Local sports team wins",
+            "url": "https://example.com/b",
+            "source": "B",
+            "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT",
+            "summary": "The local team won the regional championship.",
+        },
+    ]
+    clustered = dc.dedup_and_cluster_articles(articles)
+    total = sum(len(v) for v in clustered.values())
+    assert total == 2
+
+
+def test_get_all_feed_states_returns_all_rows():
+    """Health endpoint regression: the writer keys feed state with a hashed
+    multi-feed key, so the old hardcoded ``get_feed_state("breakingthenews")``
+    always returned None. Verify the bulk getter works."""
+    import tempfile
+    from pathlib import Path
+
+    with tempfile.TemporaryDirectory() as td:
+        db = Path(td) / "news.sqlite"
+        store = SQLiteClusterStore(db)
+        store.set_feed_hash("newsfeeds:abc123", "hash1")
+        store.set_feed_hash("newsfeeds:def456", "hash2")
+        all_states = store.get_all_feed_states()
+        assert len(all_states) == 2
+        keys = {s["feed_key"] for s in all_states}
+        assert keys == {"newsfeeds:abc123", "newsfeeds:def456"}
+
+
+def test_poller_persists_clusters_under_post_enrichment_topic(monkeypatch):
+    """Regression: the SQL row-column ``topic`` previously locked in the
+    headline-heuristic value (which is ``other`` for most stories) and ignored
+    the LLM's classification stored in the payload. Verify the upsert now uses
+    the post-enrichment topic so SQL filtering and dashboard groupings see the
+    real classification."""
+    import asyncio
+
+    import news_mcp.jobs.poller as poller
+
+    captured = {"upserts": []}
+
+    class DummyStore:
+        def __init__(self, *args, **kwargs):
+            pass
+
+        def get_feed_hash(self, feed_key):
+            return None
+
+        def set_feed_hash(self, feed_key, last_hash):
+            pass
+
+        def get_cluster_by_id(self, cluster_id):
+            return None
+
+        def upsert_clusters(self, clusters, topic):
+            # Capture the topic the poller chose for each cluster.
+            for c in clusters:
+                captured["upserts"].append({"row_topic": topic, "payload_topic": c.get("topic"), "cluster_id": c.get("cluster_id")})
+
+        def prune_if_due(self, **kwargs):
+            return {"deleted": 0}
+
+        def set_meta(self, key, value):
+            pass
+
+    async def fake_to_thread(fn, limit):
+        return [
+            {"title": "SEC fines firm", "url": "https://example.com/a", "source": "S", "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT", "summary": "..."},
+        ]
+
+    def fake_cluster(articles):
+        # The heuristic classifier puts it in "other": the title has
+        # "law"-adjacent words but none of the specific crypto/macro/
+        # regulation/ai tokens the keyword matcher looks for.
+        return {
+            "other": [
+                {
+                    "cluster_id": "cid",
+                    "headline": "SEC fines firm",
+                    "summary": "...",
+                    "topic": "other",
+                    "entities": [],
+                    "sentiment": "neutral",
+                    "importance": 0.0,
+                    "sources": ["S"],
+                    "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
+                    "articles": [],
+                }
+            ]
+        }
+
+    def fake_enrich(cluster):
+        return cluster
+
+    async def fake_classify(cluster):
+        # The LLM thinks it's regulation -> the SQL row column must reflect that.
+        out = dict(cluster)
+        out["topic"] = "regulation"
+        out["entities"] = ["SEC"]
+        out["entityResolutions"] = []
+        out["sentiment"] = "neutral"
+        out["sentimentScore"] = 0.0
+        out["keywords"] = []
+        return out
+
+    monkeypatch.setattr(poller, "SQLiteClusterStore", DummyStore)
+    monkeypatch.setattr(poller, "fetch_news_articles", lambda limit: [])
+    monkeypatch.setattr(poller.asyncio, "to_thread", fake_to_thread)
+    monkeypatch.setattr(poller, "dedup_and_cluster_articles", fake_cluster)
+    monkeypatch.setattr(poller, "enrich_cluster", fake_enrich)
+    monkeypatch.setattr(poller, "classify_cluster_llm", fake_classify)
+
+    asyncio.run(poller.refresh_clusters(topic=None, limit=10))
+
+    assert captured["upserts"], "Expected at least one upsert call"
+    upsert = captured["upserts"][0]
+    assert upsert["row_topic"] == "regulation", (
+        f"Expected SQL row topic to follow the LLM's classification 'regulation', got {upsert['row_topic']!r}"
+    )
+    assert upsert["payload_topic"] == "regulation"