from __future__ import annotations

import tempfile
from pathlib import Path

from news_mcp.dedup.cluster import dedup_and_cluster_articles
from news_mcp.storage.sqlite_store import SQLiteClusterStore
from news_mcp.enrichment.importance import compute_importance
from news_mcp.enrichment.llm_enrich import _filter_entities, _matches_blacklist
from news_mcp.entity_normalize import normalize_query, normalize_entities
from news_mcp.llm import build_extraction_prompt, call_llm, load_prompt
from news_mcp.trends_resolution import resolve_entity_via_trends
from news_mcp.mcp_server_fastmcp import _sort_clusters_by_recency


def _article(
    title: str,
    url: str = "https://example.com/x",
    source: str = "Src",
    ts: str = "Mon, 30 Mar 2026 12:00:00 GMT",
):
    return {
        "title": title,
        "url": url,
        "source": source,
        "timestamp": ts,
        "summary": "summary text",
    }


def test_dedup_merges_similar_titles():
    articles = [
        _article("Trump warns Iran war could spread"),
        _article("Trump warns Iran conflict could spread"),
        _article("Unrelated sports result"),
    ]
    clustered = dedup_and_cluster_articles(articles, similarity_threshold=0.75)
    # We expect the Trump/Iran items to be merged into one cluster in the same topic bucket.
    total_clusters = sum(len(v) for v in clustered.values())
    assert total_clusters == 2


def test_sqlite_feed_hash_roundtrip():
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        assert store.get_feed_hash("breakingthenews") is None
        store.set_feed_hash("breakingthenews", "abc123")
        assert store.get_feed_hash("breakingthenews") == "abc123"


def test_sqlite_summary_cache_roundtrip():
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        # Upsert a base cluster first.
        store.upsert_clusters(
            [
                {
                    "cluster_id": "cid1",
                    "headline": "Headline",
                    "summary": "Summary",
                    "entities": ["Iran"],
                    "sentiment": "negative",
                    "importance": 0.5,
                    "sources": ["BreakingTheNews"],
                    "timestamp": "Mon, 30 Mar 2026 12:00:00 GMT",
                    "articles": [],
                    "first_seen": "Mon, 30 Mar 2026 12:00:00 GMT",
                    "last_updated": "Mon, 30 Mar 2026 12:00:00 GMT",
                }
            ],
            topic="other",
        )
        store.upsert_cluster_summary(
            "cid1",
            {
                "headline": "Headline",
                "mergedSummary": "Merged summary",
                "keyFacts": ["Fact 1"],
                "sources": ["BreakingTheNews"],
            },
        )
        cached = store.get_cluster_summary("cid1", ttl_hours=24)
        assert cached is not None
        assert cached["mergedSummary"] == "Merged summary"
        assert cached["keyFacts"] == ["Fact 1"]


def test_sqlite_summary_cache_does_not_create_placeholder_row():
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        store.upsert_cluster_summary(
            "missing",
            {
                "headline": "Missing",
                "mergedSummary": "Summary",
                "keyFacts": [],
                "sources": [],
            },
        )
        assert store.get_cluster_by_id("missing") is None
        assert store.get_cluster_summary("missing", ttl_hours=24) is None
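
# A hedged sketch (deliberately not named "test_*", so pytest will not collect
# it) of how the summary-cache TTL could be pinned down next. It assumes
# get_cluster_summary() treats ttl_hours=0 as "every cached row is stale";
# confirm that against SQLiteClusterStore before promoting this to a test.
def _sketch_summary_cache_ttl_expiry(store: SQLiteClusterStore) -> None:
    # Precondition: the caller has upserted cluster "cid1" and its summary,
    # as in test_sqlite_summary_cache_roundtrip above.
    assert store.get_cluster_summary("cid1", ttl_hours=24) is not None
    # Assumption: a zero-hour TTL makes any previously cached summary expire.
    assert store.get_cluster_summary("cid1", ttl_hours=0) is None
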
def test_prune_clusters_deletes_rows_older_than_retention():
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        store.upsert_clusters(
            [
                {
                    "cluster_id": "fresh",
                    "headline": "Fresh",
                    "summary": "Fresh summary",
                    "entities": ["Bitcoin"],
                    "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
                    "articles": [],
                },
                {
                    "cluster_id": "stale",
                    "headline": "Stale",
                    "summary": "Stale summary",
                    "entities": ["Iran"],
                    "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT",
                    "articles": [],
                },
            ],
            topic="other",
        )
        with store._conn() as conn:
            conn.execute(
                "UPDATE clusters SET updated_at=? WHERE cluster_id=?",
                ("2025-01-01T00:00:00+00:00", "stale"),
            )
        deleted = store.prune_clusters(retention_days=30)
        assert deleted == 1
        assert store.get_cluster_by_id("stale") is None
        assert store.get_cluster_by_id("fresh") is not None
        state = store.get_prune_state(pruning_enabled=True, retention_days=30, interval_hours=24)
        assert state["last_prune_at"] is not None


def test_prune_if_due_skips_deletes_when_pruning_disabled():
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        store.upsert_clusters(
            [
                {
                    "cluster_id": "stale",
                    "headline": "Stale",
                    "summary": "Stale summary",
                    "entities": ["Iran"],
                    "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT",
                    "articles": [],
                }
            ],
            topic="other",
        )
        with store._conn() as conn:
            conn.execute(
                "UPDATE clusters SET updated_at=? WHERE cluster_id=?",
                ("2025-01-01T00:00:00+00:00", "stale"),
            )
        result = store.prune_if_due(pruning_enabled=False, retention_days=30, interval_hours=24)
        assert result["enabled"] is False
        assert result["deleted"] == 0
        assert store.get_cluster_by_id("stale") is not None


def test_get_latest_clusters_orders_by_updated_at_before_limit():
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        store.upsert_clusters(
            [
                {
                    "cluster_id": "old",
                    "headline": "Old",
                    "summary": "Old summary",
                    "entities": ["Iran"],
                    "timestamp": "Wed, 01 Apr 2026 09:00:00 GMT",
                    "articles": [],
                },
                {
                    "cluster_id": "new",
                    "headline": "New",
                    "summary": "New summary",
                    "entities": ["Bitcoin"],
                    "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT",
                    "articles": [],
                },
            ],
            topic="crypto",
        )
        # Give "new" the most recent updated_at so the ORDER BY must run
        # before LIMIT for the assertion below to hold.
        with store._conn() as conn:
            conn.execute(
                "UPDATE clusters SET updated_at=? WHERE cluster_id=?",
                ("2026-01-01T00:00:00+00:00", "new"),
            )
            conn.execute(
                "UPDATE clusters SET updated_at=? WHERE cluster_id=?",
                ("2025-01-01T00:00:00+00:00", "old"),
            )
        latest = store.get_latest_clusters(topic="crypto", ttl_hours=24 * 365, limit=1)
        assert len(latest) == 1
        assert latest[0]["cluster_id"] == "new"


def test_get_entity_metadata_prefers_mid_scoped_row():
    with tempfile.TemporaryDirectory() as td:
        db = Path(td) / "news.sqlite"
        store = SQLiteClusterStore(db)
        store.upsert_entity_metadata("Bitcoin", canonical_label="Bitcoin", mid=None, sources=["local"])
        store.upsert_entity_metadata("Bitcoin", canonical_label="Bitcoin", mid="/m/Bitcoin", sources=["trends"])
        store.record_entity_request("Bitcoin", mid="/m/Bitcoin")
        meta = store.get_entity_metadata("Bitcoin")
        assert meta is not None
        assert meta["mid"] == "/m/Bitcoin"


def test_blacklist_filters_entities_case_insensitively():
    entities = ["Bloomberg", "Reuters", "bloomberg", "CoinDesk"]
    filtered = _filter_entities(entities, blacklist=["bloomberg"])
    assert filtered == ["Reuters", "CoinDesk"]


def test_blacklist_supports_wildcards():
    assert _matches_blacklist("Bloomberg Economics", blacklist=["bloomberg*"])
    assert _matches_blacklist("bloomberg", blacklist=["*berg"])
    assert not _matches_blacklist("Reuters", blacklist=["bloomberg*"])


def test_query_normalization_keeps_common_shorthand_working():
    assert normalize_query("btc") == "Bitcoin"
    assert normalize_query("Trump") == "Donald Trump"
    assert normalize_query("nvidia") == "nvidia"


def test_entity_normalization_deduplicates_aliases():
    assert normalize_entities(["btc", "Bitcoin", "BTC", "Ethereum"]) == ["Bitcoin", "Ethereum"]


def test_load_prompt_reads_prompt_files():
    text = load_prompt("extract_entities.prompt")
    assert "Return STRICT JSON" in text
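
# A small illustration (not collected by pytest) of the alias behaviour the
# normalization tests above rely on: normalize_query() maps shorthand to a
# canonical label, and normalize_entities() deduplicates after applying the
# same mapping. Case-insensitive lookup is inferred from
# test_entity_normalization_deduplicates_aliases, not separately verified.
def _sketch_alias_roundtrip() -> None:
    for alias in ("btc", "BTC"):
        assert normalize_entities([alias]) == ["Bitcoin"]  # per the dedup test above
    assert normalize_query("btc") == "Bitcoin"
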
def test_resolve_entity_falls_back_cleanly_when_provider_unavailable(monkeypatch):
    import news_mcp.trends_resolution as trends_resolution

    trends_resolution.resolve_entity_via_trends.cache_clear()
    trends_resolution._provider.cache_clear()
    monkeypatch.setattr(trends_resolution, "_provider", lambda: None)
    resolved = resolve_entity_via_trends("btc")
    assert resolved["normalized"] == "Bitcoin"
    assert resolved["canonical_label"] == "Bitcoin"
    assert resolved["mid"] is None
    assert resolved["candidates"] == []
    assert resolved["source"] == "fallback"
    trends_resolution.resolve_entity_via_trends.cache_clear()


def test_sort_clusters_by_recency_prefers_newer_timestamp_over_importance():
    clusters = [
        {"headline": "older", "timestamp": "Wed, 01 Apr 2026 10:00:00 GMT", "importance": 0.9},
        {"headline": "newer", "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT", "importance": 0.1},
    ]
    sorted_clusters = _sort_clusters_by_recency(clusters)
    assert [c["headline"] for c in sorted_clusters] == ["newer", "older"]


def test_build_extraction_prompt_is_stable_without_blacklist():
    cluster = {
        "headline": "Bloomberg reports Bitcoin rallies after US rate comments",
        "summary": "A report from Bloomberg says Bitcoin moved higher after comments from the Fed.",
        "articles": [],
    }
    prompt = build_extraction_prompt(cluster)
    assert "Bloomberg reports Bitcoin rallies" in prompt
    assert "Do NOT return empty entities" in prompt
    assert "Bloomberg" in prompt  # present in the input, not filtered here


def test_call_llm_dispatches_to_selected_provider(monkeypatch):
    async def fake_groq(model, messages, response_json=True):
        return '{"ok": true, "provider": "groq"}'

    async def fake_openai(model, messages, response_json=True):
        return '{"ok": true, "provider": "openai"}'

    monkeypatch.setattr("news_mcp.llm._call_groq", fake_groq)
    monkeypatch.setattr("news_mcp.llm._call_openai", fake_openai)

    import asyncio

    groq = asyncio.run(call_llm("groq", "x", "sys", "user"))
    openai = asyncio.run(call_llm("openai", "x", "sys", "user"))
    assert '"provider": "groq"' in groq
    assert '"provider": "openai"' in openai
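
# The refresh test below rebuilds the poller's feed hash by hand. A compact
# sketch of that implied contract, assuming the poller hashes
# "title|url|timestamp" lines joined by "\n" with SHA-1 (exactly how the
# test's `material` string is constructed):
def _sketch_feed_hash(articles: list[dict]) -> str:
    import hashlib

    material = "\n".join(f"{a['title']}|{a['url']}|{a['timestamp']}" for a in articles)
    return hashlib.sha1(material.encode("utf-8")).hexdigest()
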
def test_refresh_skips_reprocessing_when_feed_hash_is_unchanged(monkeypatch):
    import news_mcp.jobs.poller as poller
    import hashlib
    from news_mcp.config import NEWS_FEED_URL, NEWS_FEED_URLS

    calls = {"fetch": 0, "cluster": 0, "enrich": 0, "classify": 0}
    rss_urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()] or [NEWS_FEED_URL]
    material = "\n".join(
        [
            "Bitcoin rallies|https://example.com/a|Wed, 01 Apr 2026 12:00:00 GMT",
        ]
    )
    expected_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()

    async def fake_to_thread(fn, limit):
        calls["fetch"] += 1
        return [
            {
                "title": "Bitcoin rallies",
                "url": "https://example.com/a",
                "source": "Src",
                "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
                "summary": "summary",
            }
        ]

    def fake_cluster(articles):
        calls["cluster"] += 1
        return {
            "crypto": [
                {
                    "cluster_id": "cid",
                    "headline": "Bitcoin rallies",
                    "summary": "summary",
                    "entities": [],
                    "sentiment": "neutral",
                    "importance": 0.0,
                    "sources": ["Src"],
                    "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
                    "articles": [],
                }
            ]
        }

    def fake_enrich(cluster):
        calls["enrich"] += 1
        return cluster

    async def fake_classify(cluster):
        calls["classify"] += 1
        return cluster

    class DummyStore:
        def __init__(self, *args, **kwargs):
            self.meta = {}
            self.feed_hash = expected_hash

        def get_feed_hash(self, feed_key):
            return self.feed_hash

        def set_feed_hash(self, feed_key, last_hash):
            self.feed_hash = last_hash

        def get_cluster_by_id(self, cluster_id):
            return None

        def upsert_clusters(self, clusters, topic):
            self.meta["upserted"] = (len(clusters), topic)

        def prune_if_due(self, **kwargs):
            self.meta["prune"] = kwargs
            return {"deleted": 0}

        def set_meta(self, key, value):
            self.meta[key] = value

    monkeypatch.setattr(poller, "SQLiteClusterStore", DummyStore)
    monkeypatch.setattr(
        poller,
        "fetch_news_articles",
        lambda limit: [
            {
                "title": "Bitcoin rallies",
                "url": "https://example.com/a",
                "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
            }
        ],
    )
    monkeypatch.setattr(poller.asyncio, "to_thread", fake_to_thread)
    monkeypatch.setattr(poller, "dedup_and_cluster_articles", fake_cluster)
    monkeypatch.setattr(poller, "enrich_cluster", fake_enrich)
    monkeypatch.setattr(poller, "classify_cluster_llm", fake_classify)
    poller.store = None

    async def run_once():
        await poller.refresh_clusters(topic=None, limit=80)

    import asyncio

    asyncio.run(run_once())

    assert calls["fetch"] == 1
    assert calls["cluster"] == 0
    assert calls["enrich"] == 0
    assert calls["classify"] == 0


def test_importance_prefers_llm_signal():
    # Two clusters with same coverage but different sentiment magnitude.
    base = {
        "sources": ["A", "B"],
        "articles": [{}, {}],
        "sentiment": "neutral",
        "sentimentScore": 0.0,
    }
    pos = dict(base, sentimentScore=0.9)
    neg = dict(base, sentimentScore=-0.8)

    imp_base = compute_importance(base)
    imp_pos = compute_importance(pos)
    imp_neg = compute_importance(neg)

    assert imp_pos >= imp_base
    assert imp_neg >= imp_base
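
# Follow-on sketch (not collected by pytest): if compute_importance() scales
# with sentiment magnitude, it should be monotone in |sentimentScore|
# regardless of sign. That monotonicity is an assumption beyond the two
# inequalities asserted above; worth pinning down once the weighting is final.
def _sketch_importance_monotone_in_magnitude() -> None:
    base = {"sources": ["A", "B"], "articles": [{}, {}], "sentiment": "neutral"}
    values = [compute_importance(dict(base, sentimentScore=s)) for s in (0.0, 0.3, 0.6, 0.9)]
    assert values == sorted(values)  # assumed: non-decreasing in magnitude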