| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- from __future__ import annotations
- import tempfile
- from pathlib import Path
- from news_mcp.dedup.cluster import dedup_and_cluster_articles
- from news_mcp.storage.sqlite_store import SQLiteClusterStore
- from news_mcp.enrichment.importance import compute_importance
- from news_mcp.enrichment.llm_enrich import _filter_entities, _matches_blacklist
- from news_mcp.entity_normalize import normalize_query, normalize_entities
- from news_mcp.llm import build_extraction_prompt, call_llm, load_prompt
- from news_mcp.trends_resolution import resolve_entity_via_trends
- from news_mcp.mcp_server_fastmcp import _sort_clusters_by_recency
- def _article(title: str, url: str = "https://example.com/x", source: str = "Src", ts: str = "Mon, 30 Mar 2026 12:00:00 GMT"):
- return {
- "title": title,
- "url": url,
- "source": source,
- "timestamp": ts,
- "summary": "summary text",
- }
def test_dedup_merges_similar_titles():
    """Two near-identical headlines plus one unrelated headline yield two clusters."""
    headlines = (
        "Trump warns Iran war could spread",
        "Trump warns Iran conflict could spread",
        "Unrelated sports result",
    )
    clustered = dedup_and_cluster_articles(
        [_article(headline) for headline in headlines],
        similarity_threshold=0.75,
    )

    # We expect the Trump/Iran items to be merged into one cluster in the
    # same topic bucket, so count clusters across every bucket.
    cluster_count = 0
    for bucket in clustered.values():
        cluster_count += len(bucket)
    assert cluster_count == 2
def test_sqlite_feed_hash_roundtrip():
    """A feed hash is None before it is set and reads back unchanged afterwards."""
    feed_key = "breakingthenews"
    with tempfile.TemporaryDirectory() as tmp_dir:
        store = SQLiteClusterStore(Path(tmp_dir) / "news.sqlite")

        # Nothing has been stored for this feed yet.
        assert store.get_feed_hash(feed_key) is None

        store.set_feed_hash(feed_key, "abc123")
        assert store.get_feed_hash(feed_key) == "abc123"
def test_sqlite_summary_cache_roundtrip():
    """A summary stored for an existing cluster is returned by the cache lookup."""
    stamp = "Mon, 30 Mar 2026 12:00:00 GMT"
    base_cluster = {
        "cluster_id": "cid1",
        "headline": "Headline",
        "summary": "Summary",
        "entities": ["Iran"],
        "sentiment": "negative",
        "importance": 0.5,
        "sources": ["BreakingTheNews"],
        "timestamp": stamp,
        "articles": [],
        "first_seen": stamp,
        "last_updated": stamp,
    }
    summary_payload = {
        "headline": "Headline",
        "mergedSummary": "Merged summary",
        "keyFacts": ["Fact 1"],
        "sources": ["BreakingTheNews"],
    }

    with tempfile.TemporaryDirectory() as tmp_dir:
        store = SQLiteClusterStore(Path(tmp_dir) / "news.sqlite")

        # Upsert a base cluster first so the summary has a row to attach to.
        store.upsert_clusters([base_cluster], topic="other")
        store.upsert_cluster_summary("cid1", summary_payload)

        cached = store.get_cluster_summary("cid1", ttl_hours=24)
        assert cached is not None
        assert cached["mergedSummary"] == "Merged summary"
        assert cached["keyFacts"] == ["Fact 1"]
def test_sqlite_summary_cache_does_not_create_placeholder_row():
    """Storing a summary for an unknown cluster id leaves no cluster and no summary."""
    orphan_summary = {
        "headline": "Missing",
        "mergedSummary": "Summary",
        "keyFacts": [],
        "sources": [],
    }

    with tempfile.TemporaryDirectory() as tmp_dir:
        store = SQLiteClusterStore(Path(tmp_dir) / "news.sqlite")

        # No cluster with id "missing" was ever upserted.
        store.upsert_cluster_summary("missing", orphan_summary)

        assert store.get_cluster_by_id("missing") is None
        assert store.get_cluster_summary("missing", ttl_hours=24) is None
