소스 검색

added provider test

Lukas Goldschmidt 2 주 전
부모
커밋
14ab064bec
10개의 변경된 파일128개의 추가작업 그리고 40개의 파일을 삭제
  1. 2 1
      .env.example
  2. 1 0
      README.md
  3. 4 1
      dashboard/dashboard.js
  4. 1 0
      dashboard/index.html
  5. 1 0
      news_mcp/config.py
  6. 8 2
      news_mcp/dashboard/dashboard_store.py
  7. 32 16
      news_mcp/jobs/poller.py
  8. 7 7
      news_mcp/sources/news_feeds.py
  9. 16 13
      news_mcp/storage/sqlite_store.py
  10. 56 0
      provider_test.sh

+ 2 - 1
.env.example

@@ -1,6 +1,6 @@
 # News MCP configuration example
 
-# LLM selection
+# Provider / model selection
 NEWS_EXTRACT_PROVIDER=groq
 NEWS_EXTRACT_MODEL=llama4-16e
 NEWS_SUMMARY_PROVIDER=groq
@@ -25,6 +25,7 @@ NEWS_EMBEDDING_SIMILARITY_THRESHOLD=0.885
 # Feeds
 NEWS_FEED_URL=https://breakingthenews.net/news-feed.xml
 NEWS_FEED_URLS=
+NEWS_FEED_ITEMS_PER_POLL=50
 
 # Storage / refresh
 NEWS_MCP_DATA_DIR=

+ 1 - 0
README.md

@@ -77,6 +77,7 @@ Key variables:
 - `NEWS_ENTITY_ALIASES_FILE` (override entity alias JSON file)
 - `NEWS_FEED_URL` (single feed fallback)
 - `NEWS_FEED_URLS` (comma-separated feed URLs; overrides `NEWS_FEED_URL`)
+- `NEWS_FEED_ITEMS_PER_POLL` (per-feed fetch cap per poll; default 50)
 - `NEWS_REFRESH_INTERVAL_SECONDS` (default 900)
 - `NEWS_BACKGROUND_REFRESH_ON_START` (default true)
 - `NEWS_BACKGROUND_REFRESH_ENABLED` (default true)

+ 4 - 1
dashboard/dashboard.js

