"""Tests for news_mcp: dedup, SQLite storage, entity handling, LLM helpers, importance."""

from __future__ import annotations

import asyncio
import tempfile
from pathlib import Path

from news_mcp.dedup.cluster import dedup_and_cluster_articles
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.enrichment.importance import compute_importance
from news_mcp.enrichment.llm_enrich import _filter_entities, _matches_blacklist
from news_mcp.entity_normalize import normalize_query, normalize_entities
from news_mcp.llm import build_extraction_prompt, call_llm, load_prompt
from news_mcp.trends_resolution import resolve_entity_via_trends
from news_mcp.mcp_server_fastmcp import _sort_clusters_by_recency


def _article(
    title: str,
    url: str = "https://example.com/x",
    source: str = "Src",
    ts: str = "Mon, 30 Mar 2026 12:00:00 GMT",
) -> dict[str, str]:
    """Build a minimal article dict for feeding into the dedup/cluster step."""
    return {
        "title": title,
        "url": url,
        "source": source,
        "timestamp": ts,
        "summary": "summary text",
    }


def test_dedup_merges_similar_titles():
    """Two near-identical headlines plus one unrelated one yield two clusters."""
    articles = [
        _article("Trump warns Iran war could spread"),
        _article("Trump warns Iran conflict could spread"),
        _article("Unrelated sports result"),
    ]
    clustered = dedup_and_cluster_articles(articles, similarity_threshold=0.75)
    # We expect the Trump/Iran items to be merged into one cluster in the same topic bucket.
    total_clusters = sum(len(v) for v in clustered.values())
    assert total_clusters == 2


def test_sqlite_feed_hash_roundtrip():
    """A feed hash is absent until set, then read back unchanged."""
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        assert store.get_feed_hash("breakingthenews") is None
        store.set_feed_hash("breakingthenews", "abc123")
        assert store.get_feed_hash("breakingthenews") == "abc123"


def test_sqlite_summary_cache_roundtrip():
    """A summary upserted for an existing cluster is returned by the cache lookup."""
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        # Upsert a base cluster first.
        store.upsert_clusters(
            [
                {
                    "cluster_id": "cid1",
                    "headline": "Headline",
                    "summary": "Summary",
                    "entities": ["Iran"],
                    "sentiment": "negative",
                    "importance": 0.5,
                    "sources": ["BreakingTheNews"],
                    "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT",
                    "articles": [],
                    "first_seen": "Mon, 30 Mar 2026 12:00:00 GMT",
                    "last_updated": "Mon, 30 Mar 2026 12:00:00 GMT",
                }
            ],
            topic="other",
        )
        store.upsert_cluster_summary(
            "cid1",
            {
                "headline": "Headline",
                "mergedSummary": "Merged summary",
                "keyFacts": ["Fact 1"],
                "sources": ["BreakingTheNews"],
            },
        )
        cached = store.get_cluster_summary("cid1", ttl_hours=24)
        assert cached is not None
        assert cached["mergedSummary"] == "Merged summary"
        assert cached["keyFacts"] == ["Fact 1"]


def test_prune_clusters_deletes_rows_older_than_retention():
    """Only the row whose updated_at was backdated is pruned; last_prune_at is recorded."""
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        store.upsert_clusters(
            [
                {
                    "cluster_id": "fresh",
                    "headline": "Fresh",
                    "summary": "Fresh summary",
                    "entities": ["Bitcoin"],
                    "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
                    "articles": [],
                },
                {
                    "cluster_id": "stale",
                    "headline": "Stale",
                    "summary": "Stale summary",
                    "entities": ["Iran"],
                    "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT",
                    "articles": [],
                },
            ],
            topic="other",
        )
        # Backdate the "stale" row directly so it falls outside the retention window.
        with store._conn() as conn:
            conn.execute(
                "UPDATE clusters SET updated_at=? WHERE cluster_id=?",
                ("2025-01-01T00:00:00+00:00", "stale"),
            )
        deleted = store.prune_clusters(retention_days=30)
        assert deleted == 1
        assert store.get_cluster_by_id("stale") is None
        assert store.get_cluster_by_id("fresh") is not None
        assert (
            store.get_prune_state(
                pruning_enabled=True, retention_days=30, interval_hours=24
            )["last_prune_at"]
            is not None
        )


def test_prune_if_due_skips_deletes_when_pruning_disabled():
    """With pruning disabled, a backdated row survives and zero deletions are reported."""
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        store.upsert_clusters(
            [
                {
                    "cluster_id": "stale",
                    "headline": "Stale",
                    "summary": "Stale summary",
                    "entities": ["Iran"],
                    "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT",
                    "articles": [],
                }
            ],
            topic="other",
        )
        # Backdate the row so it would be eligible for pruning if pruning were enabled.
        with store._conn() as conn:
            conn.execute(
                "UPDATE clusters SET updated_at=? WHERE cluster_id=?",
                ("2025-01-01T00:00:00+00:00", "stale"),
            )
        result = store.prune_if_due(
            pruning_enabled=False, retention_days=30, interval_hours=24
        )
        assert result["enabled"] is False
        assert result["deleted"] == 0
        assert store.get_cluster_by_id("stale") is not None