def test_prune_clusters_deletes_rows_older_than_retention():
    """Pruning removes only the cluster whose updated_at was backdated."""
    fresh_cluster = {
        "cluster_id": "fresh",
        "headline": "Fresh",
        "summary": "Fresh summary",
        "entities": ["Bitcoin"],
        "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
        "articles": [],
    }
    stale_cluster = {
        "cluster_id": "stale",
        "headline": "Stale",
        "summary": "Stale summary",
        "entities": ["Iran"],
        "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT",
        "articles": [],
    }

    with tempfile.TemporaryDirectory() as tmp_dir:
        store = SQLiteClusterStore(Path(tmp_dir) / "news.sqlite")
        store.upsert_clusters([fresh_cluster, stale_cluster], topic="other")

        # Backdate the "stale" row directly in the table.
        with store._conn() as conn:
            conn.execute(
                "UPDATE clusters SET updated_at=? WHERE cluster_id=?",
                ("2025-01-01T00:00:00+00:00", "stale"),
            )

        removed = store.prune_clusters(retention_days=30)
        assert removed == 1
        assert store.get_cluster_by_id("stale") is None
        assert store.get_cluster_by_id("fresh") is not None

        # A prune run must also be recorded in the prune state.
        prune_state = store.get_prune_state(pruning_enabled=True, retention_days=30, interval_hours=24)
        assert prune_state["last_prune_at"] is not None
def test_prune_if_due_skips_deletes_when_pruning_disabled():
    """With pruning disabled, a backdated cluster survives prune_if_due."""
    stale_cluster = {
        "cluster_id": "stale",
        "headline": "Stale",
        "summary": "Stale summary",
        "entities": ["Iran"],
        "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT",
        "articles": [],
    }

    with tempfile.TemporaryDirectory() as tmp_dir:
        store = SQLiteClusterStore(Path(tmp_dir) / "news.sqlite")
        store.upsert_clusters([stale_cluster], topic="other")

        # Backdate the row so it would be eligible if pruning were enabled.
        with store._conn() as conn:
            conn.execute(
                "UPDATE clusters SET updated_at=? WHERE cluster_id=?",
                ("2025-01-01T00:00:00+00:00", "stale"),
            )

        outcome = store.prune_if_due(pruning_enabled=False, retention_days=30, interval_hours=24)
        assert outcome["enabled"] is False
        assert outcome["deleted"] == 0
        assert store.get_cluster_by_id("stale") is not None
def test_get_latest_clusters_orders_by_updated_at_before_limit():
    """With limit=1, get_latest_clusters returns the cluster with id "new"."""
    older = {
        "cluster_id": "old",
        "headline": "Old",
        "summary": "Old summary",
        "entities": ["Iran"],
        "timestamp": "Wed, 01 Apr 2026 09:00:00 GMT",
        "articles": [],
    }
    newer = {
        "cluster_id": "new",
        "headline": "New",
        "summary": "New summary",
        "entities": ["Bitcoin"],
        "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT",
        "articles": [],
    }
    set_updated_at = "UPDATE clusters SET updated_at=? WHERE cluster_id=?"

    with tempfile.TemporaryDirectory() as tmp_dir:
        store = SQLiteClusterStore(Path(tmp_dir) / "news.sqlite")
        store.upsert_clusters([older, newer], topic="crypto")

        # NOTE(review): the forced updated_at values are inverted relative to
        # the article timestamps ("new" gets the earlier updated_at), yet the
        # assertion below expects "new" -- confirm against the ordering key
        # that get_latest_clusters actually uses.
        with store._conn() as conn:
            conn.execute(set_updated_at, ("2025-01-01T00:00:00+00:00", "new"))
            conn.execute(set_updated_at, ("2026-01-01T00:00:00+00:00", "old"))

        top = store.get_latest_clusters(topic="crypto", ttl_hours=24 * 365, limit=1)
        assert len(top) == 1
        assert top[0]["cluster_id"] == "new"
def test_get_entity_metadata_prefers_mid_scoped_row():
    """When rows with and without a mid exist, the lookup returns the one with a mid."""
    entity = "Bitcoin"
    entity_mid = "/m/Bitcoin"

    with tempfile.TemporaryDirectory() as tmp_dir:
        store = SQLiteClusterStore(Path(tmp_dir) / "news.sqlite")

        # Same entity stored twice: once without a mid, once with one.
        store.upsert_entity_metadata(entity, canonical_label="Bitcoin", mid=None, sources=["local"])
        store.upsert_entity_metadata(entity, canonical_label="Bitcoin", mid=entity_mid, sources=["trends"])
        store.record_entity_request(entity, mid=entity_mid)

        metadata = store.get_entity_metadata(entity)
        assert metadata is not None
        assert metadata["mid"] == "/m/Bitcoin"
