Просмотр исходного кода

Refactor news LLM extraction pipeline

Lukas Goldschmidt 1 месяц назад
Родитель
Commit
cdd52b9f1e

+ 30 - 0
.env.example

@@ -0,0 +1,30 @@
+# News MCP configuration example
+
+# LLM selection
+NEWS_EXTRACT_PROVIDER=groq
+NEWS_EXTRACT_MODEL=llama4-16e
+NEWS_SUMMARY_PROVIDER=groq
+NEWS_SUMMARY_MODEL=llama4-16e
+
+# API keys
+GROQ_API_KEY=
+OPENAI_API_KEY=
+
+# Extraction behavior
+ENTITY_BLACKLIST=bloomberg
+GROQ_DEBUG=false
+GROQ_ENRICH_OTHER_ONLY=false
+GROQ_MAX_CLUSTERS_PER_REFRESH=20
+
+# Feeds
+NEWS_FEED_URL=https://breakingthenews.net/news-feed.xml
+NEWS_FEED_URLS=
+
+# Storage / refresh
+NEWS_MCP_DATA_DIR=
+NEWS_MCP_DB_PATH=
+NEWS_CLUSTERS_TTL_HOURS=24
+NEWS_REFRESH_INTERVAL_SECONDS=900
+NEWS_BACKGROUND_REFRESH_ENABLED=true
+NEWS_BACKGROUND_REFRESH_ON_START=true
+NEWS_PROMPTS_DIR=

+ 18 - 3
README.md

@@ -20,8 +20,9 @@ Health:
 ## What this server provides
 - Fetches from one or more configured news feeds (`NEWS_FEED_URL` / `NEWS_FEED_URLS`)
 - Deduplicates articles into clusters (v1 fuzzy title similarity)
-- Enriches clusters with Groq (topic/entities/sentiment/keywords)
-- Caches clusters + Groq fields in SQLite
+- Enriches clusters with configurable LLM providers/models (topic/entities/sentiment/keywords)
+- Applies a case-insensitive entity blacklist after extraction
+- Caches clusters + LLM fields in SQLite
 
 ## Tools (MCP)
 
@@ -44,7 +45,11 @@ Health:
 
 See `news-mcp/.env` (copy `.env.example` as a starting point).
 Key variables:
-- `GROQ_API_KEY`, `GROQ_MODEL`, `GROQ_DEBUG`
+- `NEWS_EXTRACT_PROVIDER`, `NEWS_EXTRACT_MODEL`
+- `NEWS_SUMMARY_PROVIDER`, `NEWS_SUMMARY_MODEL`
+- `GROQ_API_KEY`, `OPENAI_API_KEY`
+- `ENTITY_BLACKLIST` (comma-separated, case-insensitive exact entity match)
+- `NEWS_PROMPTS_DIR` (override prompt directory)
 - `NEWS_FEED_URL` (single feed fallback)
 - `NEWS_FEED_URLS` (comma-separated feed URLs; overrides `NEWS_FEED_URL`)
 - `NEWS_REFRESH_INTERVAL_SECONDS` (default 900)
@@ -53,6 +58,16 @@ Key variables:
 - `NEWS_CLUSTERS_TTL_HOURS`
 - `GROQ_ENRICH_OTHER_ONLY` (default false; set true for cost control)
 
+## Live extraction smoke test
+
+Run a standardized, fabricated extraction test against the currently configured provider/model:
+
+```bash
+./live_tests.sh
+```
+
+The script reads `./.env`, selects OpenAI or Groq based on the configured keys, and checks that the core expected entities are extracted.
+
 ## mcporter examples (all news-mcp calls)
 
 Use your existing config path:

+ 139 - 0
live_tests.sh

@@ -0,0 +1,139 @@
#!/usr/bin/env bash
# Live extraction smoke test: sends a fabricated news cluster to the
# currently configured LLM provider and verifies the core entities come back.
set -euo pipefail