def test_blacklist_filters_entities_case_insensitively():
    """A lowercase blacklist entry removes both capitalised and lowercase matches."""
    entities = ["Bloomberg", "Reuters", "bloomberg", "CoinDesk"]
    filtered = _filter_entities(entities, blacklist=["bloomberg"])
    assert filtered == ["Reuters", "CoinDesk"]


def test_blacklist_supports_wildcards():
    """Leading and trailing ``*`` patterns match; unrelated names do not."""
    assert _matches_blacklist("Bloomberg Economics", blacklist=["bloomberg*"])
    assert _matches_blacklist("bloomberg", blacklist=["*berg"])
    assert not _matches_blacklist("Reuters", blacklist=["bloomberg*"])


def test_query_normalization_keeps_common_shorthand_working():
    """Known shorthands expand to canonical names; unknown queries pass through."""
    assert normalize_query("btc") == "Bitcoin"
    assert normalize_query("Trump") == "Donald Trump"
    assert normalize_query("nvidia") == "nvidia"


def test_entity_normalization_deduplicates_aliases():
    """Aliases of the same entity collapse into a single canonical entry."""
    assert normalize_entities(["btc", "Bitcoin", "BTC", "Ethereum"]) == [
        "Bitcoin",
        "Ethereum",
    ]


def test_load_prompt_reads_prompt_files():
    """The entity-extraction prompt file loads and contains its JSON instruction."""
    text = load_prompt("extract_entities.prompt")
    assert "Return STRICT JSON" in text


def test_resolve_entity_falls_back_cleanly_when_provider_unavailable(monkeypatch):
    """With no provider available, resolution returns the fallback result shape."""
    import news_mcp.trends_resolution as trends_resolution

    # Start from empty caches so earlier tests cannot influence the result.
    trends_resolution.resolve_entity_via_trends.cache_clear()
    trends_resolution._provider.cache_clear()
    monkeypatch.setattr(trends_resolution, "_provider", lambda: None)

    try:
        resolved = resolve_entity_via_trends("btc")
        assert resolved["normalized"] == "Bitcoin"
        assert resolved["canonical_label"] == "Bitcoin"
        assert resolved["mid"] is None
        assert resolved["candidates"] == []
        assert resolved["source"] == "fallback"
    finally:
        # Always drop the result computed under the patched provider, even when
        # an assertion above fails, so it cannot leak into later tests.
        trends_resolution.resolve_entity_via_trends.cache_clear()


def test_sort_clusters_by_recency_prefers_newer_timestamp_over_importance():
    """The newer cluster sorts first even though its importance is lower."""
    clusters = [
        {"headline": "older", "timestamp": "Wed, 01 Apr 2026 10:00:00 GMT", "importance": 0.9},
        {"headline": "newer", "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT", "importance": 0.1},
    ]
    sorted_clusters = _sort_clusters_by_recency(clusters)
    assert [c["headline"] for c in sorted_clusters] == ["newer", "older"]


def test_build_extraction_prompt_is_stable_without_blacklist():
    """The prompt embeds the headline verbatim and carries the non-empty-entities rule."""
    cluster = {
        "headline": "Bloomberg reports Bitcoin rallies after US rate comments",
        "summary": "A report from Bloomberg says Bitcoin moved higher after comments from the Fed.",
        "articles": [],
    }
    prompt = build_extraction_prompt(cluster)
    assert "Bloomberg reports Bitcoin rallies" in prompt
    assert "Do NOT return empty entities" in prompt
    assert "Bloomberg" in prompt  # present in the input, not filtered here


def test_call_llm_dispatches_to_selected_provider(monkeypatch):
    """``call_llm`` routes to the backend named by its first argument."""

    async def fake_groq(model, messages, response_json=True):
        return '{"ok": true, "provider": "groq"}'

    async def fake_openai(model, messages, response_json=True):
        return '{"ok": true, "provider": "openai"}'

    monkeypatch.setattr("news_mcp.llm._call_groq", fake_groq)
    monkeypatch.setattr("news_mcp.llm._call_openai", fake_openai)

    groq = asyncio.run(call_llm("groq", "x", "sys", "user"))
    openai = asyncio.run(call_llm("openai", "x", "sys", "user"))
    assert '"provider": "groq"' in groq
    assert '"provider": "openai"' in openai


def test_importance_prefers_llm_signal():
    """A strong sentiment score of either sign never scores below the neutral baseline."""
    # Two clusters with same coverage but different sentiment magnitude.
    base = {
        "sources": ["A", "B"],
        "articles": [{}, {}],
        "sentiment": "neutral",
        "sentimentScore": 0.0,
    }
    pos = dict(base, sentimentScore=0.9)
    neg = dict(base, sentimentScore=-0.8)
    imp_base = compute_importance(base)
    imp_pos = compute_importance(pos)
    imp_neg = compute_importance(neg)
    assert imp_pos >= imp_base
    assert imp_neg >= imp_base