def test_blacklist_filters_entities_case_insensitively():
    """A lowercase blacklist entry removes both capitalised and lowercase matches."""
    candidates = ["Bloomberg", "Reuters", "bloomberg", "CoinDesk"]
    kept = _filter_entities(candidates, blacklist=["bloomberg"])
    # Order of the surviving entities is preserved.
    assert kept == ["Reuters", "CoinDesk"]
def test_blacklist_supports_wildcards():
    """Blacklist patterns accept a leading or trailing ``*`` wildcard."""
    # (entity, blacklist patterns, expected to match?)
    cases = (
        ("Bloomberg Economics", ["bloomberg*"], True),
        ("bloomberg", ["*berg"], True),
        ("Reuters", ["bloomberg*"], False),
    )
    for entity, patterns, should_match in cases:
        assert bool(_matches_blacklist(entity, blacklist=patterns)) is should_match
def test_query_normalization_keeps_common_shorthand_working():
    """Known shorthand expands to its canonical name; other queries pass through."""
    expectations = {
        "btc": "Bitcoin",
        "Trump": "Donald Trump",
        # No alias is expected for this one, so it comes back unchanged.
        "nvidia": "nvidia",
    }
    for query, canonical in expectations.items():
        assert normalize_query(query) == canonical
def test_entity_normalization_deduplicates_aliases():
    """Aliases of one entity collapse to a single canonical entry, in first-seen order."""
    raw_entities = ["btc", "Bitcoin", "BTC", "Ethereum"]
    normalized = normalize_entities(raw_entities)
    assert normalized == ["Bitcoin", "Ethereum"]
def test_load_prompt_reads_prompt_files():
    """The entity-extraction prompt file loads and contains its strict-JSON instruction."""
    prompt_text = load_prompt("extract_entities.prompt")
    assert "Return STRICT JSON" in prompt_text
def test_resolve_entity_falls_back_cleanly_when_provider_unavailable(monkeypatch):
    """Without a trends provider, resolution returns the local fallback result.

    ``_provider`` is patched to return ``None``; the resolver is then expected
    to report the normalized name with no mid, no candidates and
    ``source == "fallback"``.
    """
    import news_mcp.trends_resolution as trends_resolution

    # Start from empty caches so an earlier test's result is not served, and
    # clear the real provider cache before it is swapped out.
    trends_resolution.resolve_entity_via_trends.cache_clear()
    trends_resolution._provider.cache_clear()
    monkeypatch.setattr(trends_resolution, "_provider", lambda: None)

    try:
        resolved = resolve_entity_via_trends("btc")
        assert resolved["normalized"] == "Bitcoin"
        assert resolved["canonical_label"] == "Bitcoin"
        assert resolved["mid"] is None
        assert resolved["candidates"] == []
        assert resolved["source"] == "fallback"
    finally:
        # Always drop the cached fallback result, even when an assertion
        # fails, so later tests do not see it.
        trends_resolution.resolve_entity_via_trends.cache_clear()
def test_sort_clusters_by_recency_prefers_newer_timestamp_over_importance():
    """The later timestamp sorts first even though its importance is lower."""
    older = {"headline": "older", "timestamp": "Wed, 01 Apr 2026 10:00:00 GMT", "importance": 0.9}
    newer = {"headline": "newer", "timestamp": "Wed, 01 Apr 2026 11:00:00 GMT", "importance": 0.1}

    ordered = _sort_clusters_by_recency([older, newer])

    headlines = []
    for cluster in ordered:
        headlines.append(cluster["headline"])
    assert headlines == ["newer", "older"]
def test_build_extraction_prompt_is_stable_without_blacklist():
    """The extraction prompt embeds the headline and its fixed instruction text."""
    cluster = dict(
        headline="Bloomberg reports Bitcoin rallies after US rate comments",
        summary="A report from Bloomberg says Bitcoin moved higher after comments from the Fed.",
        articles=[],
    )

    prompt = build_extraction_prompt(cluster)

    expected_fragments = (
        "Bloomberg reports Bitcoin rallies",
        "Do NOT return empty entities",
        # Present in the input, not filtered here.
        "Bloomberg",
    )
    for fragment in expected_fragments:
        assert fragment in prompt
def test_call_llm_dispatches_to_selected_provider(monkeypatch):
    """call_llm routes to the backend named by its first argument.

    Both provider backends are replaced with stubs that return a JSON string
    naming themselves, so the reply shows which backend was called.
    """
    import asyncio

    async def fake_groq(model, messages, response_json=True):
        return '{"ok": true, "provider": "groq"}'

    async def fake_openai(model, messages, response_json=True):
        return '{"ok": true, "provider": "openai"}'

    monkeypatch.setattr("news_mcp.llm._call_groq", fake_groq)
    monkeypatch.setattr("news_mcp.llm._call_openai", fake_openai)

    groq_reply = asyncio.run(call_llm("groq", "x", "sys", "user"))
    openai_reply = asyncio.run(call_llm("openai", "x", "sys", "user"))

    assert '"provider": "groq"' in groq_reply
    assert '"provider": "openai"' in openai_reply
