"""Groq-backed enrichment helpers for news clusters.

Exposes two coroutines, ``classify_cluster_groq`` and
``summarize_cluster_groq``.  Each sends a compact JSON view of a cluster to
the Groq chat-completions endpoint and folds the JSON reply back into a plain
dict.  Both skip the network entirely when ``GROQ_API_KEY`` is falsy.
"""
from __future__ import annotations

import json
import logging
import math
from typing import Any, Dict, List

import httpx

from news_mcp.config import GROQ_API_KEY, GROQ_MODEL, GROQ_DEBUG

logger = logging.getLogger(__name__)

_SYSTEM = "You are a news signal extraction engine. Return STRICT JSON only."
_SUMMARY_SYSTEM = (
    "You are a summarization engine for news clusters. Return strict JSON only."
)
_GROQ_CHAT_URL = "https://api.groq.com/openai/v1/chat/completions"

# Static instruction text appended after the cluster JSON in the
# classification prompt.
_CLASSIFY_INSTRUCTIONS = (
    "You MUST extract a news signal from the headline AND summary. "
    "Do not leave entities empty when the text mentions obvious names.\n"
    "Task:\n"
    "1) infer the best top-level topic\n"
    "2) extract concise entities from the cluster\n"
    "3) assign sentiment from the wording/context\n"
    "4) provide short keywords that justify the classification\n\n"
    "Entity rules (strict):\n"
    "- Use short strings (1-5 words).\n"
    "- Include all obvious named entities mentioned in headline or summary: people, countries, regions, organizations, ministries, presidents, leaders, wars/conflicts if named.\n"
    "- Also include finance/crypto entities when present: BTC, ETH, Bitcoin, Ethereum, ETF, SEC, ECB, Fed, euro, inflation, rates.\n"
    "- If the cluster mentions Iran, UAE, Egypt, Germany, Europe, Trump, Merz, Sisi, those should appear in entities.\n"
    "- Do NOT return empty entities if any such names/places appear.\n\n"
    "Sentiment rules:\n"
    "- positive: clearly encouraging, improving, or supportive tone\n"
    "- negative: clearly alarming, worsening, severe, conflict, loss, risk, warning tone\n"
    "- neutral: factual, balanced, or mixed\n"
    "- sentimentScore must be a number from -1.0 to 1.0 and should reflect the sentiment label.\n\n"
    "Return STRICT JSON with EXACT keys only:\n"
    "{ topic, entities, sentiment, sentimentScore, keywords }\n"
    "where topic is one of [crypto, macro, regulation, ai, other].\n"
)

_SUMMARY_INSTRUCTIONS = (
    "\n\nReturn keys: headline, mergedSummary, keyFacts (5-8 strings), sources. "
    "mergedSummary should be 2-4 sentences."
)


def _build_prompt(articles: List[Dict[str, Any]], headline: str, summary: str | None) -> str:
    """Serialize a cluster into the JSON payload embedded in the classify prompt.

    Args:
        articles: Article dicts of the cluster; only the first six are used.
        headline: Cluster headline.
        summary: Cluster summary; ``None`` is sent as an empty string.

    Returns:
        A JSON string of the form ``{"cluster": {...}}`` with non-ASCII
        characters left unescaped.
    """
    # Keep prompt compact: clusters already deduped.
    sample = articles[:6]
    return json.dumps(
        {
            "cluster": {
                "headline": headline,
                "summary": summary or "",
                "articles": [
                    {
                        "title": a.get("title"),
                        "url": a.get("url"),
                        "source": a.get("source"),
                        "timestamp": a.get("timestamp"),
                        "summary": a.get("summary", ""),
                    }
                    for a in sample
                ],
            }
        },
        ensure_ascii=False,
    )


def _debug_dump(label: str, payload: Any) -> None:
    """Emit ``[label] payload`` to the logger and stdout when GROQ_DEBUG is set."""
    if not GROQ_DEBUG:
        return
    msg = f"[{label}] {payload}"
    logger.warning(msg)
    print(msg, flush=True)


