| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498 |
- from __future__ import annotations
- import asyncio
- import hashlib
- import json
- import logging
- import math
- import re
- import time
- from collections import Counter
- from datetime import datetime, timezone
- from pathlib import Path
- from fastapi import FastAPI, Form
- from mcp.server.fastmcp import FastMCP
- from mcp.server.transport_security import TransportSecuritySettings
- from news_mcp.config import DEFAULT_LOOKBACK_HOURS, DEFAULT_TOPICS, DB_PATH
- from news_mcp.config import (
- NEWS_FEED_URL,
- NEWS_FEED_URLS,
- NEWS_PRUNE_INTERVAL_HOURS,
- NEWS_PRUNING_ENABLED,
- NEWS_REFRESH_INTERVAL_SECONDS,
- NEWS_BACKGROUND_REFRESH_ENABLED,
- NEWS_BACKGROUND_REFRESH_ON_START,
- NEWS_RETENTION_DAYS,
- )
- from news_mcp.jobs.poller import refresh_clusters
- from news_mcp.storage.sqlite_store import SQLiteClusterStore
- from news_mcp.enrichment.llm_enrich import summarize_cluster_llm
- from news_mcp.trends_resolution import resolve_entity_via_trends
- from news_mcp.llm import active_llm_config
- from news_mcp.entity_normalize import normalize_query
- from news_mcp.related_entities import related_recent_entities
# Process-wide logging: INFO and above, prefixed with time, level and logger name.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    level=logging.INFO,
)

# Monotonic clock reading taken once at import time (presumably for uptime reporting).
_PROCESS_STARTED_AT = time.monotonic()

# Absolute directory holding this module; the root scanned when fingerprinting sources.
_PACKAGE_DIR = Path(__file__).resolve().parent
- def _compute_version_hash() -> str:
- """SHA-256 of all .py files under news_mcp/, sorted by relative path.
- Deterministic across machines and environments — no git dependency.
- Works identically in Docker and native runs.
- """
- hasher = hashlib.sha256()
- for f in sorted(_PACKAGE_DIR.rglob("*.py")):
- try:
- hasher.update(f.read_bytes())
- except OSError:
- continue
- return hasher.hexdigest()[:9]
# Short source fingerprint of this package, computed once when the module is imported.
_VERSION_HASH = _compute_version_hash()

# The MCP server instance; every @mcp.tool function in this module registers on it.
# NOTE(review): DNS-rebinding protection is switched off here. Confirm the HTTP
# transport is only reachable from trusted networks or sits behind a proxy that
# validates Host headers.
mcp = FastMCP(
    "news-mcp",
    transport_security=TransportSecuritySettings(
        enable_dns_rebinding_protection=False,
    ),
)
def _cluster_entity_haystack(cluster: dict) -> list[str]:
    """Flatten a cluster's searchable clues into trimmed, lowercase strings.

    Clues are emitted in this order:
      1. the raw ``entities``;
      2. the ``normalized``, ``canonical_label`` and ``mid`` fields of each dict in
         ``entityResolutions`` (non-dict entries and empty fields are skipped);
      3. the LLM-curated thematic ``keywords``, so entity/theme queries can match
         on subject matter and not only on named entities.

    Empty strings are dropped; duplicates are kept.
    """
    def _clean(raw) -> str:
        return str(raw).strip().lower()

    named = [_clean(ent) for ent in (cluster.get("entities", []) or [])]
    resolved = [
        _clean(res.get(field))
        for res in (cluster.get("entityResolutions", []) or [])
        if isinstance(res, dict)
        for field in ("normalized", "canonical_label", "mid")
        if res.get(field)
    ]
    themes = [_clean(kw) for kw in (cluster.get("keywords", []) or [])]
    return [clue for clue in (*named, *resolved, *themes) if clue]
def _parse_cluster_timestamp(value) -> datetime:
    """Convert a stored cluster timestamp into an aware UTC ``datetime``.

    Stored ``payload.timestamp`` values are written as ISO 8601 UTC
    (``YYYY-MM-DDTHH:MM:SS+00:00``), so ``datetime.fromisoformat`` is the only
    parser used; there is no RFC 2822 fallback. Naive values are taken to be UTC.
    Missing, blank or unparseable input yields ``datetime.min`` in UTC, which
    compares earlier than any real timestamp.
    """
    sentinel = datetime.min.replace(tzinfo=timezone.utc)
    text = str(value).strip() if value else ""
    if not text:
        return sentinel
    try:
        parsed = datetime.fromisoformat(text)
        aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
        return aware.astimezone(timezone.utc)
    except Exception:  # deliberately lenient: a bad timestamp just sorts as oldest
        return sentinel
def _sort_clusters_by_recency(clusters: list[dict]) -> list[dict]:
    """Return a new list of clusters ordered newest-first.

    The sort key is ``(parsed timestamp, importance)``, both descending. Among
    clusters sharing a timestamp, the more important one comes first. A missing or
    unparseable timestamp parses to ``datetime.min`` and therefore lands last. A
    missing or falsy ``importance`` counts as 0.0. The input list is not modified.
    """
    def _recency_key(cluster: dict):
        stamp = _parse_cluster_timestamp(cluster.get("timestamp"))
        weight = float(cluster.get("importance", 0.0) or 0.0)
        return (stamp, weight)

    ordered = list(clusters)
    ordered.sort(key=_recency_key, reverse=True)
    return ordered
