| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377 |
- from __future__ import annotations
- import asyncio
- import hashlib
- import json
- import logging
- import math
- import re
- import time
- from collections import Counter
- from datetime import datetime, timezone
- from pathlib import Path
- from fastapi import FastAPI, Form
- from mcp.server.fastmcp import FastMCP
- from mcp.server.transport_security import TransportSecuritySettings
- from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
- from news_mcp.config import (
- NEWS_FEED_URL,
- NEWS_FEED_URLS,
- NEWS_PRUNE_INTERVAL_HOURS,
- NEWS_PRUNING_ENABLED,
- NEWS_REFRESH_INTERVAL_SECONDS,
- NEWS_BACKGROUND_REFRESH_ENABLED,
- NEWS_BACKGROUND_REFRESH_ON_START,
- NEWS_RETENTION_DAYS,
- )
- from news_mcp.jobs.poller import refresh_clusters
- from news_mcp.storage.sqlite_store import SQLiteClusterStore
- from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
- from news_mcp.trends_resolution import resolve_entity_via_trends
- from news_mcp.llm import active_llm_config
- from news_mcp.entity_normalize import normalize_query
- from news_mcp.related_entities import related_recent_entities
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s %(levelname)s %(name)s: %(message)s",
- )
- _PROCESS_STARTED_AT = time.monotonic()
- _PACKAGE_DIR = Path(__file__).resolve().parent
- def _compute_version_hash() -> str:
- """SHA-256 of all .py files under news_mcp/, sorted by relative path.
- Deterministic across machines and environments — no git dependency.
- Works identically in Docker and native runs.
- """
- hasher = hashlib.sha256()
- for f in sorted(_PACKAGE_DIR.rglob("*.py")):
- try:
- hasher.update(f.read_bytes())
- except OSError:
- continue
- return hasher.hexdigest()[:9]
- _VERSION_HASH = _compute_version_hash()
- mcp = FastMCP(
- "news-mcp",
- transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
- )
- def _cluster_entity_haystack(cluster: dict) -> list[str]:
- """Collect the normalized entity + keyword clues attached to a cluster."""
- values: list[str] = []
- for ent in cluster.get("entities", []) or []:
- values.append(str(ent).strip().lower())
- for res in cluster.get("entityResolutions", []) or []:
- if not isinstance(res, dict):
- continue
- for key in ("normalized", "canonical_label", "mid"):
- val = res.get(key)
- if val:
- values.append(str(val).strip().lower())
- # Keywords are LLM-curated thematic descriptors — include them in the
- # searchable haystack so entity/theme queries match on subject-matter
- # signals, not just named entities.
- for kw in cluster.get("keywords", []) or []:
- values.append(str(kw).strip().lower())
- return [v for v in values if v]
- def _parse_cluster_timestamp(value) -> datetime:
- """Parse a stored cluster timestamp.
- payload.timestamp is guaranteed ISO 8601 UTC (YYYY-MM-DDTHH:MM:SS+00:00)
- at write time. Only datetime.fromisoformat is needed — no RFC 2822 fallback.
- """
- if not value:
- return datetime.min.replace(tzinfo=timezone.utc)
- text = str(value).strip()
- if not text:
- return datetime.min.replace(tzinfo=timezone.utc)
- try:
- dt = datetime.fromisoformat(text)
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.astimezone(timezone.utc)
- except Exception:
- return datetime.min.replace(tzinfo=timezone.utc)
- def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
- return sorted(
- clusters,
- key=lambda c: (
- _parse_cluster_timestamp(c.get("timestamp")),
- float(c.get("importance", 0.0) or 0.0),
- ),
- reverse=True,
- )
- def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict:
- return {
- "name": name,
- "description": description,
- "inputs": inputs,
- "outputs": outputs,
- "notes": notes or [],
- }
- NEWS_TOOL_CARDS = [
- _tool_card(
- "get_feeds",
- "List all configured RSS feeds with their enabled/disabled status.",
- [],
- ["feeds[]: {feed_key, enabled, last_hash, last_item_count, updated_at}"],
- ["Use this to see which feeds are currently active or disabled."],
- ),
- _tool_card(
- "toggle_feed",
- "Enable or disable a specific RSS feed by URL.",
- [
- {"name": "feed_url", "type": "string", "meaning": "the feed URL to toggle"},
- {"name": "enabled", "type": "boolean", "meaning": "true to enable, false to disable"},
- ],
- ["ok", "feed_key", "enabled"],
- ["Changes take effect on the next refresh cycle."],
- ),
- _tool_card(
- "get_latest_events",
- "Get the newest deduplicated clusters for a topic or resolved entity-like query.",
- [
- {"name": "topic", "type": "string", "default": "all topics", "meaning": "coarse category (crypto, macro, regulation, ai, other), entity-like topic, or omit for all topics"},
- {"name": "limit", "type": "integer", "default": 5, "range": "1-20"},
- {"name": "include_articles", "type": "boolean", "default": False},
- ],
- ["headline", "summary", "entities", "keywords", "sentiment", "importance", "sources", "timestamp", "articles?"],
- ["Use when you want the freshest clusters. Each cluster includes both named entities and LLM-curated thematic keywords describing what the story is about."],
- ),
- _tool_card(
- "get_events_for_entity",
- "Search recent clusters for a person, place, company, theme, or keyword by matching entities and thematic keywords.",
- [
- {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to search for"},
- {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]},
- {"name": "limit", "type": "integer", "default": 10, "range": "1-30"},
- {"name": "include_articles", "type": "boolean", "default": False},
- ],
- ["headline", "summary", "entities", "keywords", "sentiment", "importance", "sources", "timestamp", "articles?"],
- ["Matches against both named entities and thematic keywords. Use this for an entity-centered or theme-centered deep dive."],
- ),
- _tool_card(
- "get_event_summary",
- "Produce a concise LLM-written explanation for one cluster and key facts.",
- [
- {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"},
- {"name": "include_articles", "type": "boolean", "default": True},
- ],
- ["headline", "mergedSummary", "keyFacts", "sources", "entities", "keywords", "related_entities", "related_keywords", "topic", "sentiment", "importance", "articles"],
- ["Rich cluster drill-down. Returns LLM summary + cluster metadata + articles. Defaults to include articles."],
- ),
- _tool_card(
- "detect_emerging_topics",
- "Surface emerging entities, thematic keywords, and phrases that are accelerating in the recent window.",
- [
- {"name": "limit", "type": "integer", "default": 10, "range": "1-20"},
- {"name": "timeframe", "type": "string", "default": "24h", "examples": ["4h", "24h", "3d"]},
- {"name": "topic", "type": "string", "default": "all topics", "examples": ["crypto", "macro", "regulation", "ai", "other"]},
- {"name": "around", "type": "string", "default": "none", "meaning": "entity to scope results to its neighborhood (e.g. \"Bitcoin\")"},
- ],
- ["topic", "trend_score", "velocity", "recent_count", "prior_count", "source_count", "related_entities", "related_keywords", "signal_type"],
- ["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity. Signal types: entity (named entity), keyword (thematic descriptor), phrase (headline bigram). Check velocity and source_count to distinguish real spikes from noise."],
- ),
- _tool_card(
- "get_news_sentiment",
- "Estimate sentiment around an entity or keyword over a lookback window.",
- [
- {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to analyze"},
- {"name": "timeframe", "type": "string", "default": "24h"},
- ],
- ["entity", "sentiment", "score", "cluster_count"],
- ["Matches clusters by entities and keywords. Use after locating a cluster set or entity neighborhood."],
- ),
- _tool_card(
- "get_related_recent_entities",
- "Find entities and thematic keywords commonly co-occurring with a subject in recent clusters, optionally blended with Google Trends suggestions.",
- [
- {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"},
- {"name": "timeframe", "type": "string", "default": "72h"},
- {"name": "limit", "type": "integer", "default": 10, "range": "1-25"},
- {"name": "include_trends", "type": "boolean", "default": True},
- ],
- ["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"],
- ["Use this to drill from a subject into related entities and themes, then feed results into get_events_for_entity."],
- ),
- ]
- NEWS_COMPOSITION_RECIPES = [
- {
- "name": "fresh-news-tail",
- "steps": [
- "get_latest_events(topic=...)",
- "optionally get_event_summary(event_id=...) for the strongest cluster",
- ],
- "notes": ["Best for a quick tail of what is happening now. Omit topic for all topics, or pass crypto/macro/regulation/ai/other to filter."]
- },
- {
- "name": "entity-deep-dive",
- "steps": [
- "get_events_for_entity(entity=...)",
- "get_event_summary(event_id=...)",
- "get_news_sentiment(entity=..., timeframe=...)",
- ],
- "notes": ["Prefer canonical entity labels when you have them; the server normalizes for you."],
- },
- {
- "name": "subject-neighborhood",
- "steps": [
- "get_related_recent_entities(subject=...)",
- "for each strong related entity, call get_events_for_entity(entity=...)",
- ],
- "notes": ["Use this when you want a graph-like expansion around a subject."]
- },
- {
- "name": "emerging-signal",
- "steps": [
- "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...)",
- "choose a topic/entity from the results",
- "get_events_for_entity(entity=...)",
- "get_news_sentiment(entity=...)",
- ],
- "notes": ["Use timeframe to control lookback (e.g. \"4h\" for what's hot right now, \"3d\" for weekly trends), topic to scope to a category, around to find what's emerging near a specific entity. Check velocity and source_count to distinguish real spikes from noise."],
- },
- ]
- NEWS_AGENT_TIPS = [
- "If you need a fast answer, start with get_latest_events, then summarize the strongest cluster with get_event_summary.",
- "If a user asks about a person/place/company/theme, use get_events_for_entity before broadening to get_related_recent_entities.",
- "Treat cluster_id as an internal cursor, not user-facing output; use it only for follow-up tool calls.",
- "When describing clusters, keep sources and timestamps visible so the user can assess recency and provenance.",
- "Prefer a short chain of tools over many parallel calls unless you are building a neighborhood map or comparison table.",
- "For tricky names, rely on the server's resolver instead of inventing alias rules in the client.",
- "Use detect_emerging_topics with timeframe=\"4h\" for what's hot right now, timeframe=\"3d\" for weekly trends. Use topic= to scope to a category, around= to find what's emerging near a specific entity. Check velocity to distinguish accelerating signals from steady-state ones. Filter by signal_type to focus on entities, keywords, or phrases. Each result also includes related_keywords for thematic context.",
- "get_event_summary returns a rich result: headline, mergedSummary, keyFacts, entities, keywords, related_entities, related_keywords, topic, sentiment, importance, and articles (included by default). Use it for full cluster drill-down.",
- "Each cluster contains both entities (named entities with identity resolution) and keywords (thematic descriptors). Use keywords to understand what a story is about beyond the named entities.",
- "Use detect_emerging_topics with multiple timeframes (e.g. 4h vs 3d) and compare results to distinguish what's hot right now vs what's persistently trending. related_keywords help identify thematic neighborhoods.",
- ]
- NEWS_EXAMPLE_CHAINS = [
- {
- "task": "What is happening now?",
- "chain": [
- "get_latest_events(topic=...)",
- "get_event_summary(event_id=...) if one cluster looks important",
- ],
- },
- {
- "task": "Deep dive on an entity",
- "chain": [
- "get_events_for_entity(entity=..., timeframe=...)",
- "get_news_sentiment(entity=..., timeframe=...)",
- "get_event_summary(event_id=...) for the strongest cluster",
- ],
- },
- {
- "task": "Broaden from a subject",
- "chain": [
- "get_related_recent_entities(subject=..., include_trends=true)",
- "get_events_for_entity(entity=...) for the strongest related entities",
- ],
- },
- {
- "task": "Find what is emerging",
- "chain": [
- "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...) with optional scoping",
- "get_events_for_entity(entity=...) on one or two emerging terms",
- ],
- },
- {
- "task": "What's heating up around a specific entity",
- "chain": [
- "detect_emerging_topics(around=\"<entity>\", timeframe=\"4h\")",
- "get_events_for_entity(entity=...) on the top emerging neighbor",
- ],
- },
- {
- "task": "Full investigation pipeline",
- "chain": [
- "detect_emerging_topics(limit=20, timeframe=\"3d\")",
- "pick an emerging entity/keyword and note its related_entities and related_keywords",
- "get_event_summary(event_id=...) on the top cluster for full context including articles",
- "get_news_sentiment(entity=...) to gauge tone around the emerging topic",
- "detect_emerging_topics(around=<entity>, timeframe=\"4h\") to scout its neighborhood",
- ],
- },
- ]
- def _configured_feed_urls() -> list[str]:
- """Return the configured feed URLs from environment variables."""
- urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
- if not urls:
- urls = [NEWS_FEED_URL]
- return urls
- @mcp.tool(description="List all configured RSS feeds with their current enabled/disabled status.")
- async def get_feeds() -> list[dict]:
- """Return each feed URL with its enabled flag, last fetch stats, and timestamps."""
- store = SQLiteClusterStore(DB_PATH)
- return store.get_feed_state_list()
- @mcp.tool(description="Enable or disable a specific RSS feed by URL.")
- async def toggle_feed(feed_url: str, enabled: bool) -> dict:
- """Toggle a feed's active/inactive state.
- Changes take effect on the next background refresh cycle.
- Returns the updated feed state.
- """
- store = SQLiteClusterStore(DB_PATH)
- store.set_feed_enabled(feed_url.strip(), enabled)
- updated = store.get_feed_state(feed_url.strip())
- return {"ok": True, "feed_key": feed_url.strip(), "enabled": enabled, "details": updated}
- @mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters with entities and thematic keywords, sorted by recency.")
- async def get_latest_events(topic: str | None = None, limit: int = 5, include_articles: bool = False):
- limit = max(1, min(int(limit), 20))
- # When topic is omitted, search across all topics (no topic filter).
- # When topic is provided and matches a known topic, filter by that topic.
- # Otherwise treat the value as an entity-like query.
- topic_norm = normalize_query(topic).lower() if topic else ""
- resolved = resolve_entity_via_trends(topic_norm) if topic_norm else {}
- allowed = {t.lower() for t in DEFAULT_TOPICS}
- is_topic = topic_norm in allowed
- is_all_topics = not topic_norm
- query_terms = {
- topic_norm,
- str(resolved.get("normalized") or "").strip().lower(),
- str(resolved.get("canonical_label") or "").strip().lower(),
- str(resolved.get("mid") or "").strip().lower(),
- }
- query_terms = {q for q in query_terms if q}
- store = SQLiteClusterStore(DB_PATH)
- if is_all_topics:
- # No topic specified: return freshest clusters across all topics.
- clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
- elif is_topic:
- # Cache-first: only refresh if we currently have no fresh clusters for this topic.
- clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
- if not clusters:
- await refresh_clusters(topic=topic_norm, limit=200)
- clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
- else:
- # Entity-aware mode: search recent clusters across all topics and match by
- # raw entity, canonical label, or MID using SQL-level junction table search.
- clusters = store.get_clusters_by_entity_or_keyword(
- query_terms=query_terms, hours=DEFAULT_LOOKBACK_HOURS, limit=limit
- )
- out = []
- for c in _sort_clusters_by_recency(clusters):
- item = {
- "cluster_id": c.get("cluster_id"),
- "headline": c.get("headline"),
- "summary": c.get("summary"),
- "entities": c.get("entities", []),
- "keywords": c.get("keywords", []),
- "sentiment": c.get("sentiment", "neutral"),
- "importance": c.get("importance", 0.0),
- "sources": c.get("sources", []),
- "timestamp": c.get("timestamp"),
- }
- if include_articles:
- # Return minimal article fields to keep responses compact.
- arts = c.get("articles", []) or []
- item["articles"] = [
- {
- "title": a.get("title"),
- "url": a.get("url"),
- "source": a.get("source"),
- "timestamp": a.get("timestamp"),
- }
- for a in arts
- if isinstance(a, dict)
- ]
- out.append(item)
- return out
- @mcp.tool(description="Investigate a person, company, place, theme, or keyword by matching entities and thematic keywords within a time window.")
- async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
- limit = max(1, min(int(limit), 30))
- query = normalize_query(entity).strip().lower()
- if not query:
- return []
- resolved = resolve_entity_via_trends(query)
- query_terms = {
- query,
- str(resolved.get("normalized") or "").strip().lower(),
- str(resolved.get("canonical_label") or "").strip().lower(),
- str(resolved.get("mid") or "").strip().lower(),
- }
- query_terms = {q for q in query_terms if q}
- store = SQLiteClusterStore(DB_PATH)
- hours = _parse_timeframe_to_hours(timeframe)
- hits = store.get_clusters_by_entity_or_keyword(query_terms=query_terms, hours=hours, limit=limit)
- out = []
- for c in hits:
- item = {
- "cluster_id": c.get("cluster_id"),
- "headline": c.get("headline"),
- "summary": c.get("summary"),
- "entities": c.get("entities", []),
- "keywords": c.get("keywords", []),
- "sentiment": c.get("sentiment", "neutral"),
- "importance": c.get("importance", 0.0),
- "sources": c.get("sources", []),
- "timestamp": c.get("timestamp"),
- }
- if include_articles:
- arts = c.get("articles", []) or []
- item["articles"] = [
- {
- "title": a.get("title"),
- "url": a.get("url"),
- "source": a.get("source"),
- "timestamp": a.get("timestamp"),
- }
- for a in arts
- if isinstance(a, dict)
- ]
- out.append(item)
- return out
- @mcp.tool(description="Return entities and thematic keywords commonly co-occurring with the subject in recent clusters, optionally blended with Google Trends suggestions.")
- async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True):
- limit = max(1, min(int(limit), 25))
- hours = _parse_timeframe_to_hours(timeframe)
- include_trends_bool = str(include_trends).strip().lower() not in {"false", "0", "no"}
- store = SQLiteClusterStore(DB_PATH)
- result = related_recent_entities(
- store=store,
- subject=subject,
- timeframe_hours=hours,
- limit=limit,
- include_trends=include_trends_bool,
- )
- return result
- @mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts, "
- "entities, keywords, related entities and keywords, sentiment, importance, and articles.")
- async def get_event_summary(event_id: str, include_articles: bool = True):
- store = SQLiteClusterStore(DB_PATH)
- # Summary cache: reuse if present within TTL.
- cluster = store.get_cluster_by_id(event_id)
- if not cluster:
- return {
- "event_id": event_id,
- "error": "NOT_FOUND",
- }
- cached_summary = store.get_cluster_summary(
- cluster_id=event_id,
- ttl_hours=DEFAULT_LOOKBACK_HOURS,
- )
- def _enrich(base: dict, src_cluster: dict) -> dict:
- base["entities"] = src_cluster.get("entities", [])
- base["keywords"] = src_cluster.get("keywords", [])
- base["topic"] = src_cluster.get("topic", "other")
- base["sentiment"] = src_cluster.get("sentiment", "neutral")
- base["sentimentScore"] = src_cluster.get("sentimentScore")
- base["importance"] = src_cluster.get("importance", 0.0)
- # Related entities: from co-occurrence in this cluster's article set
- resolved = src_cluster.get("entityResolutions", []) or []
- related_ents = []
- seen_ents = {str(e).strip().lower() for e in (src_cluster.get("entities", []) or [])}
- for res in resolved:
- if isinstance(res, dict):
- label = str(res.get("canonical_label") or res.get("normalized") or "").strip()
- if label and label.lower() not in seen_ents:
- related_ents.append(label)
- seen_ents.add(label.lower())
- base["related_entities"] = related_ents[:10]
- # Related keywords: from the cluster's own keywords (thematic descriptors)
- # plus any co-occurring keywords from recent related clusters
- base["related_keywords"] = _fetch_related_keywords(store, src_cluster, event_id)
- if include_articles:
- arts = src_cluster.get("articles", []) or []
- base["articles"] = [
- {
- "title": a.get("title"),
- "url": a.get("url"),
- "source": a.get("source"),
- "timestamp": a.get("timestamp"),
- }
- for a in arts
- if isinstance(a, dict)
- ]
- return base
- if cached_summary:
- out = {
- "event_id": event_id,
- "headline": cached_summary.get("headline"),
- "mergedSummary": cached_summary.get("mergedSummary"),
- "keyFacts": cached_summary.get("keyFacts", []),
- "sources": cached_summary.get("sources", []),
- }
- out = _enrich(out, cluster)
- return out
- summary = await summarize_cluster_llm(cluster)
- store.upsert_cluster_summary(event_id, summary)
- out = {
- "event_id": event_id,
- "headline": summary.get("headline"),
- "mergedSummary": summary.get("mergedSummary"),
- "keyFacts": summary.get("keyFacts", []),
- "sources": summary.get("sources", []),
- }
- out = _enrich(out, cluster)
- return out
- def _fetch_related_keywords(store: SQLiteClusterStore, cluster: dict, event_id: str) -> list[str]:
- """Find keywords that co-occur with this cluster's entities in recent clusters.
- This gives agents thematic context: what else was being discussed alongside
- the entities in this cluster during the same window.
- """
- entities = cluster.get("entities", []) or []
- if not entities:
- return []
- # Build a set of entity terms to search with
- entity_terms = set()
- for e in entities:
- entity_terms.add(str(e).strip().lower())
- for res in (cluster.get("entityResolutions", []) or []):
- if isinstance(res, dict):
- for key in ("normalized", "canonical_label"):
- val = res.get(key)
- if val:
- entity_terms.add(str(val).strip().lower())
- entity_terms.discard("")
- if not entity_terms:
- return []
- # Find recent clusters that share any entity, collect their keywords
- # Use payload_ts lookback of 48h for co-occurrence window
- from datetime import timedelta
- cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
- placeholders = ",".join("?" for _ in entity_terms)
- try:
- rows = store._conn().execute(
- f"SELECT DISTINCT c.payload FROM clusters c "
- f"JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
- f"WHERE c.payload_ts >= ? AND c.cluster_id != ? "
- f"AND ce.entity IN ({placeholders}) "
- f"ORDER BY c.payload_ts DESC LIMIT 20",
- (cutoff, event_id, *entity_terms),
- ).fetchall()
- except Exception:
- return []
- kw_counter: dict[str, int] = {}
- cluster_kws = {str(k).strip().lower() for k in (cluster.get("keywords", []) or []) if str(k).strip()}
- for (payload_text,) in rows:
- try:
- c = json.loads(payload_text)
- except Exception:
- continue
- for kw in (c.get("keywords", []) or []):
- kw_norm = str(kw).strip()
- if not kw_norm:
- continue
- kw_key = kw_norm.lower()
- # Skip keywords that already appear in this cluster
- if kw_key in cluster_kws:
- continue
- kw_counter[kw_norm] = kw_counter.get(kw_norm, 0) + 1
- # Return top keywords by co-occurrence count
- sorted_kws = sorted(kw_counter.items(), key=lambda x: -x[1])
- return [kw for kw, _ in sorted_kws[:10]]
- @mcp.tool(description="Explore what is starting to matter: surface emerging entities, thematic keywords, and phrases from recent clusters. "
- "Use timeframe to control the lookback window, topic to scope to a category, and around to find what's emerging near a specific entity. "
- "Results include signal_type (entity / keyword / phrase) for downstream filtering.")
- async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic: str | None = None, around: str | None = None):
- """Surface entities and phrases that are accelerating in recent clusters.
- Args:
- limit: max results to return (1-20, default 10).
- timeframe: lookback window like "4h", "24h", "3d" (default "24h").
- topic: optional coarse topic filter ("crypto", "macro", "regulation", "ai", "other").
- around: optional entity — only return entities that co-occur with this entity
- in the recent window (e.g. "Bitcoin" to find what's emerging in Bitcoin's neighborhood).
- """
- limit = max(1, min(int(limit), 20))
- hours = _parse_timeframe_to_hours(timeframe)
- half_hours = hours / 2.0
- store = SQLiteClusterStore(DB_PATH)
- # Fetch more clusters than needed so velocity stats are meaningful even for short windows.
- clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)
- # --- optional topic filter ---
- if topic:
- topic_norm = normalize_query(topic).strip().lower()
- if topic_norm:
- clusters = [c for c in clusters if (c.get("topic") or "other").strip().lower() == topic_norm]
- # --- resolve the 'around' entity ---
- around_terms: set[str] = set()
- if around:
- around_norm = normalize_query(around).strip().lower()
- if around_norm:
- resolved = resolve_entity_via_trends(around_norm)
- around_terms = {
- around_norm,
- str(resolved.get("normalized") or "").strip().lower(),
- str(resolved.get("canonical_label") or "").strip().lower(),
- }
- around_terms.discard("")
- # split clusters into first-half vs second-half by timestamp
- # clusters are already sorted most-recent-first from the store
- now = datetime.now(timezone.utc)
- def _cluster_age_hours(c: dict) -> float:
- """Return the cluster's age in hours. payload.timestamp is ISO 8601 UTC guaranteed."""
- ts = c.get("timestamp") or c.get("last_updated")
- if not ts:
- return 0.0
- try:
- dt = datetime.fromisoformat(str(ts).strip())
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0)
- except Exception:
- return 0.0
- # Generic entity filter
- _generic_tokens = {"news", "latest", "breaking", "update", "updates", "report", "reports"}
- def _is_generic_entity(ent: str) -> bool:
- e = str(ent).strip().lower()
- if not e or len(e) < 4:
- return True
- if e in _generic_tokens:
- return True
- return False
- # --- accumulate signals ---
- # recent = second half of timeframe (newer), prior = first half (older)
- entity_counts_recent = Counter()
- entity_counts_prior = Counter()
- entity_importance_recent = Counter()
- entity_sources: dict[str, set] = {} # ent -> set of source names
- entity_buckets: dict[str, set] = {} # ent -> set of time-bucket indices (for sustained-spike detection)
- entity_cooccur: dict[str, Counter] = {}
- phrase_counts_recent = Counter()
- # Keyword accumulators — same scoring pipeline as entities, but tracking
- # LLM-curated thematic descriptors instead of named entities.
- kw_counts_recent = Counter()
- kw_counts_prior = Counter()
- kw_importance_recent = Counter()
- kw_sources: dict[str, set] = {}
- kw_buckets: dict[str, set] = {}
- kw_cooccur: dict[str, Counter] = {}
- entity_kw_cooccur: dict[str, Counter] = {} # entity -> Counter of co-occurring keywords
- bucket_size_hours = max(1.0, hours / 6.0) # split window into ~6 buckets
- for c in clusters:
- ents_in_cluster = [e for e in (c.get("entities", []) or []) if not _is_generic_entity(e)]
- ents_norm = [str(e).strip().lower() for e in ents_in_cluster if str(e).strip()]
- # Keywords: deduplicate per cluster so a cluster with the same keyword
- # listed twice doesn't inflate counts.
- kws_in_cluster = list(dict.fromkeys(
- str(k).strip().lower()
- for k in (c.get("keywords", []) or [])
- if str(k).strip() and not _is_generic_entity(k)
- ))
- age_h = _cluster_age_hours(c)
- is_recent = age_h <= half_hours
- bucket_idx = int(age_h / bucket_size_hours)
- # --- around filter: only count clusters that mention the target entity ---
- if around_terms:
- haystack = set(ents_norm)
- for res in c.get("entityResolutions", []) or []:
- if isinstance(res, dict):
- for key in ("normalized", "canonical_label"):
- val = res.get(key)
- if val:
- haystack.add(str(val).strip().lower())
- if not (haystack & around_terms):
- continue
- counts = entity_counts_recent if is_recent else entity_counts_prior
- imp_acc = entity_importance_recent if is_recent else None # only importance from recent window
- for ent in ents_norm:
- if _is_generic_entity(ent):
- continue
- counts[ent] += 1
- if ent not in entity_sources:
- entity_sources[ent] = set()
- src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else ""
- if src:
- entity_sources[ent].add(str(src))
- if ent not in entity_buckets:
- entity_buckets[ent] = set()
- entity_buckets[ent].add(bucket_idx)
- if imp_acc is not None:
- try:
- imp_acc[ent] += float(c.get("importance", 0.0) or 0.0)
- except Exception:
- pass
- # --- keyword counting (same recent/prior split as entities) ---
- kw_counts = kw_counts_recent if is_recent else kw_counts_prior
- kw_imp_acc = kw_importance_recent if is_recent else None
- for kw in kws_in_cluster:
- kw_counts[kw] += 1
- if kw not in kw_sources:
- kw_sources[kw] = set()
- src = c.get("source") or c.get("headline", "").split(" - ")[-1] if c.get("headline") else ""
- if src:
- kw_sources[kw].add(str(src))
- if kw not in kw_buckets:
- kw_buckets[kw] = set()
- kw_buckets[kw].add(bucket_idx)
- if kw_imp_acc is not None:
- try:
- kw_imp_acc[kw] += float(c.get("importance", 0.0) or 0.0) # type: ignore[assignment]
- except Exception:
- pass
- # co-occurrence (only for clusters matching the around filter, if any)
- for i in range(len(ents_norm)):
- a = ents_norm[i]
- if _is_generic_entity(a):
- continue
- if a not in entity_cooccur:
- entity_cooccur[a] = Counter()
- for j in range(len(ents_norm)):
- if i == j:
- continue
- b = ents_norm[j]
- if _is_generic_entity(b):
- continue
- entity_cooccur[a][b] += 1
- # keyword co-occurrence: which keywords appear together in the same clusters
- for i in range(len(kws_in_cluster)):
- ka = kws_in_cluster[i]
- if ka not in kw_cooccur:
- kw_cooccur[ka] = Counter()
- for j in range(len(kws_in_cluster)):
- if i == j:
- continue
- kb = kws_in_cluster[j]
- kw_cooccur[ka][kb] += 1
- # also track entity<->keyword co-occurrence (bidirectional)
- for ent in ents_norm:
- if _is_generic_entity(ent):
- continue
- kw_cooccur[ka][ent] += 1
- # and the reverse: entity -> keyword
- if ent not in entity_kw_cooccur:
- entity_kw_cooccur[ent] = Counter()
- entity_kw_cooccur[ent][ka] += 1
- # bigram phrases (recent only)
- if is_recent:
- text = f"{c.get('headline', '')} {c.get('summary', '')}"
- words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
- for i in range(len(words) - 1):
- phrase = f"{words[i]} {words[i+1]}"
- if len(phrase) > 6:
- phrase_counts_recent[phrase] += 1
- # --- score entities ---
- all_entities = set(entity_counts_recent.keys()) | set(entity_counts_prior.keys())
- scored = []
- for ent in all_entities:
- recent_n = entity_counts_recent.get(ent, 0)
- prior_n = entity_counts_prior.get(ent, 0)
- total_n = recent_n + prior_n
- if total_n < 1:
- continue
- # velocity: ratio of recent vs prior (smoothed to avoid division noise)
- # 0 prior → velocity = recent_n (pure emergence)
- # equal → velocity = 1.0 (steady)
- velocity = (recent_n + 0.5) / (prior_n + 0.5)
- # recency weight: what fraction of total hits are in the recent window
- recency_ratio = recent_n / total_n
- # source diversity: how many distinct outlets
- n_sources = len(entity_sources.get(ent, set()))
- # sustained: how many distinct time buckets did it appear in (max ~6)
- n_buckets = len(entity_buckets.get(ent, set()))
- # average importance (recent window only)
- avg_imp = (entity_importance_recent.get(ent, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0
- composed_score = (
- 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) + # velocity (0..1, 4x = max)
- 0.25 * recency_ratio + # recency concentration
- 0.15 * min(1.0, n_sources / 5.0) + # source diversity
- 0.10 * min(1.0, n_buckets / 4.0) + # sustained (>1 bucket)
- 0.15 * min(1.0, avg_imp) # importance
- )
- related = []
- if ent in entity_cooccur:
- for other, _cnt in entity_cooccur[ent].most_common(10):
- if other != ent:
- related.append(other)
- related_kws = []
- if ent in entity_kw_cooccur:
- # Build a set of related entity names (lowercased) to deduplicate
- # keywords that are already represented in related_entities
- related_ent_names = {e.strip().lower() for e in related}
- # Also include the entity itself and its common aliases
- related_ent_names.add(ent.strip().lower())
- for kw, _cnt in entity_kw_cooccur[ent].most_common(10):
- kw_lower = kw.strip().lower()
- # Skip keywords that are just a related entity name (substring match)
- if any(kw_lower in ent_name or ent_name in kw_lower
- for ent_name in related_ent_names):
- continue
- related_kws.append(kw)
- if len(related_kws) >= 5:
- break
- scored.append({
- "topic": ent,
- "trend_score": min(0.99, round(composed_score, 3)),
- "related_entities": related[:5] if related else [ent],
- "related_keywords": related_kws[:5],
- "velocity": round(velocity, 2),
- "recent_count": recent_n,
- "prior_count": prior_n,
- "source_count": n_sources,
- "avg_importance": round(avg_imp, 3),
- "signal_type": "entity",
- })
- # --- score keywords (same velocity/recency/source/sustained/importance formula) ---
- all_keywords = set(kw_counts_recent.keys()) | set(kw_counts_prior.keys())
- kw_scored = []
- for kw in all_keywords:
- # Skip keywords that are already scored as entities — entity signal is
- # higher quality (proper nouns, resolved identities).
- if kw in all_entities:
- continue
- recent_n = kw_counts_recent.get(kw, 0)
- prior_n = kw_counts_prior.get(kw, 0)
- total_n = recent_n + prior_n
- if total_n < 1:
- continue
- velocity = (recent_n + 0.5) / (prior_n + 0.5)
- recency_ratio = recent_n / total_n
- n_sources = len(kw_sources.get(kw, set()))
- n_buckets = len(kw_buckets.get(kw, set()))
- avg_imp = (kw_importance_recent.get(kw, 0.0) / max(1, recent_n)) if recent_n > 0 else 0.0
- composed_score = (
- 0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0)) +
- 0.25 * recency_ratio +
- 0.15 * min(1.0, n_sources / 5.0) +
- 0.10 * min(1.0, n_buckets / 4.0) +
- 0.15 * min(1.0, avg_imp)
- )
- kw_related_kws = []
- kw_related_ents = []
- if kw in kw_cooccur:
- for other, _cnt in kw_cooccur[kw].most_common(10):
- if other == kw:
- continue
- # If this co-occurring term is a known entity, route to related_entities
- if other in all_entities:
- kw_related_ents.append(other)
- else:
- kw_related_kws.append(other)
- if len(kw_related_kws) >= 5 and len(kw_related_ents) >= 5:
- break
- kw_scored.append({
- "topic": kw,
- "trend_score": min(0.99, round(composed_score, 3)),
- "related_entities": kw_related_ents[:5],
- "related_keywords": kw_related_kws[:5],
- "velocity": round(velocity, 2),
- "recent_count": recent_n,
- "prior_count": prior_n,
- "source_count": n_sources,
- "avg_importance": round(avg_imp, 3),
- "signal_type": "keyword",
- })
- # sort keywords by score descending
- kw_scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"]))
- # sort by composed score descending
- scored.sort(key=lambda x: (-x["trend_score"], -x["velocity"], x["topic"]))
- # --- merge: entities first, then keywords, then phrases ---
- emerging = list(scored) # start with entities
- seen_topics = {item["topic"] for item in emerging}
- for kw_item in kw_scored:
- if kw_item["topic"] not in seen_topics:
- emerging.append(kw_item)
- seen_topics.add(kw_item["topic"])
- # --- add phrase signals (only from recent window) ---
- for phrase, count in phrase_counts_recent.most_common(limit * 2):
- if phrase in seen_topics:
- continue
- emerging.append({
- "topic": phrase.title(),
- "trend_score": min(0.99, round(0.30 + 0.15 * min(count, 5), 2)),
- "related_entities": [],
- "related_keywords": [],
- "velocity": None,
- "recent_count": count,
- "prior_count": 0,
- "source_count": 0,
- "avg_importance": 0.0,
- "signal_type": "phrase",
- })
- seen_topics.add(phrase)
- if len(emerging) >= limit:
- break
- return emerging[:limit]
- @mcp.tool(description="Investigate whether sentiment around an entity or keyword is positive, negative, or neutral over a chosen lookback window. "
- "Matches clusters by both named entities and thematic keywords.")
- async def get_news_sentiment(entity: str, timeframe: str = "24h"):
- store = SQLiteClusterStore(DB_PATH)
- ent = normalize_query(entity).strip().lower()
- resolved = resolve_entity_via_trends(ent)
- query_terms = {
- ent,
- str(resolved.get("normalized") or "").strip().lower(),
- str(resolved.get("canonical_label") or "").strip().lower(),
- str(resolved.get("mid") or "").strip().lower(),
- }
- query_terms = {q for q in query_terms if q}
- if not ent:
- return {
- "entity": entity,
- "sentiment": "neutral",
- "score": 0.0,
- "cluster_count": 0,
- }
- # timeframe: accept '24h' or '24'
- tf = str(timeframe).strip().lower()
- try:
- hours = int(tf[:-1]) if tf.endswith("h") else int(tf)
- except Exception:
- hours = 24
- hours = max(1, min(int(hours), 168))
- clusters = store.get_clusters_by_entity_or_keyword(query_terms=query_terms, hours=hours, limit=500)
- matched = clusters
- if not matched:
- return {
- "entity": entity,
- "sentiment": "neutral",
- "score": 0.0,
- "cluster_count": 0,
- }
- scores = []
- for c in matched:
- s = c.get("sentimentScore")
- if s is not None:
- try:
- scores.append(float(s))
- except Exception:
- pass
- avg_score = sum(scores) / len(scores) if scores else 0.0
- # Keep the label aligned with the numeric score.
- # Small magnitudes are treated as neutral to avoid noisy label flips.
- if avg_score >= 0.15:
- sentiment = "positive"
- elif avg_score <= -0.15:
- sentiment = "negative"
- else:
- sentiment = "neutral"
- return {
- "entity": entity,
- "sentiment": sentiment,
- "score": round(avg_score, 3),
- "cluster_count": len(matched),
- }
- @mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.")
- async def get_capabilities():
- return {
- "server": {
- "name": "news-mcp",
- "purpose": "Recent news clusters with entities and thematic keywords, entity/keyword drill-down, sentiment, emerging topics, and related-entity expansion.",
- "output_conventions": {
- "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.",
- "sources": "Always preserve and display sources when summarizing a cluster or entity result.",
- "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.",
- "clusters": "Each cluster includes entities (named entities with optional MID/canonical_label) and keywords (thematic descriptors). Both are searchable; entities are higher-signal, keywords capture subject-matter themes.",
- },
- },
- "tools": NEWS_TOOL_CARDS,
- "recipes": NEWS_COMPOSITION_RECIPES,
- "example_chains": NEWS_EXAMPLE_CHAINS,
- "agent_tips": NEWS_AGENT_TIPS,
- "guidance": [
- "Use get_latest_events for a tail, get_events_for_entity for entity/keyword deep dives, and get_related_recent_entities for neighborhood expansion.",
- "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.",
- "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.",
- "For emerging topics, use detect_emerging_topics with timeframe and around parameters. Signal types: entity (named entity, highest quality), keyword (thematic descriptor), phrase (headline bigram). High velocity + high source_count = strong signal.",
- "get_events_for_entity and get_news_sentiment match both entities and thematic keywords — use keywords when the subject is a theme rather than a named entity.",
- ],
- }
- def _parse_timeframe_to_hours(timeframe: str) -> int:
- tf = str(timeframe).strip().lower()
- try:
- if tf.endswith("d"):
- days = int(tf[:-1])
- return max(1, days * 24)
- if tf.endswith("h"):
- return max(1, int(tf[:-1]))
- return max(1, int(tf))
- except Exception:
- return 24
- from contextlib import asynccontextmanager
- @asynccontextmanager
- async def _lifespan(app: FastAPI):
- asyncio.ensure_future(_background_refresh_loop())
- yield
- app = FastAPI(title="News MCP Server", lifespan=_lifespan)
- logger = logging.getLogger("news_mcp.startup")
- app.mount("/mcp", mcp.sse_app())
- # Shared store — single connection pool
- _shared_store = SQLiteClusterStore(DB_PATH)
- _refresh_lock = asyncio.Lock()
- _refresh_started = False
- async def _background_refresh_loop():
- """Non-blocking background refresher: prune then poll.
- Protected by an async lock so a second event-loop wake-up cannot
- start a parallel ingestion cycle.
- """
- global _refresh_started
- async with _refresh_lock:
- if _refresh_started:
- return
- _refresh_started = True
- logger.info("news-mcp llm config: %s", active_llm_config())
- # Prune off-thread so we do not block the event loop
- prune_result = await asyncio.to_thread(
- _shared_store.prune_if_due,
- NEWS_PRUNING_ENABLED,
- NEWS_RETENTION_DAYS,
- NEWS_PRUNE_INTERVAL_HOURS,
- )
- logger.info("startup prune_result=%s", prune_result)
- if not NEWS_BACKGROUND_REFRESH_ENABLED:
- return
- async def _loop():
- if not NEWS_BACKGROUND_REFRESH_ON_START:
- logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS)
- await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
- while True:
- try:
- logger.info("background refresh tick start")
- await refresh_clusters(topic=None, limit=200)
- logger.info("background refresh tick complete")
- except Exception:
- logger.exception("background refresh tick failed")
- await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
- asyncio.create_task(_loop())
- @app.get("/")
- def root():
- return {
- "status": "ok",
- "transport": "fastmcp+sse",
- "mount": "/mcp",
- "tools": [
- "get_latest_events",
- "get_events_for_entity",
- "get_event_summary",
- "detect_emerging_topics",
- "get_news_sentiment",
- "get_related_recent_entities",
- "get_capabilities",
- ],
- "refresh": {
- "enabled": NEWS_BACKGROUND_REFRESH_ENABLED,
- "interval_seconds": NEWS_REFRESH_INTERVAL_SECONDS,
- },
- "retention": {
- "lookback_hours": DEFAULT_LOOKBACK_HOURS,
- "retention_days": NEWS_RETENTION_DAYS,
- },
- "pruning": {
- "enabled": NEWS_PRUNING_ENABLED,
- "interval_hours": NEWS_PRUNE_INTERVAL_HOURS,
- },
- }
- # ------------------------------------------------------------------
- # Dashboard REST API endpoints
- # ------------------------------------------------------------------
- from fastapi.staticfiles import StaticFiles
- from fastapi.responses import JSONResponse
- app.mount("/dashboard", StaticFiles(directory="dashboard", html=True), name="dashboard")
- import logging as _log
- API_LOG = _log.getLogger("news_mcp.api")
- def _api_ok(data: dict) -> dict:
- return data
- def _api_err(exc: Exception, ctx: str) -> JSONResponse:
- API_LOG.exception(f"API error in {ctx}")
- return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx})
- @app.get("/api/v1/health")
- def api_health():
- """Extended health + dashboard stats."""
- try:
- store = _shared_store
- stats = store.get_dashboard_stats()
- stats["version"] = _VERSION_HASH
- return stats
- except Exception as e:
- return _api_err(e, "health")
- @app.get("/api/v1/clusters")
- def api_clusters(
- topic: str | None = None,
- hours: int = 24,
- limit: int = 50,
- offset: int = 0,
- ):
- """Paginated cluster listing."""
- try:
- store = _shared_store
- result = store.get_clusters_page(topic=topic, hours=hours, limit=limit, offset=offset)
- return {"clusters": result["clusters"], "total": result["total"], "topic": topic or "all", "hours": hours}
- except Exception as e:
- return _api_err(e, f"clusters(topic={topic},hours={hours})")
- @app.get("/api/v1/sentiment-series")
- def api_sentiment_series(
- topic: str | None = None,
- hours: int = 24,
- bucket_hours: float = 1.0,
- ):
- """Sentiment time-series for Chart.js."""
- try:
- store = _shared_store
- series = store.get_sentiment_series(topic=topic, hours=hours, bucket_hours=bucket_hours)
- return {"series": series, "topic": topic or "all"}
- except Exception as e:
- return _api_err(e, f"sentiment(topic={topic})")
- @app.get("/api/v1/entities")
- def api_entities(
- hours: int = 24,
- limit: int = 30,
- ):
- """Top entity frequencies."""
- try:
- store = _shared_store
- entities = store.get_entity_frequencies(hours=hours, limit=limit)
- return {"entities": entities, "hours": hours}
- except Exception as e:
- return _api_err(e, f"entities(hours={hours})")
- @app.get("/api/v1/keywords")
- def api_keywords(
- hours: int = 24,
- limit: int = 30,
- ):
- """Top keyword frequencies (thematic descriptors, excluding terms already counted as entities)."""
- try:
- store = _shared_store
- keywords = store.get_keyword_frequencies(hours=hours, limit=limit)
- return {"keywords": keywords, "hours": hours}
- except Exception as e:
- return _api_err(e, f"keywords(hours={hours})")
- @app.get("/api/v1/clusters/by-entity")
- def api_clusters_by_entity(
- entity: str,
- hours: int = 168,
- limit: int = 50,
- offset: int = 0,
- ):
- """Return clusters matching an entity, filtered by event time via SQL junction table."""
- try:
- store = _shared_store
- return store.get_clusters_by_entity(
- entity=entity.strip().lower(),
- hours=hours,
- limit=limit,
- offset=offset,
- )
- except Exception as e:
- return _api_err(e, f"by-entity(entity={entity},hours={hours})")
- @app.get("/api/v1/clusters/by-keyword")
- def api_clusters_by_keyword(
- keyword: str,
- hours: int = 168,
- limit: int = 50,
- offset: int = 0,
- ):
- """Return clusters matching a keyword, filtered by event time via SQL junction table."""
- try:
- store = _shared_store
- return store.get_clusters_by_keyword(
- keyword=keyword.strip().lower(),
- hours=hours,
- limit=limit,
- offset=offset,
- )
- except Exception as e:
- return _api_err(e, f"by-keyword(keyword={keyword},hours={hours})")
- @app.get("/api/v1/cluster/{cluster_id}")
- def api_cluster_detail(cluster_id: str):
- """Full cluster detail for drill-down."""
- try:
- store = _shared_store
- detail = store.get_cluster_detail(cluster_id)
- if not detail:
- return JSONResponse(status_code=404, content={"error": "Cluster not found", "id": cluster_id})
- return detail
- except Exception as e:
- return _api_err(e, f"detail({cluster_id})")
- # ------------------------------------------------------------------
- # Feed management endpoints (toggle on/off from dashboard)
- # ------------------------------------------------------------------
- @app.get("/api/v1/feeds")
- def api_feeds():
- """List all configured feeds with enabled/disabled status."""
- try:
- store = SQLiteClusterStore(DB_PATH)
- feed_list = store.get_feed_state_list()
- configured = _configured_feed_urls()
- return {
- "feeds": feed_list,
- "configured_urls": configured,
- }
- except Exception as e:
- return _api_err(e, "feeds")
- @app.post("/api/v1/feeds/toggle")
- async def api_feed_toggle(feed_url: str = Form(), enabled: bool = Form()):
- """Toggle a feed's enabled state."""
- try:
- store = SQLiteClusterStore(DB_PATH)
- ok = store.set_feed_enabled(feed_url.strip(), enabled)
- if not ok:
- return JSONResponse(
- status_code=404,
- content={"error": f"Feed not found: {feed_url}"},
- )
- return {"ok": True, "feed_url": feed_url.strip(), "enabled": enabled}
- except Exception as e:
- return _api_err(e, f"toggle({feed_url})")
- @app.get("/health")
- def health():
- return {
- "status": "ok",
- "uptime": round(time.monotonic() - _PROCESS_STARTED_AT, 3),
- "version": _VERSION_HASH,
- }
|