| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- from __future__ import annotations
- import json
- import logging
- from typing import Any, Dict, List
- import httpx
- from news_mcp.config import GROQ_API_KEY, GROQ_MODEL, GROQ_DEBUG
- logger = logging.getLogger(__name__)
- _SYSTEM = "You are a news signal extraction engine. Return STRICT JSON only."
- def _build_prompt(articles: List[Dict[str, Any]], headline: str, summary: str | None) -> str:
- # Keep prompt compact: clusters already deduped.
- sample = articles[:6]
- return json.dumps(
- {
- "cluster": {
- "headline": headline,
- "summary": summary or "",
- "articles": [
- {
- "title": a.get("title"),
- "url": a.get("url"),
- "source": a.get("source"),
- "timestamp": a.get("timestamp"),
- "summary": a.get("summary", ""),
- }
- for a in sample
- ],
- }
- },
- ensure_ascii=False,
- )
def _coerce_list(value: Any) -> List[Any]:
    """Turn a model-supplied value into a list, dropping ``None``/empty items."""
    if isinstance(value, str):
        # A lone string instead of an array is a common model slip.
        value = [value]
    if not isinstance(value, (list, tuple)):
        return []
    return [item for item in value if item is not None and item != ""]


def _coerce_sentiment_score(value: Any) -> float | None:
    """Parse a model-supplied score into a float clamped to [-1.0, 1.0].

    Returns ``None`` when the value is missing, non-numeric, or NaN.
    """
    try:
        score = float(value)
    except (TypeError, ValueError):
        return None
    if score != score:  # NaN is the only float not equal to itself
        return None
    return max(-1.0, min(1.0, score))