- def _tool_card(name: str, description: str, inputs: list[dict], outputs: list[str], notes: list[str] | None = None) -> dict:
- return {
- "name": name,
- "description": description,
- "inputs": inputs,
- "outputs": outputs,
- "notes": notes or [],
- }
# Agent-facing catalog of the news tools. Each card lists the tool's purpose, its
# inputs (type / default / range / meaning), the fields it returns ("?" marks
# optional fields) and usage notes. Output lists mirror the dicts the tool
# functions in this module actually build.
NEWS_TOOL_CARDS = [
    _tool_card(
        "get_feeds",
        "List all configured RSS feeds with their enabled/disabled status.",
        [],
        ["feeds[]: {feed_key, enabled, last_hash, last_item_count, updated_at}"],
        ["Use this to see which feeds are currently active or disabled."],
    ),
    _tool_card(
        "toggle_feed",
        "Enable or disable a specific RSS feed by URL.",
        [
            {"name": "feed_url", "type": "string", "meaning": "the feed URL to toggle"},
            {"name": "enabled", "type": "boolean", "meaning": "true to enable, false to disable"},
        ],
        ["ok", "feed_key", "enabled", "details"],
        ["Changes take effect on the next refresh cycle."],
    ),
    _tool_card(
        "get_latest_events",
        "Get the newest deduplicated clusters for a topic or resolved entity-like query.",
        [
            {"name": "topic", "type": "string", "default": "all topics", "meaning": "coarse category (crypto, macro, regulation, ai, other), entity-like topic, or omit for all topics"},
            {"name": "limit", "type": "integer", "default": 5, "range": "1-20"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["cluster_id", "headline", "summary", "entities", "keywords", "sentiment", "importance", "sources", "timestamp", "articles?"],
        [
            "Use when you want the freshest clusters. Each cluster includes both named entities and LLM-curated thematic keywords describing what the story is about.",
            "Pass cluster_id as event_id to get_event_summary; keep it out of user-facing prose.",
        ],
    ),
    _tool_card(
        "get_events_for_entity",
        "Search recent clusters for a person, place, company, theme, or keyword by matching entities and thematic keywords.",
        [
            {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to search for"},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["24h", "72h", "3d"]},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-30"},
            {"name": "include_articles", "type": "boolean", "default": False},
        ],
        ["cluster_id", "headline", "summary", "entities", "keywords", "sentiment", "importance", "sources", "timestamp", "articles?"],
        [
            "Matches against both named entities and thematic keywords. Use this for an entity-centered or theme-centered deep dive.",
            "Pass cluster_id as event_id to get_event_summary; keep it out of user-facing prose.",
        ],
    ),
    _tool_card(
        "get_event_summary",
        "Produce a concise LLM-written explanation for one cluster and key facts.",
        [
            {"name": "event_id", "type": "string", "meaning": "cluster_id; do not surface in user-facing prose"},
            {"name": "include_articles", "type": "boolean", "default": True},
        ],
        ["event_id", "headline", "mergedSummary", "keyFacts", "sources", "entities", "keywords", "related_entities", "related_keywords", "topic", "sentiment", "sentimentScore", "importance", "articles?"],
        [
            "Rich cluster drill-down. Returns LLM summary + cluster metadata + articles. Defaults to include articles.",
            "Returns {event_id, error: \"NOT_FOUND\"} when the cluster id is unknown.",
        ],
    ),
    _tool_card(
        "detect_emerging_topics",
        "Surface emerging entities, thematic keywords, and phrases that are accelerating in the recent window.",
        [
            {"name": "limit", "type": "integer", "default": 10, "range": "1-20"},
            {"name": "timeframe", "type": "string", "default": "24h", "examples": ["4h", "24h", "3d"]},
            {"name": "topic", "type": "string", "default": "all topics", "examples": ["crypto", "macro", "regulation", "ai", "other"]},
            {"name": "around", "type": "string", "default": "none", "meaning": "entity to scope results to its neighborhood (e.g. \"Bitcoin\")"},
        ],
        ["topic", "trend_score", "velocity", "recent_count", "prior_count", "source_count", "related_entities", "related_keywords", "signal_type"],
        ["Use timeframe to control lookback, topic to scope to a category, around to find what's emerging near a specific entity. Signal types: entity (named entity), keyword (thematic descriptor), phrase (headline bigram). Check velocity and source_count to distinguish real spikes from noise."],
    ),
    _tool_card(
        "get_news_sentiment",
        "Estimate sentiment around an entity or keyword over a lookback window.",
        [
            {"name": "entity", "type": "string", "meaning": "entity label, phrase, or keyword to analyze"},
            {"name": "timeframe", "type": "string", "default": "24h"},
        ],
        ["entity", "sentiment", "score", "cluster_count"],
        ["Matches clusters by entities and keywords. Use after locating a cluster set or entity neighborhood."],
    ),
    _tool_card(
        "get_related_recent_entities",
        "Find entities and thematic keywords commonly co-occurring with a subject in recent clusters, optionally blended with Google Trends suggestions.",
        [
            {"name": "subject", "type": "string", "meaning": "canonical entity or subject phrase"},
            {"name": "timeframe", "type": "string", "default": "72h"},
            {"name": "limit", "type": "integer", "default": 10, "range": "1-25"},
            {"name": "include_trends", "type": "boolean", "default": True},
        ],
        ["subject", "related[].normalized", "related[].canonical_label", "related[].mid", "related[].sources", "related[].scores"],
        ["Use this to drill from a subject into related entities and themes, then feed results into get_events_for_entity."],
    ),
]
# Multi-step tool recipes advertised to agents. Each recipe has a name, an ordered
# list of pseudo-call steps (descriptive strings, never executed) and free-text notes.
NEWS_COMPOSITION_RECIPES = [
    dict(
        name="fresh-news-tail",
        steps=[
            "get_latest_events(topic=...)",
            "optionally get_event_summary(event_id=...) for the strongest cluster",
        ],
        notes=[
            "Best for a quick tail of what is happening now. Omit topic for all topics, "
            "or pass crypto/macro/regulation/ai/other to filter."
        ],
    ),
    dict(
        name="entity-deep-dive",
        steps=[
            "get_events_for_entity(entity=...)",
            "get_event_summary(event_id=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
        ],
        notes=["Prefer canonical entity labels when you have them; the server normalizes for you."],
    ),
    dict(
        name="subject-neighborhood",
        steps=[
            "get_related_recent_entities(subject=...)",
            "for each strong related entity, call get_events_for_entity(entity=...)",
        ],
        notes=["Use this when you want a graph-like expansion around a subject."],
    ),
    dict(
        name="emerging-signal",
        steps=[
            "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...)",
            "choose a topic/entity from the results",
            "get_events_for_entity(entity=...)",
            "get_news_sentiment(entity=...)",
        ],
        notes=[
            "Use timeframe to control lookback (e.g. \"4h\" for what's hot right now, "
            "\"3d\" for weekly trends), topic to scope to a category, around to find what's "
            "emerging near a specific entity. Check velocity and source_count to distinguish "
            "real spikes from noise."
        ],
    ),
]
# Short usage tips shown to calling agents, one self-contained sentence group each.
NEWS_AGENT_TIPS = [
    "If you need a fast answer, start with get_latest_events, "
    "then summarize the strongest cluster with get_event_summary.",
    "If a user asks about a person/place/company/theme, "
    "use get_events_for_entity before broadening to get_related_recent_entities.",
    "Treat cluster_id as an internal cursor, not user-facing output; "
    "use it only for follow-up tool calls.",
    "When describing clusters, keep sources and timestamps visible "
    "so the user can assess recency and provenance.",
    "Prefer a short chain of tools over many parallel calls "
    "unless you are building a neighborhood map or comparison table.",
    "For tricky names, rely on the server's resolver instead of inventing alias rules in the client.",
    (
        "Use detect_emerging_topics with timeframe=\"4h\" for what's hot right now, "
        "timeframe=\"3d\" for weekly trends. "
        "Use topic= to scope to a category, around= to find what's emerging near a specific entity. "
        "Check velocity to distinguish accelerating signals from steady-state ones. "
        "Filter by signal_type to focus on entities, keywords, or phrases. "
        "Each result also includes related_keywords for thematic context."
    ),
    (
        "get_event_summary returns a rich result: headline, mergedSummary, keyFacts, entities, "
        "keywords, related_entities, related_keywords, topic, sentiment, importance, and articles "
        "(included by default). Use it for full cluster drill-down."
    ),
    (
        "Each cluster contains both entities (named entities with identity resolution) and "
        "keywords (thematic descriptors). Use keywords to understand what a story is about "
        "beyond the named entities."
    ),
    (
        "Use detect_emerging_topics with multiple timeframes (e.g. 4h vs 3d) and compare results "
        "to distinguish what's hot right now vs what's persistently trending. "
        "related_keywords help identify thematic neighborhoods."
    ),
]
# Worked task -> tool-chain examples for agents. Every get_event_summary step is
# preceded by a step that yields cluster ids (get_latest_events or
# get_events_for_entity), because detect_emerging_topics does not return them.
NEWS_EXAMPLE_CHAINS = [
    {
        "task": "What is happening now?",
        "chain": [
            "get_latest_events(topic=...)",
            "get_event_summary(event_id=...) if one cluster looks important",
        ],
    },
    {
        "task": "Deep dive on an entity",
        "chain": [
            "get_events_for_entity(entity=..., timeframe=...)",
            "get_news_sentiment(entity=..., timeframe=...)",
            "get_event_summary(event_id=...) for the strongest cluster",
        ],
    },
    {
        "task": "Broaden from a subject",
        "chain": [
            "get_related_recent_entities(subject=..., include_trends=true)",
            "get_events_for_entity(entity=...) for the strongest related entities",
        ],
    },
    {
        "task": "Find what is emerging",
        "chain": [
            "detect_emerging_topics(limit=..., timeframe=..., topic=..., around=...) with optional scoping",
            "get_events_for_entity(entity=...) on one or two emerging terms",
        ],
    },
    {
        "task": "What's heating up around a specific entity",
        "chain": [
            "detect_emerging_topics(around=\"<entity>\", timeframe=\"4h\")",
            "get_events_for_entity(entity=...) on the top emerging neighbor",
        ],
    },
    {
        "task": "Full investigation pipeline",
        "chain": [
            "detect_emerging_topics(limit=20, timeframe=\"3d\")",
            "pick an emerging entity/keyword and note its related_entities and related_keywords",
            "get_events_for_entity(entity=...) on that term to obtain its cluster_ids",
            "get_event_summary(event_id=...) on the top cluster for full context including articles",
            "get_news_sentiment(entity=...) to gauge tone around the emerging topic",
            "detect_emerging_topics(around=\"<entity>\", timeframe=\"4h\") to scout its neighborhood",
        ],
    },
]
- def _configured_feed_urls() -> list[str]:
- """Return the configured feed URLs from environment variables."""
- urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
- if not urls:
- urls = [NEWS_FEED_URL]
- return urls
@mcp.tool(description="List all configured RSS feeds with their current enabled/disabled status.")
async def get_feeds() -> list[dict]:
    """Report every known feed: its URL key, enabled flag, last fetch stats and timestamps.

    Thin wrapper over the store's feed-state listing; a new ``SQLiteClusterStore``
    object is constructed for each call.
    """
    return SQLiteClusterStore(DB_PATH).get_feed_state_list()
@mcp.tool(description="Enable or disable a specific RSS feed by URL.")
async def toggle_feed(feed_url: str, enabled: bool) -> dict:
    """Toggle a feed's active/inactive state.

    The URL is whitespace-stripped once and used as the feed key for both the write
    and the read-back. Changes take effect on the next background refresh cycle.

    Returns:
        ``{"ok": True, "feed_key", "enabled", "details"}``, where ``details`` is the
        stored feed state read back after the update. A blank URL instead yields
        ``{"ok": False, "feed_key": "", "enabled", "error": "EMPTY_FEED_URL"}`` and
        nothing is written.
    """
    feed_key = (feed_url or "").strip()
    if not feed_key:
        # Without this guard an empty string would be handed to the store as a feed key.
        return {"ok": False, "feed_key": feed_key, "enabled": enabled, "error": "EMPTY_FEED_URL"}

    store = SQLiteClusterStore(DB_PATH)
    store.set_feed_enabled(feed_key, enabled)
    updated = store.get_feed_state(feed_key)
    return {"ok": True, "feed_key": feed_key, "enabled": enabled, "details": updated}
@mcp.tool(description="Debug dedup: inspect whether an article URL was already processed, which cluster it belongs to, and what similarity signals it would produce against existing clusters.")
async def debug_dedup(url: str, title: str | None = None) -> dict:
    """Report dedup status for an article URL and, optionally, its title.

    Returns a dict with:
      - url, article_key: the input URL and the identity key derived from it
      - seen: whether article_key is present in seen_articles
      - cluster_id, first_seen, stored_url: the seen_articles row (only when seen)
      - similarity_signals (title only): title/jaccard/cosine signals and the
        dedup matcher's verdict against up to 20 of the latest clusters from the
        last 24h, keeping the 10 with the highest title similarity
      - title_threshold, jaccard_threshold (title only): reference thresholds
    """
    from news_mcp.article_identity import article_key
    from news_mcp.config import NEWS_EMBEDDINGS_ENABLED
    from news_mcp.dedup.cluster import _is_match, _signals

    art = {"url": url, "title": title or ""}
    akey = article_key(art)
    result: dict = {"url": url, "article_key": akey}
    store = SQLiteClusterStore(DB_PATH)

    # NOTE(review): if _conn() returns a sqlite3.Connection, using it as a context
    # manager only commits/rolls back on exit and does not close it. Confirm whether
    # _conn() hands out a fresh connection (which should then be closed) or a shared one.
    with store._conn() as conn:
        row = conn.execute(
            "SELECT cluster_id, first_seen, url FROM seen_articles WHERE article_key=?",
            (akey,),
        ).fetchone()

    if row:
        result["seen"] = True
        result["cluster_id"] = row[0]
        result["first_seen"] = row[1]
        result["stored_url"] = row[2]
    else:
        result["seen"] = False

    if title:
        recent = store.get_latest_clusters_all_topics(ttl_hours=24, limit=20)
        signals_list = []
        for c in recent:
            sigs = _signals(art, c)
            matched, signal_name, signal_value = _is_match(
                sigs, embeddings_enabled=NEWS_EMBEDDINGS_ENABLED,
            )
            signals_list.append({
                # `or ""` guards keys that are present but None; slicing None
                # would raise TypeError.
                "cluster_id": str(c.get("cluster_id") or "")[:12],
                "headline": str(c.get("headline") or "")[:60],
                "title_sim": round(sigs["title"], 3),
                "jaccard": round(sigs["jaccard"], 3),
                # Falsy cosine / match values (0.0 or None) are reported as None.
                "cosine": round(sigs["cosine"], 3) if sigs["cosine"] else None,
                "matched": matched,
                "match_signal": signal_name,
                "match_value": round(signal_value, 3) if signal_value else None,
            })
        # Best title similarity first.
        signals_list.sort(key=lambda x: x["title_sim"], reverse=True)
        result["similarity_signals"] = signals_list[:10]
        # NOTE(review): hard-coded copies of DEFAULT_TITLE_THRESHOLD and
        # DEFAULT_JACCARD_THRESHOLD; they may drift if the dedup defaults change.
        result["title_threshold"] = 0.75
        result["jaccard_threshold"] = 0.55
    return result
@mcp.tool(description="Investigate a topic and return the newest deduplicated news clusters with entities and thematic keywords, sorted by recency.")
async def get_latest_events(topic: str | None = None, limit: int = 5, include_articles: bool = False):
    """Return the freshest clusters for a coarse topic, an entity-like query, or everything.

    Modes:
      * ``topic`` omitted or blank: newest clusters across all topics.
      * ``topic`` matches one of ``DEFAULT_TOPICS`` (case-insensitive): that topic's
        clusters. If none are fresh, ``refresh_clusters`` runs once and the read is
        retried.
      * anything else: treated as an entity/keyword. The query is expanded with the
        aliases from ``resolve_entity_via_trends`` and matched against cluster
        entities and keywords.

    ``limit`` is clamped to 1..20. Results are ordered newest-first, with higher
    importance breaking ties. With ``include_articles`` each article is reduced to
    title/url/source/timestamp.
    """
    limit = max(1, min(int(limit), 20))
    # Strip like the other tools do, so " crypto" still selects the topic branch.
    topic_norm = normalize_query(topic).strip().lower() if topic else ""
    allowed = {t.lower() for t in DEFAULT_TOPICS}
    store = SQLiteClusterStore(DB_PATH)

    if not topic_norm:
        # No topic specified: return freshest clusters across all topics.
        clusters = store.get_latest_clusters_all_topics(ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
    elif topic_norm in allowed:
        # Cache-first: only refresh if we currently have no fresh clusters for this topic.
        clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
        if not clusters:
            await refresh_clusters(topic=topic_norm, limit=200)
            clusters = store.get_latest_clusters(topic=topic_norm, ttl_hours=DEFAULT_LOOKBACK_HOURS, limit=limit)
    else:
        # Entity-aware mode. Resolution is done only here because the other two
        # branches never use its result, so calling it for them is a wasted lookup.
        # Match by raw query, normalized form, canonical label or MID via the store's
        # entity/keyword search.
        resolved = resolve_entity_via_trends(topic_norm)
        query_terms = {
            topic_norm,
            str(resolved.get("normalized") or "").strip().lower(),
            str(resolved.get("canonical_label") or "").strip().lower(),
            str(resolved.get("mid") or "").strip().lower(),
        }
        query_terms.discard("")
        clusters = store.get_clusters_by_entity_or_keyword(
            query_terms=query_terms, hours=DEFAULT_LOOKBACK_HOURS, limit=limit
        )

    out = []
    for c in _sort_clusters_by_recency(clusters):
        item = {
            "cluster_id": c.get("cluster_id"),
            "headline": c.get("headline"),
            "summary": c.get("summary"),
            "entities": c.get("entities", []),
            "keywords": c.get("keywords", []),
            "sentiment": c.get("sentiment", "neutral"),
            "importance": c.get("importance", 0.0),
            "sources": c.get("sources", []),
            "timestamp": c.get("timestamp"),
        }
        if include_articles:
            # Return minimal article fields to keep responses compact.
            item["articles"] = [
                {
                    "title": a.get("title"),
                    "url": a.get("url"),
                    "source": a.get("source"),
                    "timestamp": a.get("timestamp"),
                }
                for a in (c.get("articles", []) or [])
                if isinstance(a, dict)
            ]
        out.append(item)
    return out
@mcp.tool(description="Investigate a person, company, place, theme, or keyword by matching entities and thematic keywords within a time window.")
async def get_events_for_entity(entity: str, limit: int = 10, timeframe: str = "24h", include_articles: bool = False):
    """Return clusters whose entities or keywords match *entity* within *timeframe*.

    The query is normalized and lowercased; a blank query returns ``[]``. It is
    then widened with the normalized form, canonical label and MID reported by
    ``resolve_entity_via_trends``, and all non-empty variants are searched
    together. ``limit`` is clamped to 1..30. Results keep the store's order.
    With ``include_articles`` each article is reduced to title/url/source/timestamp.
    """
    limit = max(1, min(int(limit), 30))
    needle = normalize_query(entity).strip().lower()
    if not needle:
        return []

    resolution = resolve_entity_via_trends(needle)
    variants = [needle] + [
        str(resolution.get(field) or "").strip().lower()
        for field in ("normalized", "canonical_label", "mid")
    ]
    query_terms = {term for term in variants if term}

    store = SQLiteClusterStore(DB_PATH)
    hours = _parse_timeframe_to_hours(timeframe)
    matches = store.get_clusters_by_entity_or_keyword(query_terms=query_terms, hours=hours, limit=limit)

    def _brief(cluster: dict) -> dict:
        brief = {
            "cluster_id": cluster.get("cluster_id"),
            "headline": cluster.get("headline"),
            "summary": cluster.get("summary"),
            "entities": cluster.get("entities", []),
            "keywords": cluster.get("keywords", []),
            "sentiment": cluster.get("sentiment", "neutral"),
            "importance": cluster.get("importance", 0.0),
            "sources": cluster.get("sources", []),
            "timestamp": cluster.get("timestamp"),
        }
        if include_articles:
            brief["articles"] = [
                {
                    "title": art.get("title"),
                    "url": art.get("url"),
                    "source": art.get("source"),
                    "timestamp": art.get("timestamp"),
                }
                for art in (cluster.get("articles", []) or [])
                if isinstance(art, dict)
            ]
        return brief

    return [_brief(cluster) for cluster in matches]
@mcp.tool(description="Return entities and thematic keywords commonly co-occurring with the subject in recent clusters, optionally blended with Google Trends suggestions.")
async def get_related_recent_entities(subject: str, timeframe: str = "72h", limit: int = 10, include_trends: bool = True):
    """Delegate to ``related_recent_entities`` after normalizing the tool arguments.

    ``limit`` is clamped to 1..25 and ``timeframe`` is converted to hours.
    ``include_trends`` is interpreted leniently: any value whose lowercase string
    form is "false", "0" or "no" disables Trends blending; everything else enables it.
    """
    capped_limit = min(max(int(limit), 1), 25)
    window_hours = _parse_timeframe_to_hours(timeframe)
    # Clients may send the flag as a string, so compare on its text form.
    trends_enabled = str(include_trends).strip().lower() not in {"false", "0", "no"}
    return related_recent_entities(
        store=SQLiteClusterStore(DB_PATH),
        subject=subject,
        timeframe_hours=window_hours,
        limit=capped_limit,
        include_trends=trends_enabled,
    )
@mcp.tool(description="Investigate one cluster in depth and return a concise LLM-written explanation plus key facts, "
"entities, keywords, related entities and keywords, sentiment, importance, and articles.")
async def get_event_summary(event_id: str, include_articles: bool = True):
    """Return an LLM summary of one cluster merged with the cluster's metadata.

    Flow:
      1. Look the cluster up. If it is absent, return ``{"event_id", "error": "NOT_FOUND"}``.
      2. Reuse a stored summary younger than ``DEFAULT_LOOKBACK_HOURS``. Otherwise
         generate one with ``summarize_cluster_llm`` and persist it.
      3. Combine the summary fields (headline, mergedSummary, keyFacts, sources)
         with the cluster's entities, keywords, topic, sentiment, sentimentScore,
         importance, related_entities, related_keywords and, when
         ``include_articles`` is true, compact article records.
    """
    store = SQLiteClusterStore(DB_PATH)
    cluster = store.get_cluster_by_id(event_id)
    if not cluster:
        return {
            "event_id": event_id,
            "error": "NOT_FOUND",
        }

    # Summary cache: reuse a stored summary if it is still within the TTL.
    summary = store.get_cluster_summary(
        cluster_id=event_id,
        ttl_hours=DEFAULT_LOOKBACK_HOURS,
    )
    if not summary:
        summary = await summarize_cluster_llm(cluster)
        store.upsert_cluster_summary(event_id, summary)

    out = {
        "event_id": event_id,
        "headline": summary.get("headline"),
        "mergedSummary": summary.get("mergedSummary"),
        "keyFacts": summary.get("keyFacts", []),
        "sources": summary.get("sources", []),
    }
    out["entities"] = cluster.get("entities", [])
    out["keywords"] = cluster.get("keywords", [])
    out["topic"] = cluster.get("topic", "other")
    out["sentiment"] = cluster.get("sentiment", "neutral")
    out["sentimentScore"] = cluster.get("sentimentScore")
    out["importance"] = cluster.get("importance", 0.0)

    # related_entities: canonical (or normalized) labels from this cluster's own
    # entityResolutions that differ case-insensitively from its raw entity names,
    # i.e. resolved aliases. They are not entities mined from other clusters.
    known = {str(e).strip().lower() for e in (cluster.get("entities", []) or [])}
    related_ents: list[str] = []
    for res in cluster.get("entityResolutions", []) or []:
        if not isinstance(res, dict):
            continue
        label = str(res.get("canonical_label") or res.get("normalized") or "").strip()
        if label and label.lower() not in known:
            related_ents.append(label)
            known.add(label.lower())
    out["related_entities"] = related_ents[:10]

    # related_keywords: keywords from other recent clusters that share entities with
    # this one, excluding this cluster's own keywords.
    out["related_keywords"] = _fetch_related_keywords(store, cluster, event_id)

    if include_articles:
        out["articles"] = [
            {
                "title": a.get("title"),
                "url": a.get("url"),
                "source": a.get("source"),
                "timestamp": a.get("timestamp"),
            }
            for a in (cluster.get("articles", []) or [])
            if isinstance(a, dict)
        ]
    return out
def _fetch_related_keywords(
    store: "SQLiteClusterStore",
    cluster: dict,
    event_id: str,
    *,
    window_hours: float = 48.0,
    max_clusters: int = 20,
    max_keywords: int = 10,
) -> list[str]:
    """Find keywords that co-occur with this cluster's entities in recent clusters.

    This gives agents thematic context: what else was being discussed alongside
    the cluster's entities during the same window.

    Args:
        store: Cluster store; only ``store._conn().execute(...)`` is used.
        cluster: Cluster payload. Its ``entities``, plus the ``normalized`` /
            ``canonical_label`` of each ``entityResolutions`` dict, form the search
            terms (lowercased). Its own ``keywords`` are excluded from the result.
        event_id: Id of *cluster*; that row is excluded from the search.
        window_hours: Lookback on ``payload_ts`` (default 48h).
        max_clusters: Maximum number of most recent matching clusters to scan.
        max_keywords: Maximum number of keywords to return.

    Returns:
        Keywords ordered by how many scanned clusters mention them, most first.
        Counting is case-insensitive. Each keyword is shown in the spelling first
        encountered (newest cluster first). Returns ``[]`` when the cluster has no
        entities or when the query fails; a failed query is logged, not raised.
    """
    from datetime import timedelta

    entities = cluster.get("entities", []) or []
    if not entities:
        return []

    entity_terms = {str(e).strip().lower() for e in entities}
    for res in cluster.get("entityResolutions", []) or []:
        if not isinstance(res, dict):
            continue
        for key in ("normalized", "canonical_label"):
            val = res.get(key)
            if val:
                entity_terms.add(str(val).strip().lower())
    entity_terms.discard("")
    if not entity_terms:
        return []

    cutoff = (datetime.now(timezone.utc) - timedelta(hours=window_hours)).isoformat()
    terms = sorted(entity_terms)  # deterministic parameter order
    placeholders = ",".join("?" * len(terms))
    # Only "?" placeholders are interpolated; every value is bound as a parameter.
    sql = (
        "SELECT DISTINCT c.payload FROM clusters c "
        "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
        "WHERE c.payload_ts >= ? AND c.cluster_id != ? "
        f"AND ce.entity IN ({placeholders}) "
        "ORDER BY c.payload_ts DESC LIMIT ?"
    )
    try:
        # NOTE(review): the connection from store._conn() is not closed here;
        # confirm whether _conn() returns a shared or a per-call connection.
        rows = store._conn().execute(sql, (cutoff, event_id, *terms, int(max_clusters))).fetchall()
    except Exception:
        # Best effort: related keywords are optional context, so log and move on.
        logging.getLogger(__name__).warning(
            "related-keyword lookup failed for cluster %s", event_id, exc_info=True
        )
        return []

    own_keywords = {
        str(k).strip().lower() for k in (cluster.get("keywords", []) or []) if str(k).strip()
    }
    counts: Counter = Counter()
    display: dict[str, str] = {}
    for (payload_text,) in rows:
        try:
            payload = json.loads(payload_text)
        except (TypeError, ValueError):
            continue
        if not isinstance(payload, dict):
            continue
        for kw in payload.get("keywords", []) or []:
            label = str(kw).strip()
            key = label.lower()
            # Skip blanks and keywords that already describe this cluster.
            if not label or key in own_keywords:
                continue
            display.setdefault(key, label)
            counts[key] += 1
    # most_common keeps first-encountered order among equal counts.
    return [display[key] for key, _ in counts.most_common(max_keywords)]
def _emerging_source_label(cluster: dict) -> str:
    """Return a best-effort outlet name for *cluster* ("" when unknown).

    Prefers the explicit ``source`` field and only falls back to the text after
    the last " - " in the headline when no source is recorded.
    """
    source = cluster.get("source")
    if source:
        return str(source)
    headline = cluster.get("headline")
    if headline:
        return str(headline).split(" - ")[-1]
    return ""


def _emerging_signal_score(
    recent_n: int,
    prior_n: int,
    n_sources: int,
    n_buckets: int,
    importance_sum: float,
) -> tuple[float, float, float]:
    """Score one emerging-topic candidate (entity or keyword).

    Returns ``(velocity, avg_importance, composed_score)``:
      * velocity = (recent + 0.5) / (prior + 0.5), smoothed so a zero prior count
        does not divide by zero (equal counts -> 1.0);
      * avg_importance = importance_sum / recent_n (0.0 when recent_n == 0);
      * composed_score = weighted sum of velocity (saturating at 4x), recency
        concentration, source diversity (saturating at 5 outlets), spread across
        time buckets (saturating at 4) and average importance (capped at 1.0).
    """
    total_n = recent_n + prior_n
    velocity = (recent_n + 0.5) / (prior_n + 0.5)
    recency_ratio = recent_n / total_n if total_n else 0.0
    avg_imp = importance_sum / recent_n if recent_n > 0 else 0.0
    composed_score = (
        0.35 * min(1.0, math.log1p(velocity) / math.log1p(4.0))  # velocity (0..1, 4x = max)
        + 0.25 * recency_ratio  # recency concentration
        + 0.15 * min(1.0, n_sources / 5.0)  # source diversity
        + 0.10 * min(1.0, n_buckets / 4.0)  # sustained (>1 bucket)
        + 0.15 * min(1.0, avg_imp)  # importance
    )
    return velocity, avg_imp, composed_score


@mcp.tool(description="Explore what is starting to matter: surface emerging entities, thematic keywords, and phrases from recent clusters. "
                      "Use timeframe to control the lookback window, topic to scope to a category, and around to find what's emerging near a specific entity. "
                      "Results include signal_type (entity / keyword / phrase) for downstream filtering.")
async def detect_emerging_topics(limit: int = 10, timeframe: str = "24h", topic: str | None = None, around: str | None = None):
    """Surface entities, keywords and phrases that are accelerating in recent clusters.

    The lookback window is split in half. Clusters no older than half the
    window are "recent"; older ones are "prior". Each entity and keyword is
    scored on five factors:
      * velocity (recent vs prior count);
      * recency concentration;
      * source diversity;
      * spread across ~6 time buckets;
      * average importance of its recent clusters.
    Results are ordered entities first, then keywords, then headline/summary
    bigrams that fill any remaining slots.

    Args:
        limit: max results to return (clamped to 1-20, default 10).
        timeframe: lookback window like "4h", "24h", "3d" (default "24h").
        topic: optional coarse topic filter ("crypto", "macro", "regulation", "ai", "other").
        around: optional entity. Only clusters that mention this entity (or one of
            its resolved aliases) are counted, e.g. "Bitcoin" to find what's
            emerging in Bitcoin's neighborhood.

    Returns:
        A list of at most ``limit`` dicts with keys topic, trend_score,
        related_entities, related_keywords, velocity, recent_count, prior_count,
        source_count, avg_importance and signal_type ("entity" / "keyword" / "phrase").
    """
    limit = max(1, min(int(limit), 20))
    hours = _parse_timeframe_to_hours(timeframe)
    half_hours = hours / 2.0
    store = SQLiteClusterStore(DB_PATH)
    # Fetch more clusters than needed so velocity stats are meaningful even for short windows.
    clusters = store.get_latest_clusters_all_topics(ttl_hours=hours, limit=500)

    # --- optional topic filter ---
    if topic:
        topic_norm = normalize_query(topic).strip().lower()
        if topic_norm:
            clusters = [c for c in clusters if (c.get("topic") or "other").strip().lower() == topic_norm]

    # --- resolve the 'around' entity into every alias a cluster might use ---
    around_terms: set[str] = set()
    if around:
        around_norm = normalize_query(around).strip().lower()
        if around_norm:
            resolved = resolve_entity_via_trends(around_norm)
            around_terms = {
                around_norm,
                str(resolved.get("normalized") or "").strip().lower(),
                str(resolved.get("canonical_label") or "").strip().lower(),
            }
            around_terms.discard("")

    now = datetime.now(timezone.utc)

    def _cluster_age_hours(c: dict) -> float:
        """Return the cluster's age in hours.

        Naive timestamps are treated as UTC. Returns 0.0 when the timestamp is
        missing or unparseable, so such clusters count as "recent".
        """
        ts = c.get("timestamp") or c.get("last_updated")
        if not ts:
            return 0.0
        try:
            dt = datetime.fromisoformat(str(ts).strip())
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return max(0.0, (now - dt.astimezone(timezone.utc)).total_seconds() / 3600.0)
        except Exception:
            return 0.0

    # Short tokens (< 4 chars) and boilerplate words are too noisy to rank.
    _generic_tokens = {"news", "latest", "breaking", "update", "updates", "report", "reports"}

    def _is_generic_entity(ent) -> bool:
        e = str(ent).strip().lower()
        return not e or len(e) < 4 or e in _generic_tokens

    # --- accumulate signals ---
    # recent = newer half of the timeframe, prior = older half
    entity_counts_recent = Counter()
    entity_counts_prior = Counter()
    entity_importance_recent = Counter()
    entity_sources: dict[str, set] = {}  # ent -> set of source names
    entity_buckets: dict[str, set] = {}  # ent -> set of time-bucket indices (for sustained-spike detection)
    entity_cooccur: dict[str, Counter] = {}
    phrase_counts_recent = Counter()
    # Keyword accumulators — same scoring pipeline as entities, but tracking
    # LLM-curated thematic descriptors instead of named entities.
    kw_counts_recent = Counter()
    kw_counts_prior = Counter()
    kw_importance_recent = Counter()
    kw_sources: dict[str, set] = {}
    kw_buckets: dict[str, set] = {}
    kw_cooccur: dict[str, Counter] = {}
    entity_kw_cooccur: dict[str, Counter] = {}  # entity -> Counter of co-occurring keywords

    bucket_size_hours = max(1.0, hours / 6.0)  # split window into ~6 buckets
    for c in clusters:
        # Normalise and deduplicate per cluster. This stops a term listed twice
        # in one cluster from inflating its counts/importance or co-occurring
        # with itself.
        ents_norm = list(dict.fromkeys(
            str(e).strip().lower()
            for e in (c.get("entities", []) or [])
            if not _is_generic_entity(e)
        ))
        kws_in_cluster = list(dict.fromkeys(
            str(k).strip().lower()
            for k in (c.get("keywords", []) or [])
            if not _is_generic_entity(k)
        ))
        age_h = _cluster_age_hours(c)
        is_recent = age_h <= half_hours
        bucket_idx = int(age_h / bucket_size_hours)

        # --- around filter: only count clusters that mention the target entity ---
        if around_terms:
            haystack = set(ents_norm)
            for res in c.get("entityResolutions", []) or []:
                if isinstance(res, dict):
                    for key in ("normalized", "canonical_label"):
                        val = res.get(key)
                        if val:
                            haystack.add(str(val).strip().lower())
            if not (haystack & around_terms):
                continue

        src = _emerging_source_label(c)
        # Importance only feeds the recent window. None means "do not accumulate";
        # this covers both prior-window clusters and unparseable values.
        importance = None
        if is_recent:
            try:
                importance = float(c.get("importance", 0.0) or 0.0)
            except Exception:
                importance = None

        # --- entity counting ---
        ent_counts = entity_counts_recent if is_recent else entity_counts_prior
        for ent in ents_norm:
            ent_counts[ent] += 1
            ent_src_set = entity_sources.setdefault(ent, set())
            if src:
                ent_src_set.add(src)
            entity_buckets.setdefault(ent, set()).add(bucket_idx)
            if importance is not None:
                entity_importance_recent[ent] += importance

        # --- keyword counting (same recent/prior split as entities) ---
        kw_counts = kw_counts_recent if is_recent else kw_counts_prior
        for kw in kws_in_cluster:
            kw_counts[kw] += 1
            kw_src_set = kw_sources.setdefault(kw, set())
            if src:
                kw_src_set.add(src)
            kw_buckets.setdefault(kw, set()).add(bucket_idx)
            if importance is not None:
                kw_importance_recent[kw] += importance

        # --- co-occurrence (only clusters that passed the around filter) ---
        for a in ents_norm:
            a_co = entity_cooccur.setdefault(a, Counter())
            for b in ents_norm:
                if b != a:
                    a_co[b] += 1
        for ka in kws_in_cluster:
            ka_co = kw_cooccur.setdefault(ka, Counter())
            for kb in kws_in_cluster:
                if kb != ka:
                    ka_co[kb] += 1
            # entity <-> keyword co-occurrence, tracked in both directions
            for ent in ents_norm:
                ka_co[ent] += 1
                entity_kw_cooccur.setdefault(ent, Counter())[ka] += 1

        # --- headline/summary bigram phrases (recent only) ---
        if is_recent:
            text = f"{c.get('headline', '')} {c.get('summary', '')}"
            words = re.findall(r"[A-Za-z][A-Za-z0-9\-]{2,}", text.lower())
            for first, second in zip(words, words[1:]):
                phrase = f"{first} {second}"
                if len(phrase) > 6:
                    phrase_counts_recent[phrase] += 1

    # --- score entities ---
    all_entities = set(entity_counts_recent) | set(entity_counts_prior)
    scored = []
    for ent in all_entities:
        recent_n = entity_counts_recent.get(ent, 0)
        prior_n = entity_counts_prior.get(ent, 0)
        if recent_n + prior_n < 1:
            continue
        n_sources = len(entity_sources.get(ent, ()))
        velocity, avg_imp, composed_score = _emerging_signal_score(
            recent_n,
            prior_n,
            n_sources,
            len(entity_buckets.get(ent, ())),
            entity_importance_recent.get(ent, 0.0),
        )
        related = [
            other
            for other, _cnt in entity_cooccur.get(ent, Counter()).most_common(10)
            if other != ent
        ]
        # Skip keywords that merely restate this entity or one of its related
        # entities (substring match in either direction).
        ent_names = {e.strip().lower() for e in related}
        ent_names.add(ent.strip().lower())
        related_kws = []
        for kw, _cnt in entity_kw_cooccur.get(ent, Counter()).most_common(10):
            kw_lower = kw.strip().lower()
            if any(kw_lower in name or name in kw_lower for name in ent_names):
                continue
            related_kws.append(kw)
            if len(related_kws) >= 5:
                break
        scored.append({
            "topic": ent,
            "trend_score": min(0.99, round(composed_score, 3)),
            "related_entities": related[:5] if related else [ent],
            "related_keywords": related_kws[:5],
            "velocity": round(velocity, 2),
            "recent_count": recent_n,
            "prior_count": prior_n,
            "source_count": n_sources,
            "avg_importance": round(avg_imp, 3),
            "signal_type": "entity",
        })

    # --- score keywords (same velocity/recency/source/sustained/importance formula) ---
    all_keywords = set(kw_counts_recent) | set(kw_counts_prior)
    kw_scored = []
    for kw in all_keywords:
        # Entity signal is higher quality (proper nouns, resolved identities), so a
        # term already scored as an entity is not repeated as a keyword.
        if kw in all_entities:
            continue
        recent_n = kw_counts_recent.get(kw, 0)
        prior_n = kw_counts_prior.get(kw, 0)
        if recent_n + prior_n < 1:
            continue
        n_sources = len(kw_sources.get(kw, ()))
        velocity, avg_imp, composed_score = _emerging_signal_score(
            recent_n,
            prior_n,
            n_sources,
            len(kw_buckets.get(kw, ())),
            kw_importance_recent.get(kw, 0.0),
        )
        kw_related_kws = []
        kw_related_ents = []
        for other, _cnt in kw_cooccur.get(kw, Counter()).most_common(10):
            if other == kw:
                continue
            # Co-occurring terms that are known entities go to related_entities.
            if other in all_entities:
                kw_related_ents.append(other)
            else:
                kw_related_kws.append(other)
        kw_scored.append({
            "topic": kw,
            "trend_score": min(0.99, round(composed_score, 3)),
            "related_entities": kw_related_ents[:5],
            "related_keywords": kw_related_kws[:5],
            "velocity": round(velocity, 2),
            "recent_count": recent_n,
            "prior_count": prior_n,
            "source_count": n_sources,
            "avg_importance": round(avg_imp, 3),
            "signal_type": "keyword",
        })

    def _rank_key(item: dict):
        # Highest score first, then highest velocity, then alphabetical for stability.
        return (-item["trend_score"], -item["velocity"], item["topic"])

    scored.sort(key=_rank_key)
    kw_scored.sort(key=_rank_key)

    # --- merge: entities first, then keywords, then phrases ---
    emerging = list(scored)
    seen_topics = {item["topic"] for item in emerging}
    for kw_item in kw_scored:
        if kw_item["topic"] not in seen_topics:
            emerging.append(kw_item)
            seen_topics.add(kw_item["topic"])

    # --- phrase signals (recent window only), filling any remaining slots ---
    for phrase, count in phrase_counts_recent.most_common(limit * 2):
        if len(emerging) >= limit:
            break
        if phrase in seen_topics:
            continue
        emerging.append({
            "topic": phrase.title(),
            "trend_score": min(0.99, round(0.30 + 0.15 * min(count, 5), 2)),
            "related_entities": [],
            "related_keywords": [],
            "velocity": None,
            "recent_count": count,
            "prior_count": 0,
            "source_count": 0,
            "avg_importance": 0.0,
            "signal_type": "phrase",
        })
        seen_topics.add(phrase)
    return emerging[:limit]
@mcp.tool(description="Investigate whether sentiment around an entity or keyword is positive, negative, or neutral over a chosen lookback window. "
                      "Matches clusters by both named entities and thematic keywords.")
async def get_news_sentiment(entity: str, timeframe: str = "24h"):
    """Average the sentiment of recent clusters that mention *entity*.

    Args:
        entity: entity name or thematic keyword. It is normalised and resolved
            via ``resolve_entity_via_trends``. The normalised form,
            canonical label and MID are all used as match terms.
        timeframe: lookback window such as "24h", "24" or "3d". It is parsed the
            same way as in ``detect_emerging_topics`` and clamped to 1-168 hours.

    Returns:
        dict with:
          * ``entity``: the input as given;
          * ``sentiment``: "positive" / "negative" / "neutral";
          * ``score``: mean finite ``sentimentScore`` of matched clusters,
            rounded to 3 places;
          * ``cluster_count``: number of matched clusters.
        An empty entity or no matches yields a neutral 0.0 result.
    """
    ent = normalize_query(entity).strip().lower()
    # Check for an empty query before doing any alias resolution or DB work.
    if not ent:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }
    resolved = resolve_entity_via_trends(ent)
    query_terms = {
        ent,
        str(resolved.get("normalized") or "").strip().lower(),
        str(resolved.get("canonical_label") or "").strip().lower(),
        str(resolved.get("mid") or "").strip().lower(),
    }
    query_terms = {q for q in query_terms if q}

    # Share the parser used by detect_emerging_topics so "3d" etc. work here too.
    hours = max(1, min(_parse_timeframe_to_hours(timeframe), 168))

    store = SQLiteClusterStore(DB_PATH)
    matched = store.get_clusters_by_entity_or_keyword(query_terms=query_terms, hours=hours, limit=500)
    if not matched:
        return {
            "entity": entity,
            "sentiment": "neutral",
            "score": 0.0,
            "cluster_count": 0,
        }

    scores = []
    for c in matched:
        raw = c.get("sentimentScore")
        if raw is None:
            continue
        try:
            value = float(raw)
        except (TypeError, ValueError):
            continue
        # A single NaN/inf would poison the mean; ignore non-finite values.
        if math.isfinite(value):
            scores.append(value)
    avg_score = sum(scores) / len(scores) if scores else 0.0

    # Keep the label aligned with the numeric score.
    # Small magnitudes are treated as neutral to avoid noisy label flips.
    if avg_score >= 0.15:
        sentiment = "positive"
    elif avg_score <= -0.15:
        sentiment = "negative"
    else:
        sentiment = "neutral"
    return {
        "entity": entity,
        "sentiment": sentiment,
        "score": round(avg_score, 3),
        "cluster_count": len(matched),
    }
@mcp.tool(description="Describe the server tool surface, how tools fit together, and output conventions for downstream agents.")
async def get_capabilities():
    """Return a self-describing manifest of this MCP server for downstream agents.

    The manifest combines two parts:
      * inline server metadata and output conventions;
      * the module-level tool cards, composition recipes, example chains and
        agent tips, plus a short list of usage guidance strings.
    A fresh dict is built on every call.
    """
    conventions = {
        "cluster_ids": "Do not surface cluster_id in user-facing prose unless explicitly requested; treat it as internal navigation metadata.",
        "sources": "Always preserve and display sources when summarizing a cluster or entity result.",
        "timestamps": "Mention timestamps consistently when comparing multiple clusters or when recency matters.",
        "clusters": "Each cluster includes entities (named entities with optional MID/canonical_label) and keywords (thematic descriptors). Both are searchable; entities are higher-signal, keywords capture subject-matter themes.",
    }
    server_info = {
        "name": "news-mcp",
        "purpose": "Recent news clusters with entities and thematic keywords, entity/keyword drill-down, sentiment, emerging topics, and related-entity expansion.",
        "output_conventions": conventions,
    }
    usage_guidance = [
        "Use get_latest_events for a tail, get_events_for_entity for entity/keyword deep dives, and get_related_recent_entities for neighborhood expansion.",
        "Prefer normalized/canonical entities when possible, but the server will resolve common aliases and MIDs for you.",
        "When presenting results to users, summarize the cluster; avoid exposing internal IDs unless they are needed for follow-up tool calls.",
        "For emerging topics, use detect_emerging_topics with timeframe and around parameters. Signal types: entity (named entity, highest quality), keyword (thematic descriptor), phrase (headline bigram). High velocity + high source_count = strong signal.",
        "get_events_for_entity and get_news_sentiment match both entities and thematic keywords — use keywords when the subject is a theme rather than a named entity.",
    ]
    manifest = dict(
        server=server_info,
        tools=NEWS_TOOL_CARDS,
        recipes=NEWS_COMPOSITION_RECIPES,
        example_chains=NEWS_EXAMPLE_CHAINS,
        agent_tips=NEWS_AGENT_TIPS,
        guidance=usage_guidance,
    )
    return manifest
- def _parse_timeframe_to_hours(timeframe: str) -> int:
- tf = str(timeframe).strip().lower()
- try:
- if tf.endswith("d"):
- days = int(tf[:-1])
- return max(1, days * 24)
- if tf.endswith("h"):
- return max(1, int(tf[:-1]))
- return max(1, int(tf))
- except Exception:
- return 24
- from contextlib import asynccontextmanager
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """FastAPI lifespan hook: start the one-shot background refresher at startup.

    The task handle is kept in this generator frame for the whole app lifetime.
    The event loop only holds weak references to tasks, so a fire-and-forget
    task could otherwise be garbage-collected before it finishes. If the
    startup task is still running when the app shuts down, it is cancelled.
    """
    startup_task = asyncio.create_task(_background_refresh_loop())
    try:
        yield
    finally:
        if not startup_task.done():
            startup_task.cancel()
# ASGI application; _lifespan kicks off the background refresher on startup.
app = FastAPI(lifespan=_lifespan, title="News MCP Server")
# MCP tools are exposed over SSE under /mcp.
app.mount("/mcp", mcp.sse_app())

logger = logging.getLogger("news_mcp.startup")

# Single store instance shared by the REST handlers and the background refresher.
_shared_store = SQLiteClusterStore(DB_PATH)

# _refresh_lock + _refresh_started make _background_refresh_loop perform its
# startup work at most once per process.
_refresh_lock = asyncio.Lock()
_refresh_started = False
# Strong reference to the periodic refresh task. The event loop keeps only weak
# references to tasks, so a discarded create_task() result can be
# garbage-collected mid-flight.
_refresh_loop_task: "asyncio.Task | None" = None


async def _background_refresh_loop():
    """One-time startup routine: prune, seed feeds, then start periodic refresh.

    Guarded by ``_refresh_lock`` / ``_refresh_started``: a second call in the same
    process returns immediately instead of starting a parallel ingestion cycle.

    Steps:
      1. Run ``prune_if_due`` in a worker thread so the event loop is not blocked.
      2. Insert each configured feed URL into ``feed_state`` with INSERT OR IGNORE.
         NOTE(review): existing rows are only preserved if ``feed_key`` is
         unique/primary — confirm in the schema.
      3. Unless ``NEWS_BACKGROUND_REFRESH_ENABLED`` is false, spawn a task that
         calls ``refresh_clusters`` every ``NEWS_REFRESH_INTERVAL_SECONDS``
         seconds. The first run is immediate, or delayed by one interval when
         ``NEWS_BACKGROUND_REFRESH_ON_START`` is false. Tick failures are logged
         and do not stop the loop.
    """
    global _refresh_started, _refresh_loop_task
    async with _refresh_lock:
        if _refresh_started:
            return
        _refresh_started = True

    logger.info("news-mcp llm config: %s", active_llm_config())

    # Prune off-thread so we do not block the event loop
    prune_result = await asyncio.to_thread(
        _shared_store.prune_if_due,
        NEWS_PRUNING_ENABLED,
        NEWS_RETENTION_DAYS,
        NEWS_PRUNE_INTERVAL_HOURS,
    )
    logger.info("startup prune_result=%s", prune_result)

    # Seed feed_state from .env — insert missing feeds, leave existing alone
    feed_urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
    if not feed_urls and NEWS_FEED_URL:
        feed_urls = [NEWS_FEED_URL]
    with _shared_store._conn() as conn:
        for url in feed_urls:
            conn.execute(
                "INSERT OR IGNORE INTO feed_state(feed_key, last_hash, last_item_count, enabled, updated_at) VALUES(?, '', 0, 1, '')",
                (url,),
            )
    logger.info("startup seeded %d feeds into feed_state", len(feed_urls))

    if not NEWS_BACKGROUND_REFRESH_ENABLED:
        return

    async def _loop():
        if not NEWS_BACKGROUND_REFRESH_ON_START:
            logger.info("background refresh delayed start interval_seconds=%s", NEWS_REFRESH_INTERVAL_SECONDS)
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))
        while True:
            try:
                logger.info("background refresh tick start")
                await refresh_clusters(topic=None, limit=200)
                logger.info("background refresh tick complete")
            except Exception:
                logger.exception("background refresh tick failed")
            await asyncio.sleep(float(NEWS_REFRESH_INTERVAL_SECONDS))

    # Keep a module-level reference so the long-lived task cannot be collected.
    _refresh_loop_task = asyncio.create_task(_loop())