# Resolve the repo root from this script's location so it works from any CWD.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# Default to the project's venv interpreter; override with PYTHON_BIN.
PYTHON_BIN="${PYTHON_BIN:-$ROOT_DIR/.venv/bin/python}"

# Export all variables from .env (set -a) so the Python child process sees them.
if [[ -f "$ROOT_DIR/.env" ]]; then
  set -a
  # shellcheck disable=SC1090
  source "$ROOT_DIR/.env"
  set +a
fi

if [[ ! -x "$PYTHON_BIN" ]]; then
  echo "ERROR: python not found at $PYTHON_BIN" >&2
  exit 1
fi

# Pick a provider when none is configured: prefer OpenAI if its key is set,
# then Groq; fall back to OpenAI (the key check below will then fail loudly).
if [[ -z "${NEWS_EXTRACT_PROVIDER:-}" ]]; then
  if [[ -n "${OPENAI_API_KEY:-}" ]]; then
    export NEWS_EXTRACT_PROVIDER="openai"
  elif [[ -n "${GROQ_API_KEY:-}" ]]; then
    export NEWS_EXTRACT_PROVIDER="groq"
  else
    export NEWS_EXTRACT_PROVIDER="openai"
  fi
fi

# Fill in a per-provider default model when the caller did not set one.
case "${NEWS_EXTRACT_PROVIDER}" in
  openai)
    export NEWS_EXTRACT_MODEL="${NEWS_EXTRACT_MODEL:-gpt-5-nano-2025-08-07}"
    ;;
  groq)
    export NEWS_EXTRACT_MODEL="${NEWS_EXTRACT_MODEL:-llama4-16e}"
    ;;
esac
# Ensure the variable exists (possibly empty) so `set -u` in children is safe.
export ENTITY_BLACKLIST="${ENTITY_BLACKLIST:-}"

# Fail fast with a distinct exit code (4) when the selected provider's key
# is missing, instead of letting the Python layer raise mid-request.
case "${NEWS_EXTRACT_PROVIDER}" in
  openai)
    if [[ -z "${OPENAI_API_KEY:-}" ]]; then
      echo "ERROR: OPENAI_API_KEY is not set, so the live OpenAI extraction test cannot run." >&2
      exit 4
    fi
    ;;
  groq)
    if [[ -z "${GROQ_API_KEY:-}" ]]; then
      echo "ERROR: GROQ_API_KEY is not set, so the live Groq extraction test cannot run." >&2
      exit 4
    fi
    ;;
esac

# Quoted heredoc ('PY') prevents bash from expanding anything inside the
# Python source; configuration flows through the environment instead.
"$PYTHON_BIN" - <<'PY'
import asyncio
import json
import os
import sys
from news_mcp.llm import call_extraction

# Fabricated cluster: every expected entity appears verbatim in the text,
# so a working extractor has no excuse to miss them.
cluster = {
    "headline": "Reuters says Bitcoin, Ethereum, the Fed, and the ECB reacted as Trump and the EU discussed Iran and Israel",
    "summary": (
        "In a fictional test report, Reuters described Bitcoin, Ethereum, "
        "the Federal Reserve, and the European Central Bank. Trump, the EU, "
        "Iran, and Israel were all mentioned in the same narrative."
    ),
    "articles": [
        {
            "title": "Reuters says Bitcoin, Ethereum, the Fed, and the ECB reacted as Trump and the EU discussed Iran and Israel",
            "url": "https://example.com/test",
            "source": "TestSource",
            "timestamp": "Tue, 31 Mar 2026 12:00:00 GMT",
            "summary": "A fabricated test story involving several named entities.",
        }
    ],
}

# Quantifiable acceptance set: the model may canonicalize some entities,
# but it must recover the core set below.
expected_any = {
    "Reuters",
    "Bitcoin",
    "Ethereum",
    "Fed",
    "ECB",
    "Trump",
    "EU",
    "Iran",
    "Israel",
}