async def classify_cluster_groq(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Enrich a cluster with topic, entities, sentiment and keywords via Groq.

    When ``GROQ_API_KEY`` is falsy the input cluster is returned unchanged
    (same object). Otherwise a chat-completion request is sent and a shallow
    copy of ``cluster`` is returned with these keys set from the model reply:

    - ``topic``: the model's topic, else the cluster's existing topic; only
      written when one of them is truthy.
    - ``entities`` / ``keywords``: always lists (a bare string is wrapped,
      any other non-list value becomes ``[]``).
    - ``sentiment``: lower-cased string label, defaulting to ``"neutral"``.
    - ``sentimentScore``: float clamped to [-1.0, 1.0]; only written when the
      model supplied a usable number.

    Args:
        cluster: Cluster dict; ``headline``, ``summary``, ``articles`` and
            ``topic`` are read if present.

    Returns:
        The enriched copy of ``cluster`` (or ``cluster`` itself when no API
        key is configured).

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
        json.JSONDecodeError: If the model reply is not valid JSON.
        ValueError: If the model reply is valid JSON but not an object.
    """
    if not GROQ_API_KEY:
        # No enrichment configured.
        return cluster

    # `or` (rather than a .get default) also covers keys stored as None.
    user_payload = _build_prompt(
        articles=cluster.get("articles") or [],
        headline=cluster.get("headline") or "",
        summary=cluster.get("summary") or "",
    )

    prompt = (
        f"Input cluster JSON:\n{user_payload}\n\n"
        "You MUST extract a news signal from the headline AND summary. Do not leave entities empty when the text mentions obvious names.\n"
        "Task:\n"
        "1) infer the best top-level topic\n"
        "2) extract concise entities from the cluster\n"
        "3) assign sentiment from the wording/context\n"
        "4) provide short keywords that justify the classification\n\n"
        "Entity rules (strict):\n"
        "- Use short strings (1-5 words).\n"
        "- Include all obvious named entities mentioned in headline or summary: people, countries, regions, organizations, ministries, presidents, leaders, wars/conflicts if named.\n"
        "- Also include finance/crypto entities when present: BTC, ETH, Bitcoin, Ethereum, ETF, SEC, ECB, Fed, euro, inflation, rates.\n"
        "- If the cluster mentions Iran, UAE, Egypt, Germany, Europe, Trump, Merz, Sisi, those should appear in entities.\n"
        "- Do NOT return empty entities if any such names/places appear.\n\n"
        "Sentiment rules:\n"
        "- positive: clearly encouraging, improving, or supportive tone\n"
        "- negative: clearly alarming, worsening, severe, conflict, loss, risk, warning tone\n"
        "- neutral: factual, balanced, or mixed\n"
        "- sentimentScore must be a number from -1.0 to 1.0 and should reflect the sentiment label.\n\n"
        "Return STRICT JSON with EXACT keys only:\n"
        "{ topic, entities, sentiment, sentimentScore, keywords }\n"
        "where topic is one of [crypto, macro, regulation, ai, other].\n"
    )

    if GROQ_DEBUG:
        # Debug mode emits through both the logger and stdout.
        msg = f"[GROQ PROMPT] {prompt}"
        logger.warning(msg)
        print(msg, flush=True)

    req = {
        "model": GROQ_MODEL,
        "messages": [
            {"role": "system", "content": _SYSTEM},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.2,
        "response_format": {"type": "json_object"},
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        resp = await client.post(
            "https://api.groq.com/openai/v1/chat/completions",
            headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
            json=req,
        )
        resp.raise_for_status()
        data = resp.json()

    content = data["choices"][0]["message"]["content"]

    if GROQ_DEBUG:
        msg = f"[GROQ RAW RESPONSE] {content}"
        logger.warning(msg)
        print(msg, flush=True)

    parsed = json.loads(content)
    if not isinstance(parsed, dict):
        # Fail with a clear error instead of an AttributeError on `.get`.
        raise ValueError(
            f"Groq classification reply is not a JSON object: {type(parsed).__name__}"
        )

    # Normalize output types into our cluster shape; the model output is
    # untrusted, so every field is coerced rather than copied verbatim.
    out = dict(cluster)

    topic = parsed.get("topic") or cluster.get("topic")
    if topic:
        out["topic"] = topic

    out["entities"] = _coerce_list(parsed.get("entities"))

    raw_sentiment = parsed.get("sentiment")
    sentiment = raw_sentiment.strip().lower() if isinstance(raw_sentiment, str) else ""
    out["sentiment"] = sentiment or "neutral"

    # A missing or unusable score leaves any existing cluster value untouched.
    sentiment_score = _coerce_sentiment_score(parsed.get("sentimentScore"))
    if sentiment_score is not None:
        out["sentimentScore"] = sentiment_score

    out["keywords"] = _coerce_list(parsed.get("keywords"))
    return out
async def summarize_cluster_groq(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Produce a compact agent-facing summary.

    When ``GROQ_API_KEY`` is falsy no request is made and the summary is built
    from the cluster's own fields. Otherwise the model reply is normalized so
    the documented keys are always present: a missing or empty ``headline`` /
    ``mergedSummary`` / ``sources`` falls back to the cluster's value, and
    ``keyFacts`` / ``sources`` are always lists. Any extra keys the model
    returns are kept.

    Args:
        cluster: Cluster dict; ``headline``, ``summary``, ``articles`` and
            ``sources`` are read if present. At most the first 5 articles are
            sent to the model.

    Returns:
        {
            "headline": str,
            "mergedSummary": str,
            "keyFacts": [str,...],
            "sources": [str,...]
        }

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
        json.JSONDecodeError: If the model reply is not valid JSON.
        ValueError: If the model reply is valid JSON but not an object.
    """
    # Values used both for the no-API-key path and to fill gaps in the reply.
    fallback = {
        "headline": cluster.get("headline"),
        "mergedSummary": cluster.get("summary"),
        "keyFacts": [],
        "sources": cluster.get("sources") or [],
    }
    if not GROQ_API_KEY:
        return fallback

    # `or []` also covers a cluster stored with an explicit "articles": None.
    sample = (cluster.get("articles") or [])[:5]

    user_content = (
        json.dumps(
            {
                "headline": cluster.get("headline", ""),
                "summary": cluster.get("summary", ""),
                "articles": [
                    {
                        "title": a.get("title"),
                        "url": a.get("url"),
                        "source": a.get("source"),
                        "timestamp": a.get("timestamp"),
                    }
                    for a in sample
                ],
            },
            ensure_ascii=False,
        )
        + "\n\nReturn keys: headline, mergedSummary, keyFacts (5-8 strings), sources. mergedSummary should be 2-4 sentences."
    )

    req = {
        "model": GROQ_MODEL,
        "messages": [
            {
                "role": "system",
                "content": "You are a summarization engine for news clusters. Return strict JSON only.",
            },
            {"role": "user", "content": user_content},
        ],
        "temperature": 0.2,
        "response_format": {"type": "json_object"},
    }

    async with httpx.AsyncClient(timeout=45.0) as client:
        resp = await client.post(
            "https://api.groq.com/openai/v1/chat/completions",
            headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
            json=req,
        )
        resp.raise_for_status()
        data = resp.json()

    content = data["choices"][0]["message"]["content"]
    parsed = json.loads(content)
    if not isinstance(parsed, dict):
        raise ValueError(
            f"Groq summary reply is not a JSON object: {type(parsed).__name__}"
        )

    # Start from the model reply so extra keys survive, then enforce the
    # documented shape on the four contract keys.
    result = dict(parsed)

    for key in ("headline", "mergedSummary"):
        if not result.get(key):
            result[key] = fallback[key]

    for key in ("keyFacts", "sources"):
        value = result.get(key)
        if isinstance(value, str):
            # A lone string instead of an array is a common model slip.
            value = [value]
        if not isinstance(value, list) or not value:
            value = fallback[key]
        result[key] = value

    return result
|