@app.get("/")
def root():
    """Service index: transport info, exposed MCP tool names and refresh/retention/pruning settings."""
    tool_names = [
        "get_latest_events",
        "get_events_for_entity",
        "get_event_summary",
        "detect_emerging_topics",
        "get_news_sentiment",
        "get_related_recent_entities",
        "get_capabilities",
    ]
    refresh_info = dict(
        enabled=NEWS_BACKGROUND_REFRESH_ENABLED,
        interval_seconds=NEWS_REFRESH_INTERVAL_SECONDS,
    )
    retention_info = dict(
        lookback_hours=DEFAULT_LOOKBACK_HOURS,
        retention_days=NEWS_RETENTION_DAYS,
    )
    pruning_info = dict(
        enabled=NEWS_PRUNING_ENABLED,
        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
    )
    return dict(
        status="ok",
        transport="fastmcp+sse",
        mount="/mcp",
        tools=tool_names,
        refresh=refresh_info,
        retention=retention_info,
        pruning=pruning_info,
    )
- # ------------------------------------------------------------------
- # Dashboard REST API endpoints
- # ------------------------------------------------------------------
- from fastapi.staticfiles import StaticFiles
- from fastapi.responses import JSONResponse
# Static dashboard UI served under /dashboard.
# NOTE(review): "dashboard" is a relative path, presumably resolved against the
# process working directory — confirm the server is always started from the repo root.
_dashboard_static = StaticFiles(directory="dashboard", html=True)
app.mount("/dashboard", _dashboard_static, name="dashboard")