# Lowercased alias -> canonical form, so e.g. "Federal Reserve" counts as "Fed".
canonical_map = {
    "federal reserve": "Fed",
    "federalreserve": "Fed",
    "european central bank": "ECB",
    "ecb": "ECB",
    "european union": "EU",
    "eu": "EU",
    "donald trump": "Trump",
    "trump": "Trump",
}

async def main() -> int:
    # Returns a process exit code: 0 pass, 2 missing entities, 3 bad extras.
    out = await call_extraction(cluster)
    entities = out.get("entities", [])
    normalized = set()
    for ent in entities:
        key = str(ent).strip().lower()
        # Map known aliases to their canonical label; keep unknowns as-is.
        normalized.add(canonical_map.get(key, str(ent).strip()))

    missing = sorted(expected_any - normalized)
    extra = sorted(normalized - expected_any)

    # Full report on stdout for debugging regardless of pass/fail.
    print(json.dumps({
        "provider": os.getenv("NEWS_EXTRACT_PROVIDER"),
        "model": os.getenv("NEWS_EXTRACT_MODEL"),
        "output": out,
        "normalized_entities": sorted(normalized),
        "missing": missing,
        "extra": extra,
    }, ensure_ascii=False, indent=2))

    if missing:
        print(f"FAIL: missing entities: {missing}", file=sys.stderr)
        return 2

    # Extra entities are tolerated only if they are generic / helpful.
    allowed_extras = {"Macro", "Crypto"}
    bad_extra = [e for e in extra if e not in allowed_extras]
    if bad_extra:
        print(f"FAIL: unexpected extra entities: {bad_extra}", file=sys.stderr)
        return 3

    print("PASS: live extraction smoke test matched expected core entities")
    return 0

raise SystemExit(asyncio.run(main()))
PY

+ 8 - 16
news_mcp/config.py

@@ -3,43 +3,35 @@ from pathlib import Path
 
 from dotenv import load_dotenv
 
-# Load .env from project folder so Groq/debug flags are available under uvicorn/nohup.
 _HERE = Path(__file__).resolve().parent.parent
 load_dotenv(_HERE / ".env")
 
 DATA_DIR = Path(os.getenv("NEWS_MCP_DATA_DIR", Path(__file__).resolve().parent / "data"))
 DATA_DIR.mkdir(parents=True, exist_ok=True)
-
 DB_PATH = Path(os.getenv("NEWS_MCP_DB_PATH", str(DATA_DIR / "news.sqlite")))
+PROMPTS_DIR = Path(os.getenv("NEWS_PROMPTS_DIR", str(_HERE / "prompts")))
 
-# Backward-compatible aliases for older config names.
 NEWS_FEED_URL = os.getenv("NEWS_FEED_URL", os.getenv("NEWS_RSS_FEED_URL", "https://breakingthenews.net/news-feed.xml"))
-# Optional multi-feed mode: comma-separated feed URLs.
-# If set (non-empty), this overrides NEWS_FEED_URL.
 NEWS_FEED_URLS = os.getenv("NEWS_FEED_URLS", os.getenv("NEWS_RSS_FEED_URLS", "")).strip()
-
-# Legacy names kept for compatibility.
 RSS_FEED_URL = NEWS_FEED_URL
 RSS_FEED_URLS = NEWS_FEED_URLS
 
-# Clusters TTL (hours)
 CLUSTERS_TTL_HOURS = float(os.getenv("NEWS_CLUSTERS_TTL_HOURS", "24"))
-
 DEFAULT_TOPICS = ["crypto", "macro", "regulation", "ai", "other"]
 
-# Optional LLM enrichment (Groq)
+# LLM extraction / summarization
 GROQ_API_KEY = os.getenv("GROQ_API_KEY")
