added openrouter, ORDER BY fix

Lukas Goldschmidt committed 1 day ago
commit b861031e6c
7 files changed, 257 insertions(+), 39 deletions(-)
  PROJECT.md                        +6   -5
  README.md                         +1   -1
  news_mcp/config.py                +14  -6
  news_mcp/jobs/poller.py           +29  -8
  news_mcp/llm.py                   +32  -5
  news_mcp/storage/sqlite_store.py  +11  -14
  test_news_mcp.py                  +164 -0

PROJECT.md  +6 -5

@@ -18,12 +18,13 @@ Provide a signal-extraction MCP server that converts RSS into **deduplicated, en
 - Tools expose semantic queries over cached clusters
 
 ## MCP tools (current)
-- `get_latest_events(topic, limit)`
-- `get_events_for_entity(entity, limit)`
-- `get_events_for_entity(entity, limit, timeframe)`
-- `get_event_summary(event_id)`
+- `get_latest_events(topic, limit, include_articles)`
+- `get_events_for_entity(entity, limit, timeframe, include_articles)`
+- `get_event_summary(event_id, include_articles)`
 - `detect_emerging_topics(limit)`
-- `get_related_entities(subject, timeframe, limit)`
+- `get_news_sentiment(entity, timeframe)`
+- `get_related_recent_entities(subject, timeframe, limit, include_trends)`
+- `get_capabilities()`
 
 ## Refresh & caching
 

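The new `include_articles` flag is an ordinary tool argument; a minimal sketch of the JSON-RPC `tools/call` payload an MCP client might send (argument values are illustrative, and whether the flag defaults to false is not shown in this commit):

```python
# Illustrative MCP tools/call request exercising the new include_articles flag.
# The tool name comes from the list above; topic/limit values are made up.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "get_latest_events",
        "arguments": {"topic": "crypto", "limit": 5, "include_articles": True},
    },
}
```
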
README.md  +1 -1

@@ -72,7 +72,7 @@ Key variables:
 - `NEWS_EXTRACT_PROVIDER`, `NEWS_EXTRACT_MODEL`
 - `NEWS_SUMMARY_PROVIDER`, `NEWS_SUMMARY_MODEL`
 - `GROQ_API_KEY`, `OPENAI_API_KEY`
-- `ENTITY_BLACKLIST` (comma-separated, case-insensitive exact entity match)
+- `ENTITY_BLACKLIST` (comma-separated, case-insensitive patterns; wildcards are supported)
 - `NEWS_PROMPTS_DIR` (override prompt directory)
 - `NEWS_ENTITY_ALIASES_FILE` (override entity alias JSON file)
 - `NEWS_FEED_URL` (single feed fallback)

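The wildcard matcher itself is not part of this diff; a plausible sketch using `fnmatch`, assuming the patterns are the lower-cased entries that `NEWS_ENTITY_BLACKLIST` in config.py already produces (the real filter may differ):

```python
from fnmatch import fnmatch


def is_blacklisted(entity: str, patterns: list[str]) -> bool:
    # Case-insensitive shell-style matching (* and ?); both sides are
    # lower-cased, mirroring how config.py normalizes ENTITY_BLACKLIST.
    name = entity.lower()
    return any(fnmatch(name, pat) for pat in patterns)


# ENTITY_BLACKLIST="bloomberg,reuters*" would now also drop "Reuters TV".
assert is_blacklisted("Reuters TV", ["bloomberg", "reuters*"])
assert not is_blacklisted("CoinDesk", ["bloomberg", "reuters*"])
```
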
news_mcp/config.py  +14 -6

@@ -20,18 +20,26 @@ RSS_FEED_URLS = NEWS_FEED_URLS
 DEFAULT_LOOKBACK_HOURS = float(os.getenv("NEWS_DEFAULT_LOOKBACK_HOURS", os.getenv("NEWS_CLUSTERS_TTL_HOURS", "24")))
 DEFAULT_TOPICS = ["crypto", "macro", "regulation", "ai", "other"]
 
-# LLM extraction / summarization
+# LLM API keys (provider-specific keys only)
 GROQ_API_KEY = os.getenv("GROQ_API_KEY")
 OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
+OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
+
+# LLM provider/model selection (provider-agnostic)
 NEWS_EXTRACT_PROVIDER = os.getenv("NEWS_EXTRACT_PROVIDER", "groq")
-NEWS_EXTRACT_MODEL = os.getenv("NEWS_EXTRACT_MODEL", os.getenv("GROQ_MODEL", "llama4-16e"))
+NEWS_EXTRACT_MODEL = os.getenv("NEWS_EXTRACT_MODEL", "llama4-16e")
 NEWS_SUMMARY_PROVIDER = os.getenv("NEWS_SUMMARY_PROVIDER", "groq")
-NEWS_SUMMARY_MODEL = os.getenv("NEWS_SUMMARY_MODEL", os.getenv("GROQ_MODEL", "llama4-16e"))
-GROQ_DEBUG = os.getenv("GROQ_DEBUG", "false").lower() == "true"
+NEWS_SUMMARY_MODEL = os.getenv("NEWS_SUMMARY_MODEL", "llama4-16e")
+
+# LLM behavior
+LLM_DEBUG = os.getenv("LLM_DEBUG", "false").lower() == "true"
 
 NEWS_ENTITY_BLACKLIST = [x.strip().lower() for x in os.getenv("ENTITY_BLACKLIST", "").split(",") if x.strip()]
-GROQ_ENRICH_OTHER_ONLY = os.getenv("GROQ_ENRICH_OTHER_ONLY", "false").lower() == "true"
-GROQ_MAX_CLUSTERS_PER_REFRESH = int(os.getenv("GROQ_MAX_CLUSTERS_PER_REFRESH", "20"))
+
+# Enrichment: 0 = no limit (enrich every cluster); >0 caps per refresh cycle
+ENRICHMENT_MAX_PER_REFRESH = int(os.getenv("ENRICHMENT_MAX_PER_REFRESH", "0"))
+# When true, only the "other" topic gets LLM enrichment (legacy guard)
+ENRICH_OTHER_TOPICS_ONLY = os.getenv("ENRICH_OTHER_TOPICS_ONLY", "false").lower() == "true"
 
 # Optional embeddings path (Ollama first when enabled, fallback otherwise).
 NEWS_EMBEDDINGS_ENABLED = os.getenv("NEWS_EMBEDDINGS_ENABLED", "false").lower() == "true"

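The `0 = no limit` convention for `ENRICHMENT_MAX_PER_REFRESH` leans on integer truthiness, which is exactly how poller.py consumes it below; a standalone illustration (cluster values are placeholders):

```python
import os

cap = int(os.getenv("ENRICHMENT_MAX_PER_REFRESH", "0"))
clusters = ["c1", "c2", "c3"]  # stand-ins for real cluster dicts

# `0 or n` evaluates to n, so an unset/zero cap enriches everything.
enrich_limit = cap or len(clusters)
print(clusters[:enrich_limit])  # all three clusters when cap == 0
```
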
news_mcp/jobs/poller.py  +29 -8

@@ -14,8 +14,8 @@ from news_mcp.sources.news_feeds import fetch_news_articles
 from news_mcp.storage.sqlite_store import SQLiteClusterStore
 
 from news_mcp.config import (
-    GROQ_ENRICH_OTHER_ONLY,
-    GROQ_MAX_CLUSTERS_PER_REFRESH,
+    ENRICH_OTHER_TOPICS_ONLY,
+    ENRICHMENT_MAX_PER_REFRESH,
     NEWS_PRUNE_INTERVAL_HOURS,
     NEWS_PRUNING_ENABLED,
     NEWS_RETENTION_DAYS,
@@ -44,6 +44,14 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
     prev_hash = store.get_feed_hash(feed_key)
     if prev_hash == last_hash:
         logger.info("refresh unchanged feed_key=%s topic=%s", feed_key, topic)
+        store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
+        prune_result = store.prune_if_due(
+            pruning_enabled=NEWS_PRUNING_ENABLED,
+            retention_days=NEWS_RETENTION_DAYS,
+            interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
+        )
+        logger.info("refresh prune_result=%s", prune_result)
+        return
     else:
         logger.info("refresh changed feed_key=%s topic=%s", feed_key, topic)
         store.set_feed_hash(feed_key, last_hash)
@@ -54,13 +62,21 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
         if topic and t != topic:
             continue
         enriched = []
-        # Always compute cheap enrichment first.
-        for idx, c in enumerate(clusters[:GROQ_MAX_CLUSTERS_PER_REFRESH]):
+
+        # Determine how many clusters to LLM-enrich.
+        # ENRICHMENT_MAX_PER_REFRESH=0 means enrich every cluster (no cap).
+        enrich_limit = ENRICHMENT_MAX_PER_REFRESH or len(clusters)
+
+        # Track whether the LLM pipeline is available for this topic.
+        _llm_enabled_for_topic = (
+            (not ENRICH_OTHER_TOPICS_ONLY) or (t == "other")
+        )
+
+        for idx, c in enumerate(clusters[:enrich_limit]):
             c2 = enrich_cluster(c)
 
-            # Groq enrichment only when configured.
-            if (not GROQ_ENRICH_OTHER_ONLY) or (t == "other"):
-                # Cache Groq: if we already have entities/sentiment for this cluster, skip.
+            if _llm_enabled_for_topic:
+                # Cache: if we already have entities/sentiment for this cluster, skip LLM call.
                 existing = store.get_cluster_by_id(c2.get("cluster_id"))
                 if existing and existing.get("entities"):
                     c2 = dict(c2)
@@ -83,7 +99,12 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
                     if existing.get("keywords"):
                         c2["keywords"] = existing.get("keywords")
                 else:
-                    c2 = await classify_cluster_llm(c2)
+                    try:
+                        c2 = await classify_cluster_llm(c2)
+                    except Exception:
+                        logger.exception("LLM enrichment failed for cluster %s (topic %s)", c2.get("cluster_id"), t)
+                        # Mark so we can retry on next refresh.
+                        c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
 
             enriched.append(c2)
 

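Note the except branch only stamps `enrichment_failed_at`; no separate retry queue appears in this commit. Assuming the cache guard above stays as-is, the retry falls out naturally, sketched here as a hypothetical helper:

```python
def needs_llm_retry(existing: dict | None) -> bool:
    # Hypothetical helper: a cluster whose enrichment failed never had its
    # entities cached, so the get_cluster_by_id guard above already sends it
    # back through classify_cluster_llm on the next refresh.
    return existing is None or not existing.get("entities")
```
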
news_mcp/llm.py  +32 -5

@@ -7,10 +7,13 @@ from typing import Any, Dict, Iterable, List
 import httpx
 
 from news_mcp.config import (
+    GROQ_API_KEY,
     NEWS_EXTRACT_PROVIDER,
     NEWS_EXTRACT_MODEL,
     NEWS_SUMMARY_PROVIDER,
     NEWS_SUMMARY_MODEL,
+    OPENAI_API_KEY,
+    OPENROUTER_API_KEY,
     PROMPTS_DIR,
 )
 
@@ -40,12 +43,11 @@ def active_llm_config() -> dict[str, str]:
         "extract_model": NEWS_EXTRACT_MODEL,
         "summary_provider": NEWS_SUMMARY_PROVIDER,
         "summary_model": NEWS_SUMMARY_MODEL,
+        "openrouter_key_set": bool(OPENROUTER_API_KEY),
     }
 
 
 async def _call_groq(model: str, messages: List[Dict[str, str]], response_json: bool = True) -> str:
-    from news_mcp.config import GROQ_API_KEY
-
     if not GROQ_API_KEY:
         raise LLMError("GROQ_API_KEY is not configured")
     req = {"model": model, "messages": messages, "temperature": 0.2}
@@ -64,8 +66,6 @@ async def _call_groq(model: str, messages: List[Dict[str, str]], response_json:
 
 async def _call_openai(model: str, messages: List[Dict[str, str]], response_json: bool = True) -> str:
     # OpenAI-compatible chat endpoint; uses OPENAI_API_KEY.
-    from news_mcp.config import OPENAI_API_KEY
-
     if not OPENAI_API_KEY:
         raise LLMError("OPENAI_API_KEY is not configured")
     req = {"model": model, "messages": messages}
@@ -82,6 +82,31 @@ async def _call_openai(model: str, messages: List[Dict[str, str]], response_json
     return data["choices"][0]["message"]["content"]
 
 
+OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
+
+
+async def _call_openrouter(model: str, messages: List[Dict[str, str]], response_json: bool = True) -> str:
+    if not OPENROUTER_API_KEY:
+        raise LLMError("OPENROUTER_API_KEY is not configured")
+    req = {"model": model, "messages": messages, "temperature": 0.2}
+    if response_json:
+        req["response_format"] = {"type": "json_object"}
+    headers = {
+        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
+        "HTTP-Referer": "https://github.com/gr1m0/bolt.new-rss",
+        "X-Title": "news-mcp",
+    }
+    async with httpx.AsyncClient(timeout=45.0) as client:
+        resp = await client.post(
+            OPENROUTER_URL,
+            headers=headers,
+            json=req,
+        )
+        resp.raise_for_status()
+        data = resp.json()
+    return data["choices"][0]["message"]["content"]
+
+
 async def call_llm(provider: str, model: str, system_prompt: str, user_prompt: str) -> str:
     messages = [
         {"role": "system", "content": system_prompt},
@@ -92,7 +117,9 @@ async def call_llm(provider: str, model: str, system_prompt: str, user_prompt: s
         return await _call_groq(model, messages)
     if provider == "openai":
         return await _call_openai(model, messages)
-    raise LLMError(f"Unsupported provider: {provider}")
+    if provider == "openrouter":
+        return await _call_openrouter(model, messages)
+    raise LLMError(f"Unsupported provider: {provider}. Valid: groq, openai, openrouter")
 
 
 def build_extraction_prompt(cluster: Dict[str, Any]) -> str:

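With the dispatcher extended, selecting OpenRouter is just a provider string plus the matching key. A usage sketch against the `call_llm` signature shown above (the model id is an example, not a project default; `OPENROUTER_API_KEY` must be set in the environment):

```python
import asyncio

from news_mcp.llm import call_llm


async def main() -> None:
    # The dispatcher requests JSON output by default, so ask for JSON.
    text = await call_llm(
        provider="openrouter",
        model="meta-llama/llama-3.1-8b-instruct",  # example model id
        system_prompt="Reply with a JSON object.",
        user_prompt='{"ping": true}',
    )
    print(text)


asyncio.run(main())
```
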
news_mcp/storage/sqlite_store.py  +11 -14

@@ -198,17 +198,11 @@ class SQLiteClusterStore:
         now = datetime.now(timezone.utc).isoformat()
         with self._conn() as conn:
             conn.execute(
-                "INSERT INTO clusters(cluster_id, topic, payload, updated_at, summary_payload, summary_updated_at) "
-                "VALUES(?,?,?,?,?,?) "
-                "ON CONFLICT(cluster_id) DO UPDATE SET "
-                "summary_payload=excluded.summary_payload, summary_updated_at=excluded.summary_updated_at",
+                "UPDATE clusters SET summary_payload=?, summary_updated_at=? WHERE cluster_id=?",
                 (
-                    cluster_id,
-                    "",  # topic not used for update
-                    json.dumps({}, ensure_ascii=False),
-                    now,
                     json.dumps(summary_payload, ensure_ascii=False),
                     now,
+                    cluster_id,
                 ),
             )
 
@@ -257,11 +251,10 @@ class SQLiteClusterStore:
             except Exception:
                 return None
 
-        # Pull a wider candidate set, then filter by payload.timestamp.
         with self._conn() as conn:
             cur = conn.execute(
-                "SELECT payload FROM clusters WHERE topic=? LIMIT ?",
-                (topic, int(max(200, limit) * 10)),
+                "SELECT payload FROM clusters WHERE topic=? ORDER BY updated_at DESC",
+                (topic,),
             )
             candidates = [json.loads(r[0]) for r in cur.fetchall()]
 
@@ -303,8 +296,7 @@ class SQLiteClusterStore:
 
         with self._conn() as conn:
             cur = conn.execute(
-                "SELECT payload FROM clusters LIMIT ?",
-                (int(max(500, limit) * 10),),
+                "SELECT payload FROM clusters ORDER BY updated_at DESC",
             )
             candidates = [json.loads(r[0]) for r in cur.fetchall()]
 
@@ -407,7 +399,12 @@ class SQLiteClusterStore:
             return None
         with self._conn() as conn:
             cur = conn.execute(
-                "SELECT entity_id, canonical_label, mid, sources_json, updated_at, last_requested_at FROM entity_metadata WHERE normalized_label=?",
+                "SELECT entity_id, canonical_label, mid, sources_json, updated_at, last_requested_at "
+                "FROM entity_metadata "
+                "WHERE normalized_label=? "
+                "ORDER BY CASE WHEN mid IS NOT NULL THEN 0 ELSE 1 END, "
+                "COALESCE(last_requested_at, updated_at) DESC, updated_at DESC "
+                "LIMIT 1",
                 (normalized_label,),
             )
             row = cur.fetchone()

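The new `ORDER BY` picks one row per normalized label: mid-bearing rows first, ties broken by the most recently touched timestamp. A Python rendering of the same ranking, for illustration only (sample rows are made up):

```python
rows = [
    {"mid": None, "last_requested_at": None, "updated_at": "2026-01-02"},
    {"mid": "/m/Bitcoin", "last_requested_at": "2026-01-03", "updated_at": "2026-01-01"},
]

# Stable two-pass sort mirroring the SQL: recency descending
# (COALESCE(last_requested_at, updated_at) DESC, updated_at DESC),
# then rows that carry a mid float to the front.
rows.sort(key=lambda r: (r["last_requested_at"] or r["updated_at"], r["updated_at"]), reverse=True)
rows.sort(key=lambda r: r["mid"] is None)  # False sorts before True
best = rows[0]  # the /m/Bitcoin row, matching the SQL's LIMIT 1
```
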
test_news_mcp.py  +164 -0

@@ -79,6 +79,24 @@ def test_sqlite_summary_cache_roundtrip():
         assert cached["keyFacts"] == ["Fact 1"]
 
 
+def test_sqlite_summary_cache_does_not_create_placeholder_row():
+    with tempfile.TemporaryDirectory() as td:
+        db = Path(td) / "news.sqlite"
+        store = SQLiteClusterStore(db)
+        store.upsert_cluster_summary(
+            "missing",
+            {
+                "headline": "Missing",
+                "mergedSummary": "Summary",
+                "keyFacts": [],
+                "sources": [],
+            },
+        )
+
+        assert store.get_cluster_by_id("missing") is None
+        assert store.get_cluster_summary("missing", ttl_hours=24) is None
+
+
 def test_prune_clusters_deletes_rows_older_than_retention():
     with tempfile.TemporaryDirectory() as td:
         db = Path(td) / "news.sqlite"
@@ -144,6 +162,54 @@ def test_prune_if_due_skips_deletes_when_pruning_disabled():
         assert store.get_cluster_by_id("stale") is not None
 
 
+def test_get_latest_clusters_orders_by_updated_at_before_limit():
+    with tempfile.TemporaryDirectory() as td:
+        db = Path(td) / "news.sqlite"
+        store = SQLiteClusterStore(db)
+        store.upsert_clusters(
+            [
+                {
+                    "cluster_id": "old",
+                    "headline": "Old",
+                    "summary": "Old summary",
+                    "entities": ["Iran"],
+                    "timestamp": "Wed, 01 Apr 2026 09:00:00 GMT",
+                    "articles": [],
+                },
+                {
+                    "cluster_id": "new",
+                    "headline": "New",
+                    "summary": "New summary",
+                    "entities": ["Bitcoin"],
+                    "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT",
+                    "articles": [],
+                },
+            ],
+            topic="crypto",
+        )
+
+        with store._conn() as conn:
+            conn.execute("UPDATE clusters SET updated_at=? WHERE cluster_id=?", ("2025-01-01T00:00:00+00:00", "new"))
+            conn.execute("UPDATE clusters SET updated_at=? WHERE cluster_id=?", ("2026-01-01T00:00:00+00:00", "old"))
+
+        latest = store.get_latest_clusters(topic="crypto", ttl_hours=24 * 365, limit=1)
+        assert len(latest) == 1
+        assert latest[0]["cluster_id"] == "new"
+
+
+def test_get_entity_metadata_prefers_mid_scoped_row():
+    with tempfile.TemporaryDirectory() as td:
+        db = Path(td) / "news.sqlite"
+        store = SQLiteClusterStore(db)
+        store.upsert_entity_metadata("Bitcoin", canonical_label="Bitcoin", mid=None, sources=["local"])
+        store.upsert_entity_metadata("Bitcoin", canonical_label="Bitcoin", mid="/m/Bitcoin", sources=["trends"])
+        store.record_entity_request("Bitcoin", mid="/m/Bitcoin")
+
+        meta = store.get_entity_metadata("Bitcoin")
+        assert meta is not None
+        assert meta["mid"] == "/m/Bitcoin"
+
+
 def test_blacklist_filters_entities_case_insensitively():
     entities = ["Bloomberg", "Reuters", "bloomberg", "CoinDesk"]
     filtered = _filter_entities(entities, blacklist=["bloomberg"])
@@ -231,6 +297,104 @@ def test_call_llm_dispatches_to_selected_provider(monkeypatch):
     assert '"provider": "openai"' in openai
 
 
+def test_refresh_skips_reprocessing_when_feed_hash_is_unchanged(monkeypatch):
+    import news_mcp.jobs.poller as poller
+    import hashlib
+
+    calls = {"fetch": 0, "cluster": 0, "enrich": 0, "classify": 0}
+    material = "\n".join(
+        [
+            "Bitcoin rallies|https://example.com/a|Wed, 01 Apr 2026 12:00:00 GMT",
+        ]
+    )
+    expected_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
+
+    async def fake_to_thread(fn, limit):
+        calls["fetch"] += 1
+        return [
+            {
+                "title": "Bitcoin rallies",
+                "url": "https://example.com/a",
+                "source": "Src",
+                "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
+                "summary": "summary",
+            }
+        ]
+
+    def fake_cluster(articles):
+        calls["cluster"] += 1
+        return {
+            "crypto": [
+                {
+                    "cluster_id": "cid",
+                    "headline": "Bitcoin rallies",
+                    "summary": "summary",
+                    "entities": [],
+                    "sentiment": "neutral",
+                    "importance": 0.0,
+                    "sources": ["Src"],
+                    "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
+                    "articles": [],
+                }
+            ]
+        }
+
+    def fake_enrich(cluster):
+        calls["enrich"] += 1
+        return cluster
+
+    async def fake_classify(cluster):
+        calls["classify"] += 1
+        return cluster
+
+    class DummyStore:
+        def __init__(self, *args, **kwargs):
+            self.meta = {}
+            self.feed_hash = expected_hash
+
+        def get_feed_hash(self, feed_key):
+            return self.feed_hash
+
+        def set_feed_hash(self, feed_key, last_hash):
+            self.feed_hash = last_hash
+
+        def get_cluster_by_id(self, cluster_id):
+            return None
+
+        def upsert_clusters(self, clusters, topic):
+            self.meta["upserted"] = (len(clusters), topic)
+
+        def prune_if_due(self, **kwargs):
+            self.meta["prune"] = kwargs
+            return {"deleted": 0}
+
+        def set_meta(self, key, value):
+            self.meta[key] = value
+
+    monkeypatch.setattr(poller, "SQLiteClusterStore", DummyStore)
+    monkeypatch.setattr(poller, "fetch_news_articles", lambda limit: [{"title": "Bitcoin rallies", "url": "https://example.com/a", "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT"}])
+    monkeypatch.setattr(poller.asyncio, "to_thread", fake_to_thread)
+    monkeypatch.setattr(poller, "dedup_and_cluster_articles", fake_cluster)
+    monkeypatch.setattr(poller, "enrich_cluster", fake_enrich)
+    monkeypatch.setattr(poller, "classify_cluster_llm", fake_classify)
+
+    poller.store = None
+
+    async def run_once():
+        await poller.refresh_clusters(topic=None, limit=80)
+
+    import asyncio
+
+    asyncio.run(run_once())
+
+    assert calls["fetch"] == 1
+    assert calls["cluster"] == 0
+    assert calls["enrich"] == 0
+    assert calls["classify"] == 0
+
+
 def test_importance_prefers_llm_signal():
     # Two clusters with same coverage but different sentiment magnitude.
     base = {