from __future__ import annotations import asyncio import json from pathlib import Path from typing import Any, Dict, Iterable, List import httpx from news_mcp.config import ( GROQ_API_KEY, NEWS_EXTRACT_PROVIDER, NEWS_EXTRACT_MODEL, NEWS_SUMMARY_PROVIDER, NEWS_SUMMARY_MODEL, OPENAI_API_KEY, OPENROUTER_API_KEY, PROMPTS_DIR, llm_rate_limit, ) SYSTEM_PROMPT = "You are a news signal extraction engine. Return STRICT JSON only." class LLMError(RuntimeError): pass # --------------------------------------------------------------------------- # Per-provider rate limiter (token bucket). # --------------------------------------------------------------------------- _rate_limiters: dict[str, _RateLimiter] = {} def _get_rate_limiter(provider: str) -> "_RateLimiter | None": """Return (or lazily create) the rate limiter for *provider*. Returns None when rate limiting is disabled for this provider. """ provider = provider.strip().lower() rl = llm_rate_limit(provider) if rl <= 0.0: return None if provider not in _rate_limiters: _rate_limiters[provider] = _RateLimiter(rl) return _rate_limiters[provider] class _RateLimiter: """Simple async token-bucket rate limiter shared across all calls for one provider.""" def __init__(self, rate: float): self._interval = 1.0 / rate # seconds between tokens self._last_used = 0.0 # monotonic timestamp of last acquire self._lock = asyncio.Lock() async def acquire(self): async with self._lock: now = asyncio.get_event_loop().time() wait = self._last_used + self._interval - now if wait > 0: await asyncio.sleep(wait) self._last_used = asyncio.get_event_loop().time() def load_prompt(name: str) -> str: path = PROMPTS_DIR / name return path.read_text(encoding="utf-8") def _render_prompt(template: str, **kwargs: Any) -> str: rendered = template for key, value in kwargs.items(): rendered = rendered.replace("{" + key + "}", str(value)) return rendered def active_llm_config() -> dict[str, str]: return { "extract_provider": NEWS_EXTRACT_PROVIDER, "extract_model": NEWS_EXTRACT_MODEL, "summary_provider": NEWS_SUMMARY_PROVIDER, "summary_model": NEWS_SUMMARY_MODEL, "openrouter_key_set": bool(OPENROUTER_API_KEY), } async def _call_groq(model: str, messages: List[Dict[str, str]], response_json: bool = True, retries: int = 2) -> str: if not GROQ_API_KEY: raise LLMError("GROQ_API_KEY is not configured") req = {"model": model, "messages": messages, "temperature": 0.2} if response_json: req["response_format"] = {"type": "json_object"} last_err = "" async with httpx.AsyncClient(timeout=45.0) as client: for attempt in range(1 + retries): resp = await client.post( "https://api.groq.com/openai/v1/chat/completions", headers={"Authorization": f"Bearer {GROQ_API_KEY}"}, json=req, ) if resp.status_code != 200: last_err = f"HTTP {resp.status_code}: {resp.text[:300]}" if resp.status_code in (429, 500, 502, 503): await asyncio.sleep(2 ** attempt) continue resp.raise_for_status() data = resp.json() if "error" in data: last_err = f"API error: {data['error']}" break choices = data.get("choices", []) if not choices: last_err = f"No choices in response: {str(data)[:300]}" if attempt < retries: await asyncio.sleep(2 ** attempt) continue break content = choices[0].get("message", {}).get("content") if content: return content last_err = f"Empty content in choice: {str(choices[0])[:200]}" if attempt < retries: await asyncio.sleep(2 ** attempt) continue break raise LLMError(f"Groq failed after {1+retries} attempts: {last_err}") async def _call_openai(model: str, messages: List[Dict[str, str]], response_json: bool = True, retries: int = 2) -> str: if not OPENAI_API_KEY: raise LLMError("OPENAI_API_KEY is not configured") req = {"model": model, "messages": messages} if response_json: req["response_format"] = {"type": "json_object"} last_err = "" async with httpx.AsyncClient(timeout=45.0) as client: for attempt in range(1 + retries): resp = await client.post( "https://api.openai.com/v1/chat/completions", headers={"Authorization": f"Bearer {OPENAI_API_KEY}"}, json=req, ) if resp.status_code != 200: last_err = f"HTTP {resp.status_code}: {resp.text[:300]}" if resp.status_code in (429, 500, 502, 503): await asyncio.sleep(2 ** attempt) continue resp.raise_for_status() data = resp.json() if "error" in data: last_err = f"API error: {data['error']}" break choices = data.get("choices", []) if not choices: last_err = f"No choices in response: {str(data)[:300]}" if attempt < retries: await asyncio.sleep(2 ** attempt) continue break content = choices[0].get("message", {}).get("content") if content: return content last_err = f"Empty content in choice: {str(choices[0])[:200]}" if attempt < retries: await asyncio.sleep(2 ** attempt) continue break raise LLMError(f"OpenAI failed after {1+retries} attempts: {last_err}") OR_OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" async def _call_openrouter(model: str, messages: List[Dict[str, str]], response_json: bool = True, retries: int = 2) -> str: if not OPENROUTER_API_KEY: raise LLMError("OPENROUTER_API_KEY is not configured") req = {"model": model, "messages": messages, "temperature": 0.2} if response_json: req["response_format"] = {"type": "json_object"} headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "HTTP-Referer": "https://github.com/gr1m0/bolt.new-rss", "X-Title": "news-mcp", } last_err = "" async with httpx.AsyncClient(timeout=45.0) as client: for attempt in range(1 + retries): resp = await client.post( OR_OPENROUTER_URL, headers=headers, json=req, ) if resp.status_code != 200: last_err = f"HTTP {resp.status_code}: {resp.text[:300]}" if resp.status_code in (429, 500, 502, 503): await asyncio.sleep(2 ** attempt) continue resp.raise_for_status() data = resp.json() if "error" in data: last_err = f"API error: {data['error']}" break choices = data.get("choices", []) if not choices: last_err = f"No choices in response: {str(data)[:300]}" if attempt < retries: await asyncio.sleep(2 ** attempt) continue break msg = choices[0].get("message", {}) content = msg.get("content") if content: return content last_err = f"Empty content in choice: {str(msg)[:200]}" break raise LLMError(f"OpenRouter failed after {1+retries} attempts: {last_err}") async def call_llm(provider: str, model: str, system_prompt: str, user_prompt: str) -> str: messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] provider = provider.lower().strip() # Rate-limit before dispatching to the provider-specific caller. rl = _get_rate_limiter(provider) if rl is not None: await rl.acquire() if provider == "groq": return await _call_groq(model, messages) if provider == "openai": return await _call_openai(model, messages) if provider == "openrouter": return await _call_openrouter(model, messages) raise LLMError(f"Unsupported provider: {provider}. Valid: groq, openai, openrouter") def build_extraction_prompt(cluster: Dict[str, Any]) -> str: prompt = load_prompt("extract_entities.prompt") return _render_prompt(prompt, cluster_json=json.dumps(cluster, ensure_ascii=False)) async def call_extraction(cluster: Dict[str, Any]) -> Dict[str, Any]: user_prompt = build_extraction_prompt(cluster) content = await call_llm(NEWS_EXTRACT_PROVIDER, NEWS_EXTRACT_MODEL, SYSTEM_PROMPT, user_prompt) return json.loads(content) async def call_summary(cluster: Dict[str, Any]) -> Dict[str, Any]: prompt = load_prompt("summarize_cluster.prompt") user_prompt = _render_prompt(prompt, cluster_json=json.dumps(cluster, ensure_ascii=False)) content = await call_llm(NEWS_SUMMARY_PROVIDER, NEWS_SUMMARY_MODEL, "You are a summarization engine for news clusters. Return strict JSON only.", user_prompt) return json.loads(content)