import logging as _log

# Logger used by the REST API error helper.
API_LOG = _log.getLogger("news_mcp.api")
- def _api_ok(data: dict) -> dict:
- return data
def _api_err(exc: Exception, ctx: str) -> JSONResponse:
    """Log *exc* with its traceback and build a 500 JSON error response.

    Call this from inside an ``except`` block so ``Logger.exception`` can attach
    the active traceback.

    Args:
        exc: the caught exception; its ``str()`` is echoed to the client.
            NOTE(review): this exposes internal error text to API callers.
        ctx: short description of the failing endpoint/arguments, included in
            both the log line and the response body.

    Returns:
        ``JSONResponse`` with status 500 and body ``{"error": str(exc), "ctx": ctx}``.
    """
    # Lazy %-style args: the message is only formatted if the record is emitted.
    API_LOG.exception("API error in %s", ctx)
    return JSONResponse(status_code=500, content={"error": str(exc), "ctx": ctx})
@app.get("/api/v1/health")
def api_health():
    """Dashboard health endpoint: store statistics plus the build version hash."""
    try:
        payload = _shared_store.get_dashboard_stats()
        payload["version"] = _VERSION_HASH
    except Exception as e:
        return _api_err(e, "health")
    return payload
@app.get("/api/v1/clusters")
def api_clusters(
    topic: str | None = None,
    hours: int = 24,
    limit: int = 50,
    offset: int = 0,
):
    """One page of clusters, plus the total count and the echoed filters."""
    try:
        page = _shared_store.get_clusters_page(topic=topic, hours=hours, limit=limit, offset=offset)
        body = {
            "clusters": page["clusters"],
            "total": page["total"],
            "topic": topic or "all",
            "hours": hours,
        }
    except Exception as e:
        return _api_err(e, f"clusters(topic={topic},hours={hours})")
    return body
