"""LLM-backed classification and summarisation of news clusters."""

from __future__ import annotations

from fnmatch import fnmatchcase
from typing import Any, Dict, Iterable, Optional

from news_mcp.config import NEWS_ENTITY_BLACKLIST, DEFAULT_TOPICS
from news_mcp.entity_normalize import normalize_entities
from news_mcp.llm import call_extraction, call_summary
from news_mcp.trends_resolution import resolve_entity_via_trends


def _matches_blacklist(value: str, blacklist: Optional[Iterable[str]] = None) -> bool:
    """Return True when *value* should be dropped.

    The value is stringified, stripped and lower-cased, then compared against
    each blacklist entry (also stripped and lower-cased) as a shell-style
    glob via ``fnmatchcase``. Falsy or whitespace-only entries are ignored.

    Args:
        value: Candidate label; anything accepted by ``str()``.
        blacklist: Glob patterns to test against. ``None`` (the default) means
            use ``NEWS_ENTITY_BLACKLIST``; an explicit empty iterable means
            "no patterns".

    Returns:
        True if the normalised value is empty or matches any pattern.
    """
    key = str(value).strip().lower()
    if not key:
        # Blank labels carry no information, so they are always rejected.
        return True
    source = NEWS_ENTITY_BLACKLIST if blacklist is None else blacklist
    return any(
        fnmatchcase(key, pattern.strip().lower())
        for pattern in source
        if pattern and pattern.strip()
    )


def _filter_entities(entities, blacklist: Optional[Iterable[str]] = None):
    """Return the items of *entities* that do not match the blacklist.

    Order is preserved. A falsy *entities* (``None``, empty) yields ``[]``.
    *blacklist* is forwarded unchanged to ``_matches_blacklist``.
    """
    return [
        ent
        for ent in entities or []
        if not _matches_blacklist(ent, blacklist=blacklist)
    ]


async def classify_cluster_llm(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Enrich *cluster* with the fields extracted by ``call_extraction``.

    Args:
        cluster: Cluster mapping. Its optional ``"topic"`` key is used as the
            heuristic fallback topic. The input mapping is not mutated.

    Returns:
        A shallow copy of *cluster* with ``topic``, ``entities``,
        ``entityResolutions``, ``sentiment``, ``sentimentScore`` and
        ``keywords`` set from the extraction result.
    """
    parsed = await call_extraction(cluster)
    allowed_topics = {t.lower() for t in DEFAULT_TOPICS}

    # Topic: prefer the LLM's classification, fall back to the heuristic topic
    # already on the input cluster. Validate against the allowed set so we never
    # promote a free-form string into the SQL row column.
    raw_topic = parsed.get("topic", cluster.get("topic"))
    topic = str(raw_topic).strip().lower() if raw_topic else None

    if topic and _matches_blacklist(topic):
        # A blacklisted label is final: do not let the fallback path below
        # swap it for the heuristic topic.
        topic = "other"
    elif topic not in allowed_topics:
        # Unknown / hallucinated label -> fall back to whatever the heuristic
        # classifier on the headline gave us, else "other". The fallback is
        # held to the same blacklist as the LLM's answer.
        fallback = str(cluster.get("topic") or "").strip().lower()
        if fallback in allowed_topics and not _matches_blacklist(fallback):
            topic = fallback
        else:
            topic = "other"

    # IMPORTANT: normalize aliases BEFORE applying the blacklist, otherwise
    # blacklisting "bitcoin" misses entries the LLM returned as "btc".
    # `or []` treats an explicit null from the LLM like a missing key.
    entities = _filter_entities(normalize_entities(parsed.get("entities") or []))
    keywords = _filter_entities(normalize_entities(parsed.get("keywords") or []))

    return {
        **cluster,
        "topic": topic,
        "entities": entities,
        "entityResolutions": [resolve_entity_via_trends(e) for e in entities],
        "sentiment": parsed.get("sentiment", "neutral"),
        "sentimentScore": parsed.get("sentimentScore"),
        "keywords": keywords,
    }


async def summarize_cluster_llm(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Return the result of ``call_summary`` for *cluster*, unmodified."""
    return await call_summary(cluster)


# Backward-compatible aliases during the transition away from provider-specific naming.
classify_cluster_groq = classify_cluster_llm
summarize_cluster_groq = summarize_cluster_llm