def test_refresh_skips_reprocessing_when_feed_hash_is_unchanged(monkeypatch):
    """refresh_clusters fetches once and then does no further work on an unchanged feed.

    The stub store reports a stored hash equal to the SHA-1 of the single
    fetched article's ``title|url|timestamp`` line (presumably mirroring how
    the poller hashes a feed -- confirm against the poller). The test asserts
    one fetch and zero clustering, enrichment and classification calls.
    """
    import asyncio
    import hashlib

    import news_mcp.jobs.poller as poller

    calls = {"fetch": 0, "cluster": 0, "enrich": 0, "classify": 0}
    material = "\n".join(
        [
            "Bitcoin rallies|https://example.com/a|Wed, 01 Apr 2026 12:00:00 GMT",
        ]
    )
    expected_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()

    async def fake_to_thread(fn, limit):
        # Stands in for asyncio.to_thread: count the fetch and return one article.
        calls["fetch"] += 1
        return [
            {
                "title": "Bitcoin rallies",
                "url": "https://example.com/a",
                "source": "Src",
                "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
                "summary": "summary",
            }
        ]

    def fake_cluster(articles):
        calls["cluster"] += 1
        return {
            "crypto": [
                {
                    "cluster_id": "cid",
                    "headline": "Bitcoin rallies",
                    "summary": "summary",
                    "entities": [],
                    "sentiment": "neutral",
                    "importance": 0.0,
                    "sources": ["Src"],
                    "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT",
                    "articles": [],
                }
            ]
        }

    def fake_enrich(cluster):
        calls["enrich"] += 1
        return cluster

    async def fake_classify(cluster):
        calls["classify"] += 1
        return cluster

    class DummyStore:
        """In-memory stand-in for SQLiteClusterStore, seeded with the expected hash."""

        def __init__(self, *args, **kwargs):
            self.meta = {}
            self.feed_hash = expected_hash

        def get_feed_hash(self, feed_key):
            return self.feed_hash

        def set_feed_hash(self, feed_key, last_hash):
            self.feed_hash = last_hash

        def get_cluster_by_id(self, cluster_id):
            return None

        def upsert_clusters(self, clusters, topic):
            self.meta["upserted"] = (len(clusters), topic)

        def prune_if_due(self, **kwargs):
            self.meta["prune"] = kwargs
            return {"deleted": 0}

        def set_meta(self, key, value):
            self.meta[key] = value

    monkeypatch.setattr(poller, "SQLiteClusterStore", DummyStore)
    monkeypatch.setattr(poller, "fetch_news_articles", lambda limit: [{"title": "Bitcoin rallies", "url": "https://example.com/a", "timestamp": "Wed, 01 Apr 2026 12:00:00 GMT"}])
    monkeypatch.setattr(poller.asyncio, "to_thread", fake_to_thread)
    monkeypatch.setattr(poller, "dedup_and_cluster_articles", fake_cluster)
    monkeypatch.setattr(poller, "enrich_cluster", fake_enrich)
    monkeypatch.setattr(poller, "classify_cluster_llm", fake_classify)
    # Reset the module-level store through monkeypatch so the original value
    # is restored at teardown and no stub store leaks into later tests.
    monkeypatch.setattr(poller, "store", None, raising=False)

    asyncio.run(poller.refresh_clusters(topic=None, limit=80))

    assert calls["fetch"] == 1
    assert calls["cluster"] == 0
    assert calls["enrich"] == 0
    assert calls["classify"] == 0
def test_importance_prefers_llm_signal():
    """A non-zero sentiment score never scores below the neutral baseline."""
    # Clusters with the same coverage but different sentiment magnitude.
    neutral = {
        "sources": ["A", "B"],
        "articles": [{}, {}],
        "sentiment": "neutral",
        "sentimentScore": 0.0,
    }
    strongly_positive = {**neutral, "sentimentScore": 0.9}
    strongly_negative = {**neutral, "sentimentScore": -0.8}

    baseline = compute_importance(neutral)
    positive_score = compute_importance(strongly_positive)
    negative_score = compute_importance(strongly_negative)

    assert positive_score >= baseline
    assert negative_score >= baseline
|