@app.get("/api/v1/sentiment-series")
def api_sentiment_series(
    topic: str | None = None,
    hours: int = 24,
    bucket_hours: float = 1.0,
):
    """Bucketed sentiment time-series for the dashboard's Chart.js view."""
    try:
        points = _shared_store.get_sentiment_series(topic=topic, hours=hours, bucket_hours=bucket_hours)
        body = {"series": points, "topic": topic or "all"}
    except Exception as e:
        return _api_err(e, f"sentiment(topic={topic})")
    return body
@app.get("/api/v1/entities")
def api_entities(
    hours: int = 24,
    limit: int = 30,
):
    """Most frequent entities over the last *hours*, at most *limit* rows."""
    try:
        freq = _shared_store.get_entity_frequencies(hours=hours, limit=limit)
        body = {"entities": freq, "hours": hours}
    except Exception as e:
        return _api_err(e, f"entities(hours={hours})")
    return body
@app.get("/api/v1/keywords")
def api_keywords(
    hours: int = 24,
    limit: int = 30,
):
    """Most frequent thematic keywords (terms already counted as entities excluded)."""
    try:
        freq = _shared_store.get_keyword_frequencies(hours=hours, limit=limit)
        body = {"keywords": freq, "hours": hours}
    except Exception as e:
        return _api_err(e, f"keywords(hours={hours})")
    return body
