from __future__ import annotations import asyncio import logging from collections import Counter from datetime import datetime, timezone from email.utils import parsedate_to_datetime from fastapi import FastAPI from mcp.server.fastmcp import FastMCP from mcp.server.transport_security import TransportSecuritySettings from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH from news_mcp.config import ( NEWS_PRUNE_INTERVAL_HOURS, NEWS_PRUNING_ENABLED, NEWS_REFRESH_INTERVAL_SECONDS, NEWS_BACKGROUND_REFRESH_ENABLED, NEWS_BACKGROUND_REFRESH_ON_START, NEWS_RETENTION_DAYS, ) from news_mcp.jobs.poller import refresh_clusters from news_mcp.storage.sqlite_store import SQLiteClusterStore from news_mcp.dashboard.dashboard_store import DashboardStore from news_mcp.enrichment.llm_enrich import summarize_cluster_llm from news_mcp.trends_resolution import resolve_entity_via_trends from news_mcp.llm import active_llm_config from news_mcp.entity_normalize import normalize_query from news_mcp.related_entities import related_recent_entities logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) mcp = FastMCP( "news-mcp", transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False), ) def _cluster_entity_haystack(cluster: dict) -> list[str]: """Collect the normalized entity clues attached to a cluster.""" values: list[str] = [] for ent in cluster.get("entities", []) or []: values.append(str(ent).strip().lower()) for res in cluster.get("entityResolutions", []) or []: if not isinstance(res, dict): continue for key in ("normalized", "canonical_label", "mid"): val = res.get(key) if val: values.append(str(val).strip().lower()) return [v for v in values if v] def _parse_cluster_timestamp(value) -> datetime: if not value: return datetime.min.replace(tzinfo=timezone.utc) text = str(value).strip() if not text: return datetime.min.replace(tzinfo=timezone.utc) try: dt = datetime.fromisoformat(text.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) except Exception: pass try: dt = parsedate_to_datetime(text) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) except Exception: return datetime.min.replace(tzinfo=timezone.utc) def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]: return sorted( clusters, key=lambda c: ( _parse_cluster_timestamp(c.get("timestamp")), float(c.get("importance", 0.0) or 0.0), ), reverse=True, ) def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict: return { "name": name, "description": description, "inputs": inputs, "outputs": outputs, "notes": notes or [], } NEWS_TOOL_CARDS = [ _tool_card( "get_latest_events", "Get the newest deduplicated clusters for a topic or resolved entity-like query.", [ {"name": "topic", "type": "string", "default": "crypto", "meaning": "coarse category or entity-like topic"}, {"name": "limit", "type": "integer", "default": 5, "range": "1-20"}, {"name": "include_articles", "type": "boolean", "default": False}, ], ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"], ["Use when you want the freshest clusters and are willing to let the server decide topic vs entity mode."], ), _tool_card( "get_events_for_entity", "Search recent clusters for a person, place, company, or theme by entity matching.", [ {"name": "entity", "type": "string", "meaning": "entity label or phrase"}, {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]}, {"name": "limit", "type": "integer", "default": 10, "range": "1-30"}, {"name": "include_articles", "type": "boolean", "default": False}, ], ["headline", "summary", "entities", "sentiment", "importance", "sources", "timestamp", "articles?"], ["Normalization is automatic; use this for an entity-centered deep dive."], ), _tool_card( "get_event_summary", "Produce a concise LLM-written explanation for one cluster and key facts.", [ {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"}, {"name": "include_articles", "type": "boolean", "default": False}, ], ["headline", "mergedSummary", "keyFacts", "sources", "articles?"], ["Prefer this after you have already chosen a specific cluster to explain."], ), _tool_card( "detect_emerging_topics", "Surface entities and phrases starting to matter in the recent window.", [{"name": "limit", "type": "integer", "default": 10, "range": "1-20"}], ["topic", "trend_score", "related_entities", "signal_type", "count", "avg_importance"], ["Good for 'what is heating up?' style questions."], ), _tool_card( "get_news_sentiment", "Estimate sentiment around an entity over a lookback window.", [ {"name": "entity", "type": "string"}, {"name": "timeframe", "type": "string", "default": "24h"}, ], ["entity", "sentiment", "score", "cluster_count"], ["Use after locating a cluster set or entity neighborhood."], ), _tool_card( "get_related_recent_entities", "Blend local co-occurrence with Google Trends related topics, while preserving mids where available.", [ {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"}, {"name": "timeframe", "type": "string", "default": "72h"}, {"name": "limit", "type": "integer", "default": 10, "range": "1-25"}, {"name": "include_trends", "type": "boolean", "default": True}, ], ["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"], ["Use this to drill from a subject into related entities, then feed those into get_events_for_entity."], ), ] NEWS_COMPOSITION_RECIPES = [ { "name": "fresh-news-tail", "steps": [ "get_latest_events(topic=...)", "optionally get_event_summary(event_id=...) for the strongest cluster", ], "notes": ["Best for a quick tail of what is happening now."] }, { "name": "entity-deep-dive", "steps": [ "get_events_for_entity(entity=...)", "get_event_summary(event_id=...)", "get_news_sentiment(entity=..., timeframe=...)", ], "notes": ["Prefer canonical entity labels when you have them; the server normalizes for you."], }, { "name": "subject-neighborhood", "steps": [ "get_related_recent_entities(subject=...)", "for each strong related entity, call get_events_for_entity(entity=...)", ], "notes": ["Use this when you want a graph-like expansion around a subject."] }, { "name": "emerging-signal", "steps": [ "detect_emerging_topics(limit=...)", "choose a topic/entity", "get_events_for_entity(entity=...)", "get_news_sentiment(entity=...)", ], "notes": ["Good for trend scouting and risk mapping."], }, ] NEWS_AGENT_TIPS = [ "If you need a fast answer, start with get_latest_events, then summarize the strongest cluster with get_event_summary.", "If a user asks about a person/place/company/theme, use get_events_for_entity before broadening to get_related_recent_entities.", "Treat cluster_id as an internal cursor, not user-facing output; use it only for follow-up tool calls.", "When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.", "Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.", "For tricky names, rely on the server’s resolver instead of inventing alias rules in the client.", ] NEWS_EXAMPLE_CHAINS = [ { "task": "What is happening now?", "chain": [ "get_latest_events(topic=...)", "get_event_summary(event_id=...) if one cluster looks important", ], }, { "task": "Deep dive on an entity", "chain": [ "get_events_for_entity(entity=..., timeframe=...)", "get_news_sentiment(entity=..., timeframe=...)", "get_event_summary(event_id=...) for the strongest cluster", ], }, { "task": "Broaden from a subject", "chain": [ "get_related_recent_entities(subject=..., include_trends=true)", "get_events_for_entity(entity=...) for the strongest related entities", ], }, { "task": "Find what is emerging", "chain": [ "detect_emerging_topics(limit=...)", "get_events_for_entity(entity=...) on one or two emerging terms", ], }, ] @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters, sorted by recency.") async def get_latest_events(topic: str = "crypto", limit: int = 5, include_articles: bool = False): limit = max(1, min(int(limit), 20)) # If the caller passes an entity-like value, resolve it and use the canonical # entity as the query lens. Otherwise keep the original topic path. topic_norm = normalize_query(topic).lower() resolved = resolve_entity_via_trends(topic_norm) allowed = {t.lower() for t in DEFAULT_TOPICS} is_topic = topic_norm in allowed query_terms = { topic_norm, str(resolved.get("normalized") or "").strip().lower(), str(resolved.get("canonical_label") or "").strip().lower(), str(resolved.get("mid") or "").strip().lower(), } query_terms = {q for q in query_terms if q} store = SQLiteClusterStore(DB_PATH) if is_topic: # Cache-first: only refresh if we currently have no fresh clusters for this topic. clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit) if not clusters: await refresh_clusters(topic=topic_norm, limit=200) clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit) else: # Entity-aware mode: search recent clusters across all topics and match by # raw entity, canonical label, or MID. clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit * 8) filtered = [] for c in clusters: haystack = _cluster_entity_haystack(c) if any(any(term in item for item in haystack) for term in query_terms): filtered.append(c) if len(filtered) >= limit: break clusters = filtered out = [] for c in _sort_clusters_by_recency(clusters): item = { "cluster_id": c.get("cluster_id"), "headline": c.get("headline"), "summary": c.get("summary"), "entities": c.get("entities", []), "sentiment": c.get("sentiment", "neutral"), "importance": c.get("importance", 0.0), "sources": c.get("sources", []), "timestamp": c.get("timestamp"), } if include_articles: # Return minimal article fields to keep responses compact. arts = c.get("articles", []) or [] item["articles"] = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] out.append(item) return out @mcp.tool(description="Investigate a person, company, place, or theme by matching extracted entities within a time window.") async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False): limit = max(1, min(int(limit), 30)) query = normalize_query(entity).strip().lower() if not query: return [] resolved = resolve_entity_via_trends(query) query_terms = { query, str(resolved.get("normalized") or "").strip().lower(), str(resolved.get("canonical_label") or "").strip().lower(), str(resolved.get("mid") or "").strip().lower(), } query_terms = {q for q in query_terms if q} store = SQLiteClusterStore(DB_PATH) def _match_clusters(clusters: list[dict]) -> list[dict]: hits: list[dict] = [] for c in _sort_clusters_by_recency(clusters): haystack = _cluster_entity_haystack(c) if any(any(term in item for item in haystack) for term in query_terms): hits.append(c) if len(hits) >= limit: break return hits hours = _parse_timeframe_to_hours(timeframe) clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=max(200, limit * 10)) hits = _match_clusters(clusters) out = [] for c in hits: item = { "cluster_id": c.get("cluster_id"), "headline": c.get("headline"), "summary": c.get("summary"), "entities": c.get("entities", []), "sentiment": c.get("sentiment", "neutral"), "importance": c.get("importance", 0.0), "sources": c.get("sources", []), "timestamp": c.get("timestamp"), } if include_articles: arts = c.get("articles", []) or [] item["articles"] = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] out.append(item) return out @mcp.tool(description="Return entities most commonly associated with the subject in recent clusters, optionally blended with Google Trends suggestions.") async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True): limit = max(1, min(int(limit), 25)) hours = _parse_timeframe_to_hours(timeframe) include_trends_bool = str(include_trends).strip().lower() not in {"false", "0", "no"} store = SQLiteClusterStore(DB_PATH) result = related_recent_entities( store=store, subject=subject, timeframe_hours=hours, limit=limit, include_trends=include_trends_bool, ) return result @mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts.") async def get_event_summary(event_id: str, include_articles: bool = False): store = SQLiteClusterStore(DB_PATH) # Summary cache: reuse if present within TTL. cached_summary = store.get_cluster_summary( cluster_id=event_id, ttl_hours=DEFAULT_LOOKBACK_HOURS, ) if cached_summary: out = { "event_id": event_id, "headline": cached_summary.get("headline"), "mergedSummary": cached_summary.get("mergedSummary"), "keyFacts": cached_summary.get("keyFacts", []), "sources": cached_summary.get("sources", []), } if include_articles: cluster = store.get_cluster_by_id(event_id) arts = (cluster or {}).get("articles", []) or [] out["articles"] = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] return out cluster = store.get_cluster_by_id(event_id) if not cluster: return { "event_id": event_id, "error": "NOT_FOUND", } articles_out = None if include_articles: arts = cluster.get("articles", []) or [] articles_out = [ { "title": a.get("title"), "url": a.get("url"), "source": a.get("source"), "timestamp": a.get("timestamp"), } for a in arts if isinstance(a, dict) ] summary = await summarize_cluster_llm(cluster) store.upsert_cluster_summary(event_id, summary) out = { "event_id": event_id, "headline": summary.get("headline"), "mergedSummary": summary.get("mergedSummary"), "keyFacts": summary.get("keyFacts", []), "sources": summary.get("sources", []), } if include_articles: out["articles"] = articles_out or [] return out @mcp.tool(description="Explore what is starting to matter: surface emerging entities and phrases from recent clusters.") async def detect_emerging_topics(limit: int = 10): limit = max(1, min(int(limit), 20)) store = SQLiteClusterStore(DB_PATH) clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=200) import re entity_counts = Counter() entity_importance_sum = Counter() # co-occurrence: ent -> other_ent -> count entity_cooccur = {} phrase_counts = Counter() topic_counts = Counter() # Very light heuristics to reduce “meta entities” dominating emerging topics. # Keep it conservative: only skip obvious boilerplate. def _is_generic_entity(ent: str) -> bool: e = str(ent).strip().lower() if not e: return True if len(e) < 4: return True # common outlet-ish / meta-ish tokens if e in {"news", "latest", "breaking"}: return True return False for c in clusters: topic_counts[c.get("topic", "other")] += 1 ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)] ents_in_cluster_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()] for ent in ents_in_cluster_norm: if _is_generic_entity(ent): continue entity_counts[ent] += 1 try: entity_importance_sum[ent] += float(c.get("importance", 0.0) or 0.0) except Exception: pass # update co-occurrence counts for i in range(len(ents_in_cluster_norm)): a = ents_in_cluster_norm[i] if not a: continue entity_cooccur.setdefault(a, Counter()) for j in range(len(ents_in_cluster_norm)): if i == j: continue b = ents_in_cluster_norm[j] if not b: continue entity_cooccur[a][b] += 1 text = f"{c.get('headline','')} {c.get('summary','')}" words = [w for w in re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())] for i in range(len(words) - 1): phrase = f"{words[i]} {words[i+1]}" if len(phrase) > 6: phrase_counts[phrase] += 1 emerging = [] # Combine frequency with average importance so “big signal” rises over pure repetition. for ent, count in entity_counts.most_common(limit): avg_imp = entity_importance_sum[ent] / max(1, count) # avg_imp is typically 0..~1; keep score bounded. trend_score = 0.25 + 0.40 * min(1.0, avg_imp) + 0.08 * min(6.0, float(count)) related = [] for other, _cnt in (entity_cooccur.get(ent) or Counter()).most_common(3): # avoid returning the entity itself (shouldn't happen, but be safe) if other != ent: related.append(other) emerging.append({ "topic": ent, "trend_score": min(0.99, round(trend_score, 2)), "related_entities": related if related else [ent], "signal_type": "entity", "count": count, "avg_importance": round(avg_imp, 3), }) for phrase, count in phrase_counts.most_common(limit * 2): if any(item["topic"] == phrase for item in emerging): continue emerging.append({ "topic": phrase.title(), "trend_score": min(0.99, round(0.20 + 0.10 * count, 2)), "related_entities": [], "signal_type": "phrase", "count": count, }) if len(emerging) >= limit: break return emerging[:limit] @mcp.tool(description="Investigate whether sentiment around an entity is positive, negative, or neutral over a chosen lookback window.") async def get_news_sentiment(entity: str, timeframe: str = "24h"): store = SQLiteClusterStore(DB_PATH) ent = normalize_query(entity).strip().lower() resolved = resolve_entity_via_trends(ent) query_terms = { ent, str(resolved.get("normalized") or "").strip().lower(), str(resolved.get("canonical_label") or "").strip().lower(), str(resolved.get("mid") or "").strip().lower(), } query_terms = {q for q in query_terms if q} if not ent: return { "entity": entity, "sentiment": "neutral", "score": 0.0, "cluster_count": 0, } # timeframe: accept '24h' or '24' tf = str(timeframe).strip().lower() try: hours = int(tf[:-1]) if tf.endswith("h") else int(tf) except Exception: hours = 24 hours = max(1, min(int(hours), 168)) clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500) matched = [] for c in clusters: haystack = _cluster_entity_haystack(c) if any(any(term in item for item in haystack) for term in query_terms): matched.append(c) if not matched: return { "entity": entity, "sentiment": "neutral", "score": 0.0, "cluster_count": 0, } scores = [] for c in matched: s = c.get("sentimentScore") if s is not None: try: scores.append(float(s)) except Exception: pass avg_score = sum(scores) / len(scores) if scores else 0.0 # Keep the label aligned with the numeric score. # Small magnitudes are treated as neutral to avoid noisy label flips. if avg_score >= 0.15: sentiment = "positive" elif avg_score <= -0.15: sentiment = "negative" else: sentiment = "neutral" return { "entity": entity, "sentiment": sentiment, "score": round(avg_score, 3), "cluster_count": len(matched), } @mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.") async def get_capabilities(): return { "server": { "name": "news-mcp", "purpose": "Recent news clusters, entity drill-down, sentiment, emerging topics, and related-entity expansion.", "output_conventions": { "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.", "sources": "Always preserve and display sources when summarizing a cluster or entity result.", "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.", }, }, "tools": NEWS_TOOL_CARDS, "recipes": NEWS_COMPOSITION_RECIPES, "example_chains": NEWS_EXAMPLE_CHAINS, "agent_tips": NEWS_AGENT_TIPS, "guidance": [ "Use get_latest_events for a tail, get_events_for_entity for entity deep dives, and get_related_recent_entities for neighborhood expansion.", "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.", "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.", ], } def _parse_timeframe_to_hours(timeframe: str) -> int: tf = str(timeframe).strip().lower() try: if tf.endswith("d"): days = int(tf[:-1]) return max(1, days * 24) if tf.endswith("h"): return max(1, int(tf[:-1])) return max(1, int(tf)) except Exception: return 24 from contextlib import asynccontextmanager @asynccontextmanager async def _lifespan(app: FastAPI): asyncio.ensure_future(_background_refresh_loop()) yield app = FastAPI(title="News MCP Server", lifespan=_lifespan) logger = logging.getLogger("news_mcp.startup") app.mount("/mcp", mcp.sse_app()) # Shared store — single connection pool _shared_store = SQLiteClusterStore(DB_PATH) _refresh_lock = asyncio.Lock() _refresh_started = False async def _background_refresh_loop(): """Non-blocking background refresher: prune then poll. Protected by an async lock so a second event-loop wake-up cannot start a parallel ingestion cycle. """ global _refresh_started async with _refresh_lock: if _refresh_started: return _refresh_started = True logger.info("news-mcp llm config: %s", active_llm_config()) # Prune off-thread so we do not block the event loop prune_result = await asyncio.to_thread( _shared_store.prune_if_due, NEWS_PRUNING_ENABLED, NEWS_RETENTION_DAYS, NEWS_PRUNE_INTERVAL_HOURS, ) logger.info("startup prune_result=%s", prune_result) if not NEWS_BACKGROUND_REFRESH_ENABLED: return async def _loop(): if not NEWS_BACKGROUND_REFRESH_ON_START: logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS) await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS)) while True: try: logger.info("background refresh tick start") await refresh_clusters(topic=None, limit=200) logger.info("background refresh tick complete") except Exception: logger.exception("background refresh tick failed") await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS)) asyncio.create_task(_loop()) @app.get("/") def root(): return { "status": "ok", "transport": "fastmcp+sse", "mount": "/mcp", "tools": [ "get_latest_events", "get_events_for_entity", "get_event_summary", "detect_emerging_topics", "get_news_sentiment", "get_related_recent_entities", "get_capabilities", ], "refresh": { "enabled": NEWS_BACKGROUND_REFRESH_ENABLED, "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS, }, "retention": { "lookback_hours": DEFAULT_LOOKBACK_HOURS, "retention_days": NEWS_RETENTION_DAYS, }, "pruning": { "enabled": NEWS_PRUNING_ENABLED, "interval_hours": NEWS_PRUNE_INTERVAL_HOURS, }, } # ------------------------------------------------------------------ # Dashboard REST API endpoints # ------------------------------------------------------------------ from fastapi.staticfiles import StaticFiles from fastapi.responses import JSONResponse app.mount("/dashboard", StaticFiles(directory="dashboard", html=True), name="dashboard") import logging as _log API_LOG = _log.getLogger("news_mcp.api") def _api_ok(data: dict) -> dict: return data def _api_err(exc: Exception, ctx: str) -> JSONResponse: API_LOG.exception(f"API error in {ctx}") return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx}) @app.get("/api/v1/health") def api_health(): """Extended health + dashboard stats.""" try: store = DashboardStore(_shared_store) return store.get_dashboard_stats() except Exception as e: return _api_err(e, "health") @app.get("/api/v1/clusters") def api_clusters( topic: str | None = None, hours: int = 24, limit: int = 50, offset: int = 0, ): """Paginated cluster listing.""" try: store = DashboardStore(_shared_store) clusters = store.get_clusters_page(topic=topic, hours=hours, limit=limit, offset=offset) with store._store._conn() as conn: if topic and topic != "all": count_row = conn.execute( "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours') AND topic = ?", (-hours, topic), ).fetchone() else: count_row = conn.execute( "SELECT COUNT(*) FROM clusters WHERE updated_at >= datetime('now', ? || ' hours')", (-hours,), ).fetchone() total = count_row[0] if count_row else 0 return {"clusters": clusters, "total": total, "topic": topic or "all", "hours": hours} except Exception as e: return _api_err(e, f"clusters(topic={topic},hours={hours})") @app.get("/api/v1/sentiment-series") def api_sentiment_series( topic: str | None = None, hours: int = 24, bucket_hours: float = 1.0, ): """Sentiment time-series for Chart.js.""" try: store = DashboardStore(_shared_store) series = store.get_sentiment_series(topic=topic, hours=hours, bucket_hours=bucket_hours) return {"series": series, "topic": topic or "all"} except Exception as e: return _api_err(e, f"sentiment(topic={topic})") @app.get("/api/v1/entities") def api_entities( hours: int = 24, limit: int = 30, ): """Top entity frequencies.""" try: store = DashboardStore(_shared_store) entities = store.get_entity_frequencies(hours=hours, limit=limit) return {"entities": entities, "hours": hours} except Exception as e: return _api_err(e, f"entities(hours={hours})") @app.get("/api/v1/cluster/{cluster_id}") def api_cluster_detail(cluster_id: str): """Full cluster detail for drill-down.""" try: store = DashboardStore(_shared_store) detail = store.get_cluster_detail(cluster_id) if not detail: return JSONResponse(status_code=404, content={"error": "Cluster not found", "id": cluster_id}) return detail except Exception as e: return _api_err(e, f"detail({cluster_id})") @app.get("/health") def health(): return { "status": "ok", "lookback_hours": DEFAULT_LOOKBACK_HOURS, "db": str(DB_PATH), "last_refresh_at": _shared_store.get_meta("last_refresh_at"), "feeds": _shared_store.get_all_feed_states(), "pruning": _shared_store.get_prune_state( pruning_enabled=NEWS_PRUNING_ENABLED, retention_days=NEWS_RETENTION_DAYS, interval_hours=NEWS_PRUNE_INTERVAL_HOURS, ), }