-GROQ_MODEL = os.getenv("GROQ_MODEL", "llama4-16e")
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
+NEWS_EXTRACT_PROVIDER = os.getenv("NEWS_EXTRACT_PROVIDER", "groq")
+NEWS_EXTRACT_MODEL = os.getenv("NEWS_EXTRACT_MODEL", os.getenv("GROQ_MODEL", "llama4-16e"))
+NEWS_SUMMARY_PROVIDER = os.getenv("NEWS_SUMMARY_PROVIDER", "groq")
+NEWS_SUMMARY_MODEL = os.getenv("NEWS_SUMMARY_MODEL", os.getenv("GROQ_MODEL", "llama4-16e"))
 GROQ_DEBUG = os.getenv("GROQ_DEBUG", "false").lower() == "true"
 
-# Groq enrichment is the default for all incoming news.
-# Set GROQ_ENRICH_OTHER_ONLY=true only if you want to restrict it for cost control.
+NEWS_ENTITY_BLACKLIST = [x.strip().lower() for x in os.getenv("ENTITY_BLACKLIST", "").split(",") if x.strip()]
 GROQ_ENRICH_OTHER_ONLY = os.getenv("GROQ_ENRICH_OTHER_ONLY", "false").lower() == "true"
-
-# Limit enriched clusters per refresh call.
 GROQ_MAX_CLUSTERS_PER_REFRESH = int(os.getenv("GROQ_MAX_CLUSTERS_PER_REFRESH", "20"))
 
-# Background refresh
 NEWS_REFRESH_INTERVAL_SECONDS = int(os.getenv("NEWS_REFRESH_INTERVAL_SECONDS", "900"))
 NEWS_BACKGROUND_REFRESH_ENABLED = os.getenv("NEWS_BACKGROUND_REFRESH_ENABLED", "true").lower() == "true"
 NEWS_BACKGROUND_REFRESH_ON_START = os.getenv("NEWS_BACKGROUND_REFRESH_ON_START", "true").lower() == "true"

+ 35 - 0
news_mcp/enrichment/llm_enrich.py