@app.get("/api/v1/clusters/by-entity")
def api_clusters_by_entity(
    entity: str,
    hours: int = 168,
    limit: int = 50,
    offset: int = 0,
):
    """Clusters mentioning *entity* (trimmed, lower-cased), filtered by event time in the store."""
    try:
        needle = entity.strip().lower()
        return _shared_store.get_clusters_by_entity(entity=needle, hours=hours, limit=limit, offset=offset)
    except Exception as e:
        return _api_err(e, f"by-entity(entity={entity},hours={hours})")
@app.get("/api/v1/clusters/by-keyword")
def api_clusters_by_keyword(
    keyword: str,
    hours: int = 168,
    limit: int = 50,
    offset: int = 0,
):
    """Clusters tagged with *keyword* (trimmed, lower-cased), filtered by event time in the store."""
    try:
        needle = keyword.strip().lower()
        return _shared_store.get_clusters_by_keyword(keyword=needle, hours=hours, limit=limit, offset=offset)
    except Exception as e:
        return _api_err(e, f"by-keyword(keyword={keyword},hours={hours})")
@app.get("/api/v1/cluster/{cluster_id}")
def api_cluster_detail(cluster_id: str):
    """Full detail for one cluster; 404 JSON when the store has nothing for *cluster_id*."""
    try:
        found = _shared_store.get_cluster_detail(cluster_id)
        if found:
            return found
        return JSONResponse(status_code=404, content={"error": "Cluster not found", "id": cluster_id})
    except Exception as e:
        return _api_err(e, f"detail({cluster_id})")
