"""Tests for news_mcp deduplication, SQLite storage, enrichment and LLM helpers."""

from __future__ import annotations

import asyncio
import tempfile
from pathlib import Path

from news_mcp.dedup.cluster import dedup_and_cluster_articles
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.enrichment.importance import compute_importance
from news_mcp.enrichment.llm_enrich import _filter_entities
from news_mcp.llm import build_extraction_prompt, call_llm, load_prompt


def _article(
    title: str,
    url: str = "https://example.com/x",
    source: str = "Src",
    ts: str = "Mon, 30 Mar 2026 12:00:00 GMT",
) -> dict[str, str]:
    """Build a minimal article dict for use as test input.

    Args:
        title: Article headline.
        url: Article URL.
        source: Name of the publishing source.
        ts: Timestamp string stored under the ``"timestamp"`` key.

    Returns:
        A dict with ``title``, ``url``, ``source``, ``timestamp`` and a fixed
        ``summary`` value.
    """
    return {
        "title": title,
        "url": url,
        "source": source,
        "timestamp": ts,
        "summary": "summary text",
    }


def test_dedup_merges_similar_titles():
    """Two near-identical headlines plus one unrelated one yield two clusters."""
    articles = [
        _article("Trump warns Iran war could spread"),
        _article("Trump warns Iran conflict could spread"),
        _article("Unrelated sports result"),
    ]

    clustered = dedup_and_cluster_articles(articles, similarity_threshold=0.75)

    # We expect the Trump/Iran items to be merged into one cluster in the same topic bucket.
    total_clusters = sum(len(v) for v in clustered.values())
    assert total_clusters == 2


def test_sqlite_feed_hash_roundtrip():
    """A feed hash is absent until set, then read back unchanged."""
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)

        assert store.get_feed_hash("breakingthenews") is None

        store.set_feed_hash("breakingthenews", "abc123")
        assert store.get_feed_hash("breakingthenews") == "abc123"


def test_sqlite_summary_cache_roundtrip():
    """A cluster summary written to the store is returned by a later lookup."""
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)

        # Upsert a base cluster first.
        store.upsert_clusters(
            [
                {
                    "cluster_id": "cid1",
                    "headline": "Headline",
                    "summary": "Summary",
                    "entities": ["Iran"],
                    "sentiment": "negative",
                    "importance": 0.5,
                    "sources": ["BreakingTheNews"],
                    "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT",
                    "articles": [],
                    "first_seen": "Mon, 30 Mar 2026 12:00:00 GMT",
                    "last_updated": "Mon, 30 Mar 2026 12:00:00 GMT",
                }
            ],
            topic="other",
        )

        store.upsert_cluster_summary(
            "cid1",
            {
                "headline": "Headline",
                "mergedSummary": "Merged summary",
                "keyFacts": ["Fact 1"],
                "sources": ["BreakingTheNews"],
            },
        )

        cached = store.get_cluster_summary("cid1", ttl_hours=24)
        assert cached is not None
        assert cached["mergedSummary"] == "Merged summary"
        assert cached["keyFacts"] == ["Fact 1"]


def test_blacklist_filters_entities_case_insensitively():
    """A lowercase blacklist entry removes both capitalised and lowercase matches."""
    entities = ["Bloomberg", "Reuters", "bloomberg", "CoinDesk"]

    filtered = _filter_entities(entities, blacklist=["bloomberg"])

    assert filtered == ["Reuters", "CoinDesk"]


def test_load_prompt_reads_prompt_files():
    """The entity-extraction prompt loads and contains its strict-JSON instruction."""
    text = load_prompt("extract_entities.prompt")
    assert "Return STRICT JSON" in text


def test_build_extraction_prompt_is_stable_without_blacklist():
    """The built prompt embeds the cluster headline and the expected instruction text."""
    cluster = {
        "headline": "Bloomberg reports Bitcoin rallies after US rate comments",
        "summary": "A report from Bloomberg says Bitcoin moved higher after comments from the Fed.",
        "articles": [],
    }

    prompt = build_extraction_prompt(cluster)

    assert "Bloomberg reports Bitcoin rallies" in prompt
    assert "Do NOT return empty entities" in prompt
    assert "Bloomberg" in prompt  # present in the input, not filtered here


def test_call_llm_dispatches_to_selected_provider(monkeypatch):
    """``call_llm`` returns the reply of the backend named by its first argument."""

    async def fake_groq(model, messages, response_json=True):
        return '{"ok": true, "provider": "groq"}'

    async def fake_openai(model, messages, response_json=True):
        return '{"ok": true, "provider": "openai"}'

    # Replace both provider backends so no real network call is made.
    monkeypatch.setattr("news_mcp.llm._call_groq", fake_groq)
    monkeypatch.setattr("news_mcp.llm._call_openai", fake_openai)

    groq_reply = asyncio.run(call_llm("groq", "x", "sys", "user"))
    openai_reply = asyncio.run(call_llm("openai", "x", "sys", "user"))

    assert '"provider": "groq"' in groq_reply
    assert '"provider": "openai"' in openai_reply


def test_importance_prefers_llm_signal():
    """A strongly positive or negative sentimentScore never scores below neutral."""
    # Two clusters with same coverage but different sentiment magnitude.
    base = {
        "sources": ["A", "B"],
        "articles": [{}, {}],
        "sentiment": "neutral",
        "sentimentScore": 0.0,
    }
    pos = dict(base, sentimentScore=0.9)
    neg = dict(base, sentimentScore=-0.8)

    imp_base = compute_importance(base)
    imp_pos = compute_importance(pos)
    imp_neg = compute_importance(neg)

    assert imp_pos >= imp_base
    assert imp_neg >= imp_base