@@ -45,6 +45,7 @@ async function loadHealth() {
     var d = await res.json();
     var sc = $('stat-clusters'); if (sc) sc.textContent = d.total_clusters;
     var se = $('stat-entities'); if (se) se.textContent = d.total_entities;
+    var sce = $('stat-cluster-entities'); if (sce) sce.textContent = d.cluster_entities;
     var sf = $('stat-fresh');
     if (sf) {
       if (d.data_fresh) { sf.textContent = 'Fresh'; sf.className = 'value green'; }
@@ -94,7 +95,9 @@ function renderFeedStatus(feeds) {
   var html = '';
   for (var k in feeds) {
     var v = feeds[k];
-    html += '<div class="feed-item"><span>' + esc(k.replace(/_/g,' ')) + '</span><span class="muted">' + (v.updated_at ? new Date(v.updated_at).toLocaleString() : 'n/a') + '</span></div>';
+    var label = k.replace(/^https?:\/\//, '').replace(/\/$/, '');
+    var count = (v.last_item_count == null) ? 'n/a' : String(v.last_item_count);
+    html += '<div class="feed-item"><span>' + esc(label) + '</span><span class="badge">' + esc(count) + ' items</span></div>';
   }
   el.innerHTML = html;
 }

+ 1 - 0
dashboard/index.html

@@ -30,6 +30,7 @@
     <div id="health-stats" class="stat-grid">
       <div class="stat-box"><div class="label">Total Clusters</div><div class="value blue" id="stat-clusters">—</div></div>
       <div class="stat-box"><div class="label">Total Entities</div><div class="value blue" id="stat-entities">—</div></div>
+      <div class="stat-box"><div class="label">Extracted Entities</div><div class="value blue" id="stat-cluster-entities">—</div></div>
       <div class="stat-box"><div class="label">Data Fresh</div><div class="value" id="stat-fresh">—</div></div>
       <div class="stat-box"><div class="label">Last Refresh</div><div class="value" style="font-size:1rem" id="stat-refresh">—</div></div>
     </div>

+ 1 - 0
news_mcp/config.py

@@ -16,6 +16,7 @@ NEWS_FEED_URL = os.getenv("NEWS_FEED_URL", os.getenv("NEWS_RSS_FEED_URL", "https
 NEWS_FEED_URLS = os.getenv("NEWS_FEED_URLS", os.getenv("NEWS_RSS_FEED_URLS", "")).strip()
 RSS_FEED_URL = NEWS_FEED_URL
 RSS_FEED_URLS = NEWS_FEED_URLS
+NEWS_FEED_ITEMS_PER_POLL = int(os.getenv("NEWS_FEED_ITEMS_PER_POLL", "50"))
 
 DEFAULT_LOOKBACK_HOURS = float(os.getenv("NEWS_DEFAULT_LOOKBACK_HOURS", os.getenv("NEWS_CLUSTERS_TTL_HOURS", "24")))
 DEFAULT_TOPICS = ["crypto", "macro", "regulation", "ai", "other"]

+ 8 - 2
news_mcp/dashboard/dashboard_store.py

@@ -29,6 +29,10 @@ class DashboardStore:
         with self._store._conn() as conn:
             total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
             total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
+            cluster_entities = conn.execute(
+                "SELECT COUNT(DISTINCT e.value) "
+                "FROM clusters, json_each(clusters.payload, '$.entities') AS e"
+            ).fetchone()[0]
             topic_counts = dict(conn.execute(
                 "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
             ).fetchall())
@@ -50,17 +54,19 @@ class DashboardStore:
 
         feeds = {}
         with self._store._conn() as conn:
-            for row in conn.execute("SELECT feed_key, last_hash, updated_at FROM feed_state"):
-                feeds[row[0]] = {"last_hash": row[1], "updated_at": row[2]}
+            for row in conn.execute("SELECT feed_key, last_hash, last_item_count, updated_at FROM feed_state ORDER BY updated_at DESC"):
+                feeds[row[0]] = {"last_hash": row[1], "last_item_count": row[2], "updated_at": row[3]}
 
         return {
             "total_clusters": total_clusters,
             "total_entities": total_entities,
+            "cluster_entities": cluster_entities,
             "clusters_by_topic": topic_counts,
             "last_refresh_at": last_refresh,
             "last_prune_at": last_prune,
             "data_fresh": fresh,
             "feeds": feeds,
+            "feed_count": len(feeds),
             "pruning": {
                 "enabled": NEWS_PRUNING_ENABLED,
                 "retention_days": NEWS_RETENTION_DAYS,

+ 32 - 16
news_mcp/jobs/poller.py

@@ -2,6 +2,7 @@ from __future__ import annotations
 
 import asyncio
 import logging
+from collections import defaultdict
 from datetime import datetime, timezone
 from typing import Any, Dict
 
@@ -30,20 +31,36 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
     articles = await asyncio.to_thread(fetch_news_articles, limit)
     logger.info("refresh fetched articles=%s", len(articles))
 
-    # Skip expensive work if the feed content (titles/urls/timestamps) didn't change.
+    # Drop legacy aggregate feed-state rows so the dashboard only reflects
+    # real per-feed poll status from this point forward.
+    with store._conn() as conn:
+        conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'")
+
+    # Track feed freshness per RSS URL so unchanged feeds can be skipped.
     import hashlib
-    rss_urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
-    if not rss_urls:
-        rss_urls = [NEWS_FEED_URL]
-    feed_key = "newsfeeds:" + hashlib.sha1(",".join(rss_urls).encode("utf-8")).hexdigest()
-    material = "\n".join(
-        f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
-        for a in articles
-    )
-    last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
-    prev_hash = store.get_feed_hash(feed_key)
-    if prev_hash == last_hash:
-        logger.info("refresh unchanged feed_key=%s topic=%s", feed_key, topic)
+    per_feed: dict[str, list[dict[str, Any]]] = defaultdict(list)
+    for article in articles:
+        feed_url = str(article.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL
+        per_feed[feed_url].append(article)
+
+    changed_articles: list[dict[str, Any]] = []
+    for feed_url, feed_articles in per_feed.items():
+        material = "\n".join(
+            f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
+            for a in feed_articles
+        )
+        last_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
+        feed_key = feed_url
+        prev_hash = store.get_feed_hash(feed_key)
+        if prev_hash == last_hash:
+            logger.info("refresh unchanged feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
+        else:
+            logger.info("refresh changed feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
+            store.set_feed_state(feed_key, last_hash, len(feed_articles))
+            changed_articles.extend(feed_articles)
+
+    if not changed_articles:
+        logger.info("refresh unchanged all feeds topic=%s", topic)
         store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
         prune_result = store.prune_if_due(
             pruning_enabled=NEWS_PRUNING_ENABLED,
@@ -52,9 +69,8 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
         )
         logger.info("refresh prune_result=%s", prune_result)
         return
-    else:
-        logger.info("refresh changed feed_key=%s topic=%s", feed_key, topic)
-        store.set_feed_hash(feed_key, last_hash)
+
+    articles = changed_articles
     clustered_by_topic = dedup_and_cluster_articles(articles)
     logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
 

+ 7 - 7
news_mcp/sources/news_feeds.py

@@ -9,7 +9,7 @@ from urllib.request import Request, urlopen
 
 import feedparser
 
-from news_mcp.config import NEWS_FEED_URL, NEWS_FEED_URLS
+from news_mcp.config import NEWS_FEED_ITEMS_PER_POLL, NEWS_FEED_URL, NEWS_FEED_URLS
 
 
 logger = logging.getLogger(__name__)
@@ -45,14 +45,14 @@ def _fetch_feed(feed_url: str):
         return feedparser.parse(resp.read())
 
 
-def fetch_news_articles(limit: int = 50) -> List[Dict[str, Any]]:
+def fetch_news_articles(limit: int = NEWS_FEED_ITEMS_PER_POLL) -> List[Dict[str, Any]]:
     feed_urls = _feed_urls()
     articles: List[Dict[str, Any]] = []
 
     logger.info("news ingestion start feeds=%s limit=%s timeout_s=%s", len(feed_urls), limit, FEED_FETCH_TIMEOUT_SECONDS)
 
-    # Evenly pull from feeds; keep total below `limit`.
-    per_feed_limit = max(1, int(limit / max(1, len(feed_urls))))
+    # Apply the configured cap per feed.
+    per_feed_limit = max(1, int(limit))
 
     for feed_url in feed_urls:
         try:
@@ -88,9 +88,9 @@ def fetch_news_articles(limit: int = 50) -> List[Dict[str, Any]]:
                 }
             )
 
-            if len(articles) >= limit:
-                logger.info("news ingestion limit reached feed_url=%s total_kept=%s", feed_url, len(articles))
-                return articles
+            if len(articles) - kept_before >= per_feed_limit:
+                logger.info("news ingestion per-feed limit reached feed_url=%s kept=%s", feed_url, len(articles) - kept_before)
+                break
 
         logger.info(
             "news feed completed feed_url=%s kept=%s",

+ 16 - 13
news_mcp/storage/sqlite_store.py

@@ -168,10 +168,15 @@ class SQLiteClusterStore:
                 CREATE TABLE IF NOT EXISTS feed_state (
                   feed_key TEXT PRIMARY KEY,
                   last_hash TEXT NOT NULL,
+                  last_item_count INTEGER,
                   updated_at TEXT NOT NULL
                 )
                 """
             )
+            try:
+                conn.execute("ALTER TABLE feed_state ADD COLUMN last_item_count INTEGER")
+            except sqlite3.OperationalError:
+                pass
 
             conn.execute(
                 """
@@ -335,12 +340,15 @@ class SQLiteClusterStore:
             return row[0] if row else None
 
     def set_feed_hash(self, feed_key: str, last_hash: str) -> None:
+        self.set_feed_state(feed_key, last_hash, None)
+
+    def set_feed_state(self, feed_key: str, last_hash: str, last_item_count: int | None = None) -> None:
         now = datetime.now(timezone.utc).isoformat()
         with self._conn() as conn:
             conn.execute(
-                "INSERT INTO feed_state(feed_key, last_hash, updated_at) VALUES(?,?,?) "
-                "ON CONFLICT(feed_key) DO UPDATE SET last_hash=excluded.last_hash, updated_at=excluded.updated_at",
-                (feed_key, last_hash, now),
+                "INSERT INTO feed_state(feed_key, last_hash, last_item_count, updated_at) VALUES(?,?,?,?) "
+                "ON CONFLICT(feed_key) DO UPDATE SET last_hash=excluded.last_hash, last_item_count=excluded.last_item_count, updated_at=excluded.updated_at",
+                (feed_key, last_hash, last_item_count, now),
             )
 
     def get_feed_state(self, feed_key: str) -> dict | None:
@@ -355,18 +363,13 @@ class SQLiteClusterStore:
             return {"last_hash": row[0], "updated_at": row[1]}
 
     def get_all_feed_states(self) -> list[dict[str, Any]]:
-        """All feed_state rows.
-
-        The live writer keys feed state as ``newsfeeds:<sha1(comma_joined_urls)>``,
-        so a hardcoded literal lookup never matches when more than one feed is
-        configured. Use this for surfacing health information.
-        """
+        """All feed_state rows."""
         with self._conn() as conn:
             cur = conn.execute(
-                "SELECT feed_key, last_hash, updated_at FROM feed_state ORDER BY updated_at DESC"
+                "SELECT feed_key, last_hash, last_item_count, updated_at FROM feed_state ORDER BY updated_at DESC"
             )
             return [
-                {"feed_key": row[0], "last_hash": row[1], "updated_at": row[2]}
+                {"feed_key": row[0], "last_hash": row[1], "last_item_count": row[2], "updated_at": row[3]}
                 for row in cur.fetchall()
             ]
 
@@ -548,8 +551,8 @@ class SQLiteClusterStore:
             ).fetchall())
             last_refresh = self.get_meta("last_refresh_at")
             feeds = {}
-            for row in conn.execute("SELECT feed_key, last_hash, updated_at FROM feed_state"):
-                feeds[row[0]] = {"last_hash": row[1], "updated_at": row[2]}
+            for row in conn.execute("SELECT feed_key, last_hash, last_item_count, updated_at FROM feed_state"):
+                feeds[row[0]] = {"last_hash": row[1], "last_item_count": row[2], "updated_at": row[3]}
             last_prune = self.get_meta(META_LAST_PRUNE_AT)
             prune_state = self.get_prune_state(
                 pruning_enabled=NEWS_PRUNING_ENABLED,

+ 56 - 0
provider_test.sh

@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+cd "$(dirname "$0")"
+
+if [ -f ".venv/bin/activate" ]; then
+  # shellcheck disable=SC1091
+  source .venv/bin/activate
+fi
+
+export PYTHONPATH="$(pwd):${PYTHONPATH:-}"
+
+python - <<'PY'
+import asyncio
+import json
+from news_mcp.config import NEWS_EXTRACT_PROVIDER, NEWS_EXTRACT_MODEL, NEWS_SUMMARY_PROVIDER, NEWS_SUMMARY_MODEL
+from news_mcp.llm import active_llm_config
+from news_mcp.enrichment.llm_enrich import classify_cluster_llm
+
+
+SAMPLE_CLUSTER = {
+    "cluster_id": "provider-test",
+    "headline": "Test headline for provider verification",
+    "summary": "This is a tiny sample cluster used to verify provider, model, and key configuration.",
+    "topic": "other",
+    "entities": [],
+    "sentiment": "neutral",
+    "importance": 0.0,
+    "sources": ["provider_test.sh"],
+    "timestamp": "2026-05-22T00:00:00+00:00",
+    "articles": [
+        {
+            "title": "Test headline for provider verification",
+            "url": "https://example.com/provider-test",
+            "source": "provider_test.sh",
+            "timestamp": "2026-05-22T00:00:00+00:00",
+            "summary": "This is a tiny sample cluster used to verify provider, model, and key configuration.",
+        }
+    ],
+}
+
+
+async def main() -> None:
+    print(json.dumps(active_llm_config(), indent=2, sort_keys=True))
+    print()
+    print("extract_provider=", NEWS_EXTRACT_PROVIDER)
+    print("extract_model=", NEWS_EXTRACT_MODEL)
+    print("summary_provider=", NEWS_SUMMARY_PROVIDER)
+    print("summary_model=", NEWS_SUMMARY_MODEL)
+    print()
+    result = await classify_cluster_llm(dict(SAMPLE_CLUSTER))
+    print(json.dumps(result, indent=2, sort_keys=True, ensure_ascii=False))
+
+
+asyncio.run(main())
+PY