- # ------------------------------------------------------------------
- # Feed management endpoints (toggle on/off from dashboard)
- # ------------------------------------------------------------------
@app.get("/api/v1/feeds")
def api_feeds():
    """List all feeds in feed_state with their enabled flag, plus the configured feed URLs.

    Returns ``{"feeds": <store.get_feed_state_list()>, "configured_urls":
    <_configured_feed_urls()>}``, or a 500 JSON error via ``_api_err``.
    """
    try:
        # Reuse the process-wide store like the other REST endpoints instead of
        # constructing a new SQLiteClusterStore on every request.
        feed_list = _shared_store.get_feed_state_list()
        configured = _configured_feed_urls()
        return {
            "feeds": feed_list,
            "configured_urls": configured,
        }
    except Exception as e:
        return _api_err(e, "feeds")
@app.post("/api/v1/feeds/toggle")
async def api_feed_toggle(feed_url: str = Form(), enabled: bool = Form()):
    """Toggle a feed's enabled state.

    Form fields:
        feed_url: feed key; surrounding whitespace is ignored.
        enabled: the new enabled flag.

    Returns:
        ``{"ok": True, "feed_url": <stripped url>, "enabled": enabled}`` on success;
        a 404 JSON error when ``set_feed_enabled`` reports the feed as unknown;
        a 500 JSON error via ``_api_err`` on failure.
    """
    url = feed_url.strip()
    try:
        # The SQLite write is synchronous; run it in a worker thread so this
        # async handler does not stall the event loop. Uses the shared store
        # like the other endpoints.
        ok = await asyncio.to_thread(_shared_store.set_feed_enabled, url, enabled)
    except Exception as e:
        return _api_err(e, f"toggle({feed_url})")
    if not ok:
        return JSONResponse(
            status_code=404,
            content={"error": f"Feed not found: {feed_url}"},
        )
    return {"ok": True, "feed_url": url, "enabled": enabled}