async def _request_chat_content(messages: List[Dict[str, str]], *, timeout: float) -> str:
    """POST a JSON-mode chat completion to Groq and return the reply text.

    Args:
        messages: Chat messages (``role`` / ``content`` dicts) to send.
        timeout: Timeout in seconds handed to ``httpx.AsyncClient``.

    Returns:
        The ``content`` of the first choice's message.

    Raises:
        httpx.HTTPStatusError: Propagated from ``raise_for_status()``.
        KeyError, IndexError: If the response body lacks
            ``choices[0].message.content``.
    """
    req = {
        "model": GROQ_MODEL,
        "messages": messages,
        "temperature": 0.2,
        "response_format": {"type": "json_object"},
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        resp = await client.post(
            _GROQ_CHAT_URL,
            headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
            json=req,
        )
        resp.raise_for_status()
        data = resp.json()
    return data["choices"][0]["message"]["content"]


def _load_json_object(content: str, what: str) -> Dict[str, Any]:
    """Parse ``content`` as JSON and return it only if it is an object.

    A syntactically valid reply that is not a JSON object (array, string,
    number, ...) is logged and replaced by an empty dict so that callers fall
    back to their defaults instead of failing on ``.get``.

    Raises:
        json.JSONDecodeError: If ``content`` is not valid JSON.
    """
    parsed = json.loads(content)
    if isinstance(parsed, dict):
        return parsed
    logger.warning(
        "Groq %s reply was a JSON %s, not an object; ignoring it",
        what,
        type(parsed).__name__,
    )
    return {}


def _as_list(value: Any) -> List[Any]:
    """Return ``value`` as a list: falsy -> ``[]``, scalar -> ``[value]``."""
    if not value:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _normalize_sentiment(value: Any) -> str:
    """Return a lower-cased, stripped label, or ``"neutral"`` if unusable."""
    if isinstance(value, str):
        label = value.strip().lower()
        if label:
            return label
    return "neutral"


def _coerce_score(value: Any) -> float | None:
    """Convert a model-supplied score to a float clamped to ``[-1.0, 1.0]``.

    Returns ``None`` when the value is missing, a boolean, NaN, or cannot be
    converted, so the caller can simply omit the field.
    """
    if value is None or isinstance(value, bool):
        return None
    try:
        score = float(value)
    except (TypeError, ValueError):
        return None
    if math.isnan(score):
        return None
    # The prompt asks for -1.0..1.0; enforce it rather than trust the model.
    return max(-1.0, min(1.0, score))


def _fallback_summary(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Build the summary dict from the cluster's own fields, without the model."""
    return {
        "headline": cluster.get("headline"),
        "mergedSummary": cluster.get("summary"),
        "keyFacts": [],
        "sources": cluster.get("sources", []),
    }


async def classify_cluster_groq(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Enrich a cluster with topic, entities, sentiment and keywords via Groq.

    Args:
        cluster: Cluster dict; ``headline``, ``summary``, ``articles`` and
            ``topic`` are read if present.

    Returns:
        The same ``cluster`` object, untouched, when ``GROQ_API_KEY`` is
        falsy.  Otherwise a shallow copy with ``entities``, ``sentiment`` and
        ``keywords`` always set, ``topic`` set when the model or the cluster
        supplies one, and ``sentimentScore`` set only when the model returned
        a usable number (clamped to ``[-1.0, 1.0]``).

    Raises:
        httpx.HTTPStatusError: On a non-success HTTP status.
        json.JSONDecodeError: If the model reply is not valid JSON.
    """
    if not GROQ_API_KEY:
        # No enrichment configured.
        return cluster

    # ``or`` guards against keys that are present but explicitly None.
    headline = cluster.get("headline") or ""
    summary = cluster.get("summary") or ""
    articles = cluster.get("articles") or []

    user_payload = _build_prompt(articles=articles, headline=headline, summary=summary)
    prompt = f"Input cluster JSON:\n{user_payload}\n\n" + _CLASSIFY_INSTRUCTIONS
    _debug_dump("GROQ PROMPT", prompt)

    content = await _request_chat_content(
        [
            {"role": "system", "content": _SYSTEM},
            {"role": "user", "content": prompt},
        ],
        timeout=30.0,
    )
    _debug_dump("GROQ RAW RESPONSE", content)

    parsed = _load_json_object(content, "classification")

    # Normalize output types into our cluster shape.
    out = dict(cluster)
    topic = parsed.get("topic") or cluster.get("topic")
    if topic:
        out["topic"] = topic
    out["entities"] = _as_list(parsed.get("entities"))
    out["sentiment"] = _normalize_sentiment(parsed.get("sentiment"))
    sentiment_score = _coerce_score(parsed.get("sentimentScore"))
    if sentiment_score is not None:
        out["sentimentScore"] = sentiment_score
    out["keywords"] = _as_list(parsed.get("keywords"))
    return out


async def summarize_cluster_groq(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Produce a compact agent-facing summary.

    Args:
        cluster: Cluster dict; ``headline``, ``summary``, ``articles`` and
            ``sources`` are read if present.  Only the first five articles
            are sent to the model.

    Returns:
        {
          "headline": str,
          "mergedSummary": str,
          "keyFacts": [str,...],
          "sources": [str,...]
        }
        Without ``GROQ_API_KEY`` the values come straight from the cluster
        (with empty ``keyFacts``).  With it, the model's JSON object is
        returned, and any of the four keys it omitted or set to null are
        filled from that same cluster-derived fallback.

    Raises:
        httpx.HTTPStatusError: On a non-success HTTP status.
        json.JSONDecodeError: If the model reply is not valid JSON.
    """
    fallback = _fallback_summary(cluster)
    if not GROQ_API_KEY:
        return fallback

    # ``or`` guards against keys that are present but explicitly None.
    headline = cluster.get("headline") or ""
    summary = cluster.get("summary") or ""
    articles = cluster.get("articles") or []
    sample = articles[:5]

    user_content = (
        json.dumps(
            {
                "headline": headline,
                "summary": summary,
                "articles": [
                    {
                        "title": a.get("title"),
                        "url": a.get("url"),
                        "source": a.get("source"),
                        "timestamp": a.get("timestamp"),
                    }
                    for a in sample
                ],
            },
            ensure_ascii=False,
        )
        + _SUMMARY_INSTRUCTIONS
    )

    content = await _request_chat_content(
        [
            {"role": "system", "content": _SUMMARY_SYSTEM},
            {"role": "user", "content": user_content},
        ],
        timeout=45.0,
    )

    result = dict(_load_json_object(content, "summary"))
    # Guarantee the documented shape even if the model dropped a key.
    for key, default in fallback.items():
        if result.get(key) is None:
            result[key] = default
    return result