from __future__ import annotations

import asyncio
import json
from pathlib import Path
from typing import Any, Dict, Iterable, List

import httpx

from news_mcp.config import (
    GROQ_API_KEY,
    NEWS_EXTRACT_PROVIDER,
    NEWS_EXTRACT_MODEL,
    NEWS_SUMMARY_PROVIDER,
    NEWS_SUMMARY_MODEL,
    OPENAI_API_KEY,
    OPENROUTER_API_KEY,
    PROMPTS_DIR,
)

# System prompt shared by the entity-extraction call.
SYSTEM_PROMPT = "You are a news signal extraction engine. Return STRICT JSON only."

OR_OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"


class LLMError(RuntimeError):
    """Raised when an LLM provider is misconfigured or fails permanently."""


def load_prompt(name: str) -> str:
    """Return the text of the prompt template *name* from ``PROMPTS_DIR``."""
    path = PROMPTS_DIR / name
    return path.read_text(encoding="utf-8")


def _render_prompt(template: str, **kwargs: Any) -> str:
    """Substitute ``{key}`` placeholders in *template* with ``str(value)``.

    Uses plain ``str.replace`` instead of ``str.format`` so that any other
    literal braces in the template (e.g. embedded JSON examples) survive
    untouched.
    """
    rendered = template
    for key, value in kwargs.items():
        rendered = rendered.replace("{" + key + "}", str(value))
    return rendered


def active_llm_config() -> dict[str, Any]:
    """Return the currently configured providers/models for diagnostics.

    NOTE: the return annotation was ``dict[str, str]``, but
    ``openrouter_key_set`` is a ``bool`` — widened to ``dict[str, Any]``
    to match the actual value types.
    """
    return {
        "extract_provider": NEWS_EXTRACT_PROVIDER,
        "extract_model": NEWS_EXTRACT_MODEL,
        "summary_provider": NEWS_SUMMARY_PROVIDER,
        "summary_model": NEWS_SUMMARY_MODEL,
        "openrouter_key_set": bool(OPENROUTER_API_KEY),
    }


async def _call_groq(
    model: str, messages: List[Dict[str, str]], response_json: bool = True
) -> str:
    """Call Groq's OpenAI-compatible chat endpoint and return the message text.

    Raises LLMError when GROQ_API_KEY is unset; raises httpx.HTTPStatusError
    on a non-2xx response.
    """
    if not GROQ_API_KEY:
        raise LLMError("GROQ_API_KEY is not configured")
    req: Dict[str, Any] = {"model": model, "messages": messages, "temperature": 0.2}
    if response_json:
        req["response_format"] = {"type": "json_object"}
    async with httpx.AsyncClient(timeout=45.0) as client:
        resp = await client.post(
            "https://api.groq.com/openai/v1/chat/completions",
            headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
            json=req,
        )
        resp.raise_for_status()
        data = resp.json()
        return data["choices"][0]["message"]["content"]


async def _call_openai(
    model: str, messages: List[Dict[str, str]], response_json: bool = True
) -> str:
    """Call OpenAI's chat-completions endpoint and return the message text.

    OpenAI-compatible chat endpoint; uses NEWS_OPENAI_API_KEY.
    Raises LLMError when OPENAI_API_KEY is unset; raises
    httpx.HTTPStatusError on a non-2xx response.
    """
    if not OPENAI_API_KEY:
        raise LLMError("OPENAI_API_KEY is not configured")
    # temperature=0.2 added for consistency with the Groq/OpenRouter callers.
    req: Dict[str, Any] = {"model": model, "messages": messages, "temperature": 0.2}
    if response_json:
        req["response_format"] = {"type": "json_object"}
    async with httpx.AsyncClient(timeout=45.0) as client:
        resp = await client.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
            json=req,
        )
        resp.raise_for_status()
        data = resp.json()
        return data["choices"][0]["message"]["content"]


async def _call_openrouter(
    model: str,
    messages: List[Dict[str, str]],
    response_json: bool = True,
    retries: int = 2,
) -> str:
    """Call OpenRouter with retry/backoff and return the message text.

    Retries (with exponential backoff ``2 ** attempt``) on HTTP 429/5xx and
    on empty ``choices``; gives up immediately on an in-body ``error`` field
    or an empty message content. Non-retryable HTTP errors propagate as
    ``httpx.HTTPStatusError``; exhausted retries raise ``LLMError`` carrying
    the last observed error.
    """
    if not OPENROUTER_API_KEY:
        raise LLMError("OPENROUTER_API_KEY is not configured")
    req: Dict[str, Any] = {"model": model, "messages": messages, "temperature": 0.2}
    if response_json:
        req["response_format"] = {"type": "json_object"}
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "https://github.com/gr1m0/bolt.new-rss",
        "X-Title": "news-mcp",
    }
    last_err = ""
    async with httpx.AsyncClient(timeout=45.0) as client:
        for attempt in range(1 + retries):
            resp = await client.post(OR_OPENROUTER_URL, headers=headers, json=req)
            if resp.status_code != 200:
                last_err = f"HTTP {resp.status_code}: {resp.text[:300]}"
                if resp.status_code in (429, 500, 502, 503):
                    await asyncio.sleep(2 ** attempt)
                    continue
                # Non-retryable status: surface the httpx error directly.
                resp.raise_for_status()
            data = resp.json()
            if "error" in data:
                last_err = f"API error: {data['error']}"
                break
            choices = data.get("choices", [])
            if not choices:
                last_err = f"No choices in response: {str(data)[:300]}"
                if attempt < retries:
                    await asyncio.sleep(2 ** attempt)
                    continue
                break
            msg = choices[0].get("message", {})
            content = msg.get("content")
            if content:
                return content
            last_err = f"Empty content in choice: {str(msg)[:200]}"
            break
    raise LLMError(f"OpenRouter failed after {1+retries} attempts: {last_err}")


async def call_llm(
    provider: str, model: str, system_prompt: str, user_prompt: str
) -> str:
    """Dispatch a single system+user chat request to the named provider.

    Raises LLMError for an unknown provider name.
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    provider = provider.lower().strip()
    if provider == "groq":
        return await _call_groq(model, messages)
    if provider == "openai":
        return await _call_openai(model, messages)
    if provider == "openrouter":
        return await _call_openrouter(model, messages)
    raise LLMError(f"Unsupported provider: {provider}. Valid: groq, openai, openrouter")


def build_extraction_prompt(cluster: Dict[str, Any]) -> str:
    """Render the entity-extraction prompt with *cluster* serialized as JSON."""
    prompt = load_prompt("extract_entities.prompt")
    return _render_prompt(prompt, cluster_json=json.dumps(cluster, ensure_ascii=False))


async def call_extraction(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Run entity extraction for *cluster* and return the parsed JSON result.

    Raises json.JSONDecodeError if the model response is not valid JSON.
    """
    user_prompt = build_extraction_prompt(cluster)
    content = await call_llm(
        NEWS_EXTRACT_PROVIDER, NEWS_EXTRACT_MODEL, SYSTEM_PROMPT, user_prompt
    )
    return json.loads(content)


async def call_summary(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize *cluster* and return the parsed JSON result.

    Raises json.JSONDecodeError if the model response is not valid JSON.
    """
    prompt = load_prompt("summarize_cluster.prompt")
    user_prompt = _render_prompt(
        prompt, cluster_json=json.dumps(cluster, ensure_ascii=False)
    )
    content = await call_llm(
        NEWS_SUMMARY_PROVIDER,
        NEWS_SUMMARY_MODEL,
        "You are a summarization engine for news clusters. Return strict JSON only.",
        user_prompt,
    )
    return json.loads(content)