- # ------------------------------------------------------------------ #
- # Site config (dashboard-tuneable parameters)
- # ------------------------------------------------------------------ #
@app.get("/api/v1/config")
def api_config():
    """Return every site_config parameter (originally seeded from .env/defaults)."""
    try:
        from news_mcp.site_config import get_site_config

        with _shared_store._conn() as conn:
            config_rows = get_site_config(conn)
    except Exception as e:
        return _api_err(e, "config")
    return {"config": config_rows}
@app.post("/api/v1/config/update")
async def api_config_update(key: str = Form(), value: str = Form()):
    """Update a single config parameter at runtime.

    Form fields ``key`` and ``value`` are whitespace-stripped before being passed
    to ``set_config_value``.

    Returns:
        ``{"ok": True, "key": ..., "value": ...}`` on success;
        a 404 JSON error when ``set_config_value`` reports the key as unknown;
        a 500 JSON error via ``_api_err`` on failure (including import errors).
    """
    clean_key = key.strip()
    clean_value = value.strip()

    def _write():
        from news_mcp.site_config import set_config_value

        with _shared_store._conn() as conn:
            updated = set_config_value(conn, clean_key, clean_value)
            conn.commit()
        return updated

    try:
        # Blocking SQLite work runs in a worker thread so the event loop stays responsive.
        ok = await asyncio.to_thread(_write)
    except Exception as e:
        return _api_err(e, f"config/update({key})")
    if not ok:
        return JSONResponse(status_code=404, content={"error": f"Config key not found: {key}"})
    return {"ok": True, "key": clean_key, "value": clean_value}
@app.post("/api/v1/config/reset")
async def api_config_reset():
    """Reset all config to .env/defaults: delete every site_config row, then re-seed it.

    Returns:
        ``{"ok": True, "seeded": <seed_site_config result>}`` on success, or a
        500 JSON error via ``_api_err`` on failure.
    """

    def _reset():
        from news_mcp.site_config import seed_site_config

        with _shared_store._conn() as conn:
            conn.execute("DELETE FROM site_config")
            seeded_rows = seed_site_config(conn)
            conn.commit()
        return seeded_rows

    try:
        # Blocking SQLite work runs in a worker thread so the event loop stays responsive.
        seeded = await asyncio.to_thread(_reset)
    except Exception as e:
        return _api_err(e, "config/reset")
    return {"ok": True, "seeded": seeded}
@app.get("/health")
def health():
    """Liveness probe: status, seconds since _PROCESS_STARTED_AT (3 decimals) and build version."""
    uptime_s = time.monotonic() - _PROCESS_STARTED_AT
    return dict(status="ok", uptime=round(uptime_s, 3), version=_VERSION_HASH)
|