|
@@ -5,7 +5,7 @@ import hashlib
|
|
|
import logging
|
|
import logging
|
|
|
import sys
|
|
import sys
|
|
|
from collections import defaultdict
|
|
from collections import defaultdict
|
|
|
-from datetime import datetime, timezone, timedelta
|
|
|
|
|
|
|
+from datetime import datetime, timezone
|
|
|
from typing import Any, Dict
|
|
from typing import Any, Dict
|
|
|
|
|
|
|
|
from news_mcp.config import (
|
|
from news_mcp.config import (
|
|
@@ -51,6 +51,13 @@ async def _enrich_single_cluster(
|
|
|
) -> dict:
|
|
) -> dict:
|
|
|
"""Enrich one cluster: heuristic + optional LLM extraction, concurrency-limited.
|
|
"""Enrich one cluster: heuristic + optional LLM extraction, concurrency-limited.
|
|
|
|
|
|
|
|
|
|
+ Cache strategy: if the cluster already has entities and an enriched_at
|
|
|
|
|
+ timestamp (from a previous successful enrichment), skip the LLM call.
|
|
|
|
|
+ Clusters only need to be enriched once — the enrichment output
|
|
|
|
|
+ (entities, keywords, sentiment, topic) is derived from the cluster's
|
|
|
|
|
+ article content, which doesn't change meaningfully between polls unless
|
|
|
|
|
+ enough new articles arrive to form a different cluster.
|
|
|
|
|
+
|
|
|
On LLM failure the cluster is retried up to MAX_ENRICHMENT_RETRIES times
|
|
On LLM failure the cluster is retried up to MAX_ENRICHMENT_RETRIES times
|
|
|
with exponential backoff. If all retries are exhausted the cluster is
|
|
with exponential backoff. If all retries are exhausted the cluster is
|
|
|
marked with enrichment_failed_at and enrichment_retry_count so the next
|
|
marked with enrichment_failed_at and enrichment_retry_count so the next
|
|
@@ -61,11 +68,24 @@ async def _enrich_single_cluster(
|
|
|
|
|
|
|
|
cluster_id = c2.get("cluster_id")
|
|
cluster_id = c2.get("cluster_id")
|
|
|
if llm_enabled and cluster_id:
|
|
if llm_enabled and cluster_id:
|
|
|
- # Cache: if we already have entities/sentiment for this cluster, skip LLM call.
|
|
|
|
|
|
|
+ # --- Cache check: skip LLM if already enriched ---
|
|
|
|
|
+ # The cluster payload may already carry enriched_at if it was loaded
|
|
|
|
|
+ # from an existing DB cluster during cross-cycle seeding.
|
|
|
|
|
+ enriched_at_str = c2.get("enriched_at")
|
|
|
|
|
+ if enriched_at_str and c2.get("entities"):
|
|
|
|
|
+ logger.debug("enrich skip (cached) cluster=%s topic=%s", cluster_id, topic)
|
|
|
|
|
+ return c2
|
|
|
|
|
+
|
|
|
|
|
+ # --- Check DB: cluster may have been enriched in a previous cycle ---
|
|
|
|
|
+ # Note: cluster_id is derived from the article set, so this lookup
|
|
|
|
|
+ # only matches when the same exact article set was clustered before.
|
|
|
|
|
+ # That's fine — it means the enrichment is still valid.
|
|
|
existing = store.get_cluster_by_id(cluster_id)
|
|
existing = store.get_cluster_by_id(cluster_id)
|
|
|
- if existing and existing.get("entities"):
|
|
|
|
|
|
|
+ if existing and existing.get("entities") and existing.get("enriched_at"):
|
|
|
|
|
+ logger.info("enrich cache-hit cluster=%s topic=%s", cluster_id, topic)
|
|
|
c2 = dict(c2)
|
|
c2 = dict(c2)
|
|
|
c2["entities"] = existing.get("entities", [])
|
|
c2["entities"] = existing.get("entities", [])
|
|
|
|
|
+ c2["enriched_at"] = existing.get("enriched_at")
|
|
|
|
|
|
|
|
existing_resolutions = existing.get("entityResolutions", None)
|
|
existing_resolutions = existing.get("entityResolutions", None)
|
|
|
if isinstance(existing_resolutions, list) and existing_resolutions:
|
|
if isinstance(existing_resolutions, list) and existing_resolutions:
|
|
@@ -81,36 +101,38 @@ async def _enrich_single_cluster(
|
|
|
c2["keywords"] = existing.get("keywords")
|
|
c2["keywords"] = existing.get("keywords")
|
|
|
if existing.get("topic"):
|
|
if existing.get("topic"):
|
|
|
c2["topic"] = existing.get("topic")
|
|
c2["topic"] = existing.get("topic")
|
|
|
- else:
|
|
|
|
|
- # Retry loop with exponential backoff | semaphore held per-attempt
|
|
|
|
|
- last_err = ""
|
|
|
|
|
- for attempt in range(1 + MAX_ENRICHMENT_RETRIES):
|
|
|
|
|
- if attempt > 0:
|
|
|
|
|
- backoff = 2 ** attempt
|
|
|
|
|
- logger.info(
|
|
|
|
|
- "retry cluster=%s topic=%s attempt=%d/%d backoff=%.0fs",
|
|
|
|
|
- cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, backoff,
|
|
|
|
|
- )
|
|
|
|
|
- await asyncio.sleep(backoff)
|
|
|
|
|
- try:
|
|
|
|
|
- async with semaphore:
|
|
|
|
|
- c2 = await classify_cluster_llm(dict(c2))
|
|
|
|
|
- break # success
|
|
|
|
|
- except Exception:
|
|
|
|
|
- last_err = str(sys.exc_info()[1])[:200] if sys.exc_info()[1] else "unknown"
|
|
|
|
|
- logger.warning(
|
|
|
|
|
- "LLM enrichment failed cluster=%s topic=%s attempt=%d/%d err=%s",
|
|
|
|
|
- cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, last_err,
|
|
|
|
|
- )
|
|
|
|
|
- else:
|
|
|
|
|
- # Loop completed without break = all retries exhausted
|
|
|
|
|
- prev_count = c2.get("enrichment_retry_count", 0)
|
|
|
|
|
- c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
|
|
|
|
|
- c2["enrichment_retry_count"] = prev_count + 1
|
|
|
|
|
- logger.error(
|
|
|
|
|
- "LLM enrichment exhausted cluster=%s topic=%s after %d retries",
|
|
|
|
|
- cluster_id, topic, MAX_ENRICHMENT_RETRIES,
|
|
|
|
|
|
|
+ return c2
|
|
|
|
|
+
|
|
|
|
|
+ # --- Actually call the LLM ---
|
|
|
|
|
+ last_err = ""
|
|
|
|
|
+ for attempt in range(1 + MAX_ENRICHMENT_RETRIES):
|
|
|
|
|
+ if attempt > 0:
|
|
|
|
|
+ backoff = 2 ** attempt
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ "retry cluster=%s topic=%s attempt=%d/%d backoff=%.0fs",
|
|
|
|
|
+ cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, backoff,
|
|
|
)
|
|
)
|
|
|
|
|
+ await asyncio.sleep(backoff)
|
|
|
|
|
+ try:
|
|
|
|
|
+ async with semaphore:
|
|
|
|
|
+ c2 = await classify_cluster_llm(dict(c2))
|
|
|
|
|
+ c2["enriched_at"] = datetime.now(timezone.utc).isoformat()
|
|
|
|
|
+ break # success
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ last_err = str(sys.exc_info()[1])[:200] if sys.exc_info()[1] else "unknown"
|
|
|
|
|
+ logger.warning(
|
|
|
|
|
+ "LLM enrichment failed cluster=%s topic=%s attempt=%d/%d err=%s",
|
|
|
|
|
+ cluster_id, topic, attempt, MAX_ENRICHMENT_RETRIES, last_err,
|
|
|
|
|
+ )
|
|
|
|
|
+ else:
|
|
|
|
|
+ # Loop completed without break = all retries exhausted
|
|
|
|
|
+ prev_count = c2.get("enrichment_retry_count", 0)
|
|
|
|
|
+ c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
|
|
|
|
|
+ c2["enrichment_retry_count"] = prev_count + 1
|
|
|
|
|
+ logger.error(
|
|
|
|
|
+ "LLM enrichment exhausted cluster=%s topic=%s after %d retries",
|
|
|
|
|
+ cluster_id, topic, MAX_ENRICHMENT_RETRIES,
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
return c2
|
|
return c2
|
|
|
|
|
|
|
@@ -286,7 +308,7 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
|
|
|
logger.info("retry enrich failed clusters count=%d", len(failed_clusters))
|
|
logger.info("retry enrich failed clusters count=%d", len(failed_clusters))
|
|
|
retry_tasks = [
|
|
retry_tasks = [
|
|
|
_enrich_single_cluster(
|
|
_enrich_single_cluster(
|
|
|
- c, str(c.get("topic") or "other"), True, llm_semaphore, store, logger
|
|
|
|
|
|
|
+ c, str(c.get("topic") or "other"), True, llm_semaphore, store, logger,
|
|
|
)
|
|
)
|
|
|
for c in failed_clusters
|
|
for c in failed_clusters
|
|
|
]
|
|
]
|