@@ -0,0 +1,35 @@
+from __future__ import annotations
+
+from typing import Any, Dict
+
+from news_mcp.config import NEWS_ENTITY_BLACKLIST
+from news_mcp.llm import call_extraction, call_summary
+
+
+def _filter_entities(entities, blacklist=None):
+    banned = set(x.strip().lower() for x in (blacklist if blacklist is not None else NEWS_ENTITY_BLACKLIST))
+    out = []
+    for ent in entities or []:
+        key = str(ent).strip().lower()
+        if not key or key in banned:
+            continue
+        out.append(ent)
+    return out
+
+
async def classify_cluster_groq(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *cluster* enriched with LLM extraction fields.

    Runs the configured extraction model, then applies the entity blacklist
    to the returned entities. The input mapping is not mutated.
    """
    parsed = await call_extraction(cluster)
    enriched = dict(cluster)
    # Keep the pre-existing topic when the model omits one.
    enriched["topic"] = parsed.get("topic", cluster.get("topic"))
    enriched["entities"] = _filter_entities(parsed.get("entities", []))
    enriched["sentiment"] = parsed.get("sentiment", "neutral")
    enriched["sentimentScore"] = parsed.get("sentimentScore")
    enriched["keywords"] = parsed.get("keywords", [])
    return enriched
+
+
async def summarize_cluster_groq(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Produce a merged summary for *cluster* via the configured summary LLM."""
    return await call_summary(cluster)

+ 1 - 1
news_mcp/jobs/poller.py

@@ -6,7 +6,7 @@ from typing import Any, Dict
 from news_mcp.config import CLUSTERS_TTL_HOURS, DB_PATH, NEWS_FEED_URL, NEWS_FEED_URLS
 from news_mcp.dedup.cluster import dedup_and_cluster_articles
 from news_mcp.enrichment.enrich import enrich_cluster
-from news_mcp.enrichment.groq_enrich import classify_cluster_groq
+from news_mcp.enrichment.llm_enrich import classify_cluster_groq
 from news_mcp.sources.news_feeds import fetch_news_articles
 from news_mcp.storage.sqlite_store import SQLiteClusterStore
 

+ 113 - 0
news_mcp/llm.py

@@ -0,0 +1,113 @@
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Any, Dict, Iterable, List
+
+import httpx
+
+from news_mcp.config import (
+    NEWS_EXTRACT_PROVIDER,
+    NEWS_EXTRACT_MODEL,
+    NEWS_SUMMARY_PROVIDER,
+    NEWS_SUMMARY_MODEL,
+    PROMPTS_DIR,
+)
+
+
# System prompt shared by all extraction calls; the "STRICT JSON" wording
# pairs with the providers' response_format={"type": "json_object"} request.
SYSTEM_PROMPT = "You are a news signal extraction engine. Return STRICT JSON only."


class LLMError(RuntimeError):
    """Raised when an LLM provider is unsupported or its API key is missing."""
    pass
+
+
def load_prompt(name: str) -> str:
    """Read prompt template *name* from the configured prompts directory."""
    return (PROMPTS_DIR / name).read_text(encoding="utf-8")
+
+
+def _render_prompt(template: str, **kwargs: Any) -> str:
+    rendered = template
+    for key, value in kwargs.items():
+        rendered = rendered.replace("{" + key + "}", str(value))
+    return rendered
+
+
def active_llm_config() -> dict[str, str]:
    """Snapshot of the provider/model pairs currently in effect (for logging)."""
    config: dict[str, str] = {}
    config["extract_provider"] = NEWS_EXTRACT_PROVIDER
    config["extract_model"] = NEWS_EXTRACT_MODEL
    config["summary_provider"] = NEWS_SUMMARY_PROVIDER
    config["summary_model"] = NEWS_SUMMARY_MODEL
    return config
+
+
async def _call_groq(model: str, messages: List[Dict[str, str]], response_json: bool = True) -> str:
    """POST a chat completion to Groq's OpenAI-compatible endpoint.

    Returns the first choice's message content. Raises ``LLMError`` when
    ``GROQ_API_KEY`` is missing; HTTP errors propagate from
    ``raise_for_status()``.
    """
    # Imported lazily so env/config is read at call time, not import time.
    from news_mcp.config import GROQ_API_KEY

    if not GROQ_API_KEY:
        raise LLMError("GROQ_API_KEY is not configured")
    payload: Dict[str, Any] = {"model": model, "messages": messages, "temperature": 0.2}
    if response_json:
        # Ask the provider to enforce JSON-object output.
        payload["response_format"] = {"type": "json_object"}
    headers = {"Authorization": f"Bearer {GROQ_API_KEY}"}
    async with httpx.AsyncClient(timeout=45.0) as client:
        response = await client.post(
            "https://api.groq.com/openai/v1/chat/completions",
            headers=headers,
            json=payload,
        )
        response.raise_for_status()
        body = response.json()
    return body["choices"][0]["message"]["content"]
+
+
async def _call_openai(model: str, messages: List[Dict[str, str]], response_json: bool = True) -> str:
    """POST a chat completion to the OpenAI endpoint and return the content.

    Raises ``LLMError`` when ``OPENAI_API_KEY`` is missing; HTTP errors
    propagate from ``raise_for_status()``.
    """
    # OpenAI chat-completions endpoint; authenticated with OPENAI_API_KEY.
    # (A previous comment said NEWS_OPENAI_API_KEY — that variable does not
    # exist; the code reads OPENAI_API_KEY from news_mcp.config.)
    from news_mcp.config import OPENAI_API_KEY

    if not OPENAI_API_KEY:
        raise LLMError("OPENAI_API_KEY is not configured")
    # NOTE: unlike the Groq path, no temperature is set; provider default applies.
    req = {"model": model, "messages": messages}
    if response_json:
        req["response_format"] = {"type": "json_object"}
    async with httpx.AsyncClient(timeout=45.0) as client:
        resp = await client.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
            json=req,
        )
        resp.raise_for_status()
        data = resp.json()
    return data["choices"][0]["message"]["content"]
