소스 검색

enrichment retry: per-cluster backoff + cross-cycle retry of failed llm calls

- llm.py: add retry loop (retries=2, exponential backoff on 429/500/502/503)
  to _call_groq and _call_openai (previously only _call_openrouter had retries)
- poller.py: _enrich_single_cluster retries up to 3 times with exponential
  backoff (2s, 4s, 8s) before marking enrichment_failed_at; tracks
  enrichment_retry_count on the payload so next cycle can re-attempt
- poller.py: after normal enrichment, queries DB for failed clusters and
  retries them concurrently; clears failure markers on success
- sqlite_store.py: add get_failed_enrichment_clusters() using json_extract
  to find clusters with enrichment_failed_at set but below retry threshold
- poller.py: MAX_ENRICHMENT_RETRIES=3 (module constant, not env-configurable)
Lukas Goldschmidt 1 주 전
부모
커밋
7c83c529a8
4개의 변경된 파일, 152개의 추가 그리고 29개의 삭제
  1. 62 8
      news_mcp/jobs/poller.py
  2. 70 21
      news_mcp/llm.py
  3. 17 0
      news_mcp/storage/sqlite_store.py
  4. 3 0
      test_news_mcp.py

+ 62 - 8
news_mcp/jobs/poller.py

@@ -3,6 +3,7 @@ from __future__ import annotations
 import asyncio
 import hashlib
 import logging
+import sys
 from collections import defaultdict
 from datetime import datetime, timezone
 from typing import Any, Dict
@@ -29,6 +30,8 @@ from news_mcp.storage.sqlite_store import SQLiteClusterStore
 from news_mcp.trends_resolution import resolve_entity_via_trends
 
 