+
+
async def call_llm(provider: str, model: str, system_prompt: str, user_prompt: str) -> str:
    """Dispatch a system+user chat request to the named provider.

    Raises ``LLMError`` for any provider other than ``groq`` or ``openai``.
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    normalized = provider.lower().strip()
    if normalized == "groq":
        return await _call_groq(model, messages)
    if normalized == "openai":
        return await _call_openai(model, messages)
    raise LLMError(f"Unsupported provider: {normalized}")
+
+
def build_extraction_prompt(cluster: Dict[str, Any]) -> str:
    """Render the entity-extraction prompt with *cluster* serialized as JSON."""
    cluster_json = json.dumps(cluster, ensure_ascii=False)
    template = load_prompt("extract_entities.prompt")
    return _render_prompt(template, cluster_json=cluster_json)
+
+
async def call_extraction(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Run the extraction prompt for *cluster* and parse the JSON reply.

    Raises ``json.JSONDecodeError`` if the model returns non-JSON content.
    """
    raw = await call_llm(
        NEWS_EXTRACT_PROVIDER,
        NEWS_EXTRACT_MODEL,
        SYSTEM_PROMPT,
        build_extraction_prompt(cluster),
    )
    return json.loads(raw)
+
+
async def call_summary(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Run the cluster-summary prompt and parse the JSON reply."""
    system = "You are a summarization engine for news clusters. Return strict JSON only."
    template = load_prompt("summarize_cluster.prompt")
    user_prompt = _render_prompt(template, cluster_json=json.dumps(cluster, ensure_ascii=False))
    raw = await call_llm(NEWS_SUMMARY_PROVIDER, NEWS_SUMMARY_MODEL, system, user_prompt)
    return json.loads(raw)

+ 6 - 1
news_mcp/mcp_server_fastmcp.py

@@ -8,8 +8,10 @@ from news_mcp.config import CLUSTERS_TTL_HOURS, DEFAULT_TOPICS, DB_PATH
 from news_mcp.config import NEWS_REFRESH_INTERVAL_SECONDS, NEWS_BACKGROUND_REFRESH_ENABLED, NEWS_BACKGROUND_REFRESH_ON_START
 from news_mcp.jobs.poller import refresh_clusters
 from news_mcp.storage.sqlite_store import SQLiteClusterStore
-from news_mcp.enrichment.groq_enrich import summarize_cluster_groq
+from news_mcp.enrichment.llm_enrich import summarize_cluster_groq
+from news_mcp.llm import active_llm_config
 from collections import Counter
+import logging
 
 
 mcp = FastMCP(
@@ -254,6 +256,8 @@ async def get_news_sentiment(entity: str, timeframe: str = "24h"):
 
 app = FastAPI(title="News MCP Server")
 
+logger = logging.getLogger("news_mcp.startup")
+
 app.mount("/mcp", mcp.sse_app())
 
 
@@ -268,6 +272,7 @@ async def _start_background_refresh():
     if not NEWS_BACKGROUND_REFRESH_ENABLED:
         return
     _background_task_started = True
+    logger.info("news-mcp llm config: %s", active_llm_config())
 
     async def _loop():
         if not NEWS_BACKGROUND_REFRESH_ON_START:

+ 26 - 0
prompts/extract_entities.prompt

@@ -0,0 +1,26 @@
+Input cluster JSON:
+{cluster_json}
+
+You MUST extract a news signal from the headline AND summary. Do not leave entities empty when the text mentions obvious names.
+Task:
+1) infer the best top-level topic
+2) extract concise entities from the cluster
+3) assign sentiment from the wording/context
+4) provide short keywords that justify the classification
+
+Entity rules (strict):
+- Use short strings (1-5 words).
+- Include all obvious named entities mentioned in headline or summary: people, countries, regions, organizations, ministries, presidents, leaders, wars/conflicts if named.
+- Also include finance/crypto entities when present: BTC, ETH, Bitcoin, Ethereum, ETF, SEC, ECB, Fed, euro, inflation, rates.
+- Prefer canonical entity forms over aliases when obvious (for example, use full organization or place names where helpful).
+- Do NOT return empty entities if any such names/places appear.
+
+Sentiment rules:
+- positive: clearly encouraging, improving, or supportive tone
+- negative: clearly alarming, worsening, severe, conflict, loss, risk, warning tone
+- neutral: factual, balanced, or mixed
+- sentimentScore must be a number from -1.0 to 1.0 and should reflect the sentiment label.
+
+Return STRICT JSON with EXACT keys only:
+{ topic, entities, sentiment, sentimentScore, keywords }
+where topic is one of [crypto, macro, regulation, ai, other].

+ 11 - 0
prompts/summarize_cluster.prompt

@@ -0,0 +1,11 @@
+Input cluster JSON:
+{cluster_json}
+
+Return strict JSON only with keys:
+headline, mergedSummary, keyFacts, sources
+
+Requirements:
+- mergedSummary should be 2-4 sentences.
+- keyFacts should be 5-8 short strings.
+- Preserve the most important facts and sources.
+- Keep the tone neutral and concise.

+ 44 - 0
test_news_mcp.py

@@ -6,6 +6,8 @@ from pathlib import Path
 from news_mcp.dedup.cluster import dedup_and_cluster_articles
 from news_mcp.storage.sqlite_store import SQLiteClusterStore
 from news_mcp.enrichment.importance import compute_importance
+from news_mcp.enrichment.llm_enrich import _filter_entities
+from news_mcp.llm import build_extraction_prompt, call_llm, load_prompt
 
 
 def _article(title: str, url: str = "https://example.com/x", source: str = "Src", ts: str = "Mon, 30 Mar 2026 12:00:00 GMT"):
@@ -74,6 +76,48 @@ def test_sqlite_summary_cache_roundtrip():
         assert cached["keyFacts"] == ["Fact 1"]
 
 
def test_blacklist_filters_entities_case_insensitively():
    """Blacklist matching ignores case but keeps non-listed entities in order."""
    raw = ["Bloomberg", "Reuters", "bloomberg", "CoinDesk"]
    assert _filter_entities(raw, blacklist=["bloomberg"]) == ["Reuters", "CoinDesk"]
+
+
def test_load_prompt_reads_prompt_files():
    """The extraction prompt file is readable and carries the JSON directive."""
    assert "Return STRICT JSON" in load_prompt("extract_entities.prompt")
+
+
def test_build_extraction_prompt_is_stable_without_blacklist():
    """Prompt rendering embeds the cluster verbatim; blacklisting happens later."""
    cluster = {
        "headline": "Bloomberg reports Bitcoin rallies after US rate comments",
        "summary": "A report from Bloomberg says Bitcoin moved higher after comments from the Fed.",
        "articles": [],
    }
    rendered = build_extraction_prompt(cluster)
    expected_fragments = (
        "Bloomberg reports Bitcoin rallies",
        "Do NOT return empty entities",
        "Bloomberg",  # present in the input, not filtered here
    )
    for fragment in expected_fragments:
        assert fragment in rendered
+
+
def test_call_llm_dispatches_to_selected_provider(monkeypatch):
    """call_llm routes to the Groq or OpenAI backend based on the provider arg."""
    import asyncio

    def fake_backend(provider_name):
        # Factory producing an async stub that echoes which backend answered.
        async def _fake(model, messages, response_json=True):
            return '{"ok": true, "provider": "%s"}' % provider_name
        return _fake

    monkeypatch.setattr("news_mcp.llm._call_groq", fake_backend("groq"))
    monkeypatch.setattr("news_mcp.llm._call_openai", fake_backend("openai"))

    assert '"provider": "groq"' in asyncio.run(call_llm("groq", "x", "sys", "user"))
    assert '"provider": "openai"' in asyncio.run(call_llm("openai", "x", "sys", "user"))
+
+
 def test_importance_prefers_llm_signal():
     # Two clusters with same coverage but different sentiment magnitude.
     base = {