+MAX_ENRICHMENT_RETRIES = 3  # per-cluster retries before giving up for this cycle
+
 async def _enrich_single_cluster(
     c: dict,
     topic: str,
@@ -37,7 +40,13 @@ async def _enrich_single_cluster(
     store: SQLiteClusterStore,
     logger: logging.Logger,
 ) -> dict:
-    """Enrich one cluster: heuristic + optional LLM extraction, concurrency-limited."""
+    """Enrich one cluster: heuristic + optional LLM extraction, concurrency-limited.
+
+    On LLM failure the cluster is retried up to MAX_ENRICHMENT_RETRIES times
+    with exponential backoff.  If all retries are exhausted the cluster is
+    marked with enrichment_failed_at and enrichment_retry_count so the next
+    polling cycle can re-attempt it.
+    """
     c2 = enrich_cluster(c)
     c2.setdefault("topic", topic)
 
@@ -64,16 +73,35 @@ async def _enrich_single_cluster(
             if existing.get("topic"):
                 c2["topic"] = existing.get("topic")
         else:
-            # Acquire semaphore before making outbound LLM call
-            async with semaphore:
+            # Retry loop with exponential backoff | semaphore held per-attempt
+            last_err = ""
+            for attempt in range(1 + MAX_ENRICHMENT_RETRIES):
+                if attempt > 0:
+                    backoff = 2 ** attempt
+                    logger.info(
+                        "retry cluster=%s topic=%s attempt=%d/%d backoff=%.0fs",
+                        cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, backoff,
+                    )
+                    await asyncio.sleep(backoff)
                 try:
-                    c2 = await classify_cluster_llm(c2)
+                    async with semaphore:
+                        c2 = await classify_cluster_llm(dict(c2))
+                    break  # success
                 except Exception:
-                    logger.exception(
-                        "LLM enrichment failed for cluster %s (topic %s)",
-                        c2.get("cluster_id"), topic,
+                    last_err = str(sys.exc_info()[1])[:200] if sys.exc_info()[1] else "unknown"
+                    logger.warning(
+                        "LLM enrichment failed cluster=%s topic=%s attempt=%d/%d err=%s",
+                        cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, last_err,
                     )
-                    c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
+            else:
+                # Loop completed without break = all retries exhausted
+                prev_count = c2.get("enrichment_retry_count", 0)
+                c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
+                c2["enrichment_retry_count"] = prev_count + 1
+                logger.error(
+                    "LLM enrichment exhausted cluster=%s topic=%s after %d retries",
+                    cluster_id, topic, MAX_ENRICHMENT_RETRIES,
+                )
 
     return c2
 
@@ -210,6 +238,32 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
             store.upsert_clusters(group, topic=final_topic)
             logger.info("refresh stored topic=%s clusters=%s", final_topic, len(group))
 
+    # Retry previously failed enrichments
+    failed_clusters = store.get_failed_enrichment_clusters(max_retries=3)
+    if failed_clusters:
+        logger.info("retry enrich failed clusters count=%d", len(failed_clusters))
+        retry_tasks = [
+            _enrich_single_cluster(
+                c, str(c.get("topic") or "other"), True, llm_semaphore, store, logger
+            )
+            for c in failed_clusters
+        ]
+        retry_results = await asyncio.gather(*retry_tasks, return_exceptions=False)
+        # Persist retried results
+        by_topic_retry: Dict[str, list] = {}
+        for c2 in retry_results:
+            # Clear stale failure marker on success
+            if not c2.get("enrichment_failed_at") or c2.get("entities"):
+                c2.pop("enrichment_failed_at", None)
+                c2.pop("enrichment_retry_count", None)
+            t = str(c2.get("topic") or "other").strip().lower()
+            if t not in {x.lower() for x in DEFAULT_TOPICS}:
+                t = "other"
+            by_topic_retry.setdefault(t, []).append(c2)
+        for t, group in by_topic_retry.items():
+            store.upsert_clusters(group, topic=t)
+            logger.info("retry stored topic=%s clusters=%s", t, len(group))
+
     prune_result = store.prune_if_due(
         pruning_enabled=NEWS_PRUNING_ENABLED,
         retention_days=NEWS_RETENTION_DAYS,

+ 70 - 21
news_mcp/llm.py

@@ -48,39 +48,88 @@ def active_llm_config() -> dict[str, str]:
     }
 
 
-async def _call_groq(model: str, messages: List[Dict[str, str]], response_json: bool = True) -> str:
+async def _call_groq(model: str, messages: List[Dict[str, str]], response_json: bool = True, retries: int = 2) -> str:
     if not GROQ_API_KEY:
         raise LLMError("GROQ_API_KEY is not configured")
     req = {"model": model, "messages": messages, "temperature": 0.2}
     if response_json:
         req["response_format"] = {"type": "json_object"}
+    last_err = ""
     async with httpx.AsyncClient(timeout=45.0) as client:
-        resp = await client.post(
-            "https://api.groq.com/openai/v1/chat/completions",
-            headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
-            json=req,
-        )
-        resp.raise_for_status()
-        data = resp.json()
-    return data["choices"][0]["message"]["content"]
-
-
-async def _call_openai(model: str, messages: List[Dict[str, str]], response_json: bool = True) -> str:
-    # OpenAI-compatible chat endpoint; uses NEWS_OPENAI_API_KEY.
+        for attempt in range(1 + retries):
+            resp = await client.post(
+                "https://api.groq.com/openai/v1/chat/completions",
+                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
+                json=req,
+            )
+            if resp.status_code != 200:
+                last_err = f"HTTP {resp.status_code}: {resp.text[:300]}"
+                if resp.status_code in (429, 500, 502, 503):
+                    await asyncio.sleep(2 ** attempt)
+                    continue
+                resp.raise_for_status()
+            data = resp.json()
+            if "error" in data:
+                last_err = f"API error: {data['error']}"
+                break
+            choices = data.get("choices", [])
+            if not choices:
+                last_err = f"No choices in response: {str(data)[:300]}"
+                if attempt < retries:
+                    await asyncio.sleep(2 ** attempt)
+                    continue
+                break
+            content = choices[0].get("message", {}).get("content")
+            if content:
+                return content
+            last_err = f"Empty content in choice: {str(choices[0])[:200]}"
+            if attempt < retries:
+                await asyncio.sleep(2 ** attempt)
+                continue
+            break
+    raise LLMError(f"Groq failed after {1+retries} attempts: {last_err}")
+
+
+async def _call_openai(model: str, messages: List[Dict[str, str]], response_json: bool = True, retries: int = 2) -> str:
     if not OPENAI_API_KEY:
         raise LLMError("OPENAI_API_KEY is not configured")
     req = {"model": model, "messages": messages}
     if response_json:
         req["response_format"] = {"type": "json_object"}
+    last_err = ""
     async with httpx.AsyncClient(timeout=45.0) as client:
-        resp = await client.post(
-            "https://api.openai.com/v1/chat/completions",
-            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
-            json=req,
-        )
-        resp.raise_for_status()
-        data = resp.json()
-    return data["choices"][0]["message"]["content"]
+        for attempt in range(1 + retries):
+            resp = await client.post(
+                "https://api.openai.com/v1/chat/completions",
+                headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
+                json=req,
+            )
+            if resp.status_code != 200:
+                last_err = f"HTTP {resp.status_code}: {resp.text[:300]}"
+                if resp.status_code in (429, 500, 502, 503):
+                    await asyncio.sleep(2 ** attempt)
+                    continue
+                resp.raise_for_status()
+            data = resp.json()
+            if "error" in data:
+                last_err = f"API error: {data['error']}"
+                break
+            choices = data.get("choices", [])
+            if not choices:
+                last_err = f"No choices in response: {str(data)[:300]}"
+                if attempt < retries:
+                    await asyncio.sleep(2 ** attempt)
+                    continue
+                break
+            content = choices[0].get("message", {}).get("content")
+            if content:
+                return content
+            last_err = f"Empty content in choice: {str(choices[0])[:200]}"
+            if attempt < retries:
+                await asyncio.sleep(2 ** attempt)
+                continue
+            break
+    raise LLMError(f"OpenAI failed after {1+retries} attempts: {last_err}")
 
 
 OR_OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"

+ 17 - 0
news_mcp/storage/sqlite_store.py

@@ -468,6 +468,23 @@ class SQLiteClusterStore:
                 (entity_id, normalized_label, None, mid, json.dumps([], ensure_ascii=False), now, now),
             )
 
+    def get_failed_enrichment_clusters(self, max_retries: int = 3) -> list[dict]:
+        """Return clusters whose last enrichment failed and haven't exceeded max_retries.
+
+        These are candidates for re-enrichment on the next polling cycle.
+        """
+        with self._conn() as conn:
+            cur = conn.execute(
+                "SELECT payload FROM clusters "
+                "WHERE json_extract(payload, '$.enrichment_failed_at') IS NOT NULL "
+                "AND (json_extract(payload, '$.enrichment_retry_count') IS NULL "
+                "     OR json_extract(payload, '$.enrichment_retry_count') < ?) "
+                "ORDER BY updated_at DESC LIMIT 500",
+                (max_retries,),
+            )
+            rows = cur.fetchall()
+        return [json.loads(r[0]) for r in rows]
+
     def prune_clusters(self, retention_days: float) -> int:
         retention_days = float(retention_days)
         if retention_days <= 0:

+ 3 - 0
test_news_mcp.py

@@ -628,6 +628,9 @@ def test_poller_persists_clusters_under_post_enrichment_topic(monkeypatch):
         def prune_if_due(self, **kwargs):
             return {"deleted": 0}
 
+        def get_failed_enrichment_clusters(self, max_retries=3):
+            return []
+
         def set_meta(self, key, value):
             pass