poller.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
  1. from __future__ import annotations
  2. import asyncio
  3. import hashlib
  4. import logging
  5. import sys
  6. from collections import defaultdict
  7. from dataclasses import dataclass, field
  8. from datetime import datetime, timezone, timedelta
  9. from typing import Any, Dict
  10. from news_mcp.config import (
  11. DEFAULT_TOPICS,
  12. DB_PATH,
  13. ENRICH_OTHER_TOPICS_ONLY,
  14. NEWS_EXTRACT_PROVIDER,
  15. NEWS_FEED_URL,
  16. NEWS_FEED_URLS,
  17. NEWS_PRUNE_INTERVAL_HOURS,
  18. NEWS_PRUNING_ENABLED,
  19. NEWS_RETENTION_DAYS,
  20. NEWS_CLUSTER_MAX_AGE_HOURS,
  21. llm_concurrency,
  22. llm_rate_limit,
  23. )
  24. from news_mcp.dedup.cluster import dedup_and_cluster_articles, _cluster_is_within_age_window, _parse_ts
  25. from news_mcp.enrichment.enrich import enrich_cluster
  26. from news_mcp.enrichment.llm_enrich import classify_cluster_llm
  27. from news_mcp.sources.news_feeds import fetch_news_articles
  28. from news_mcp.storage.sqlite_store import SQLiteClusterStore
  29. # --------------------------------------------------------------------------- #
  30. # Per-feed + per-cycle statistics
  31. # --------------------------------------------------------------------------- #
  32. @dataclass
  33. class FeedStats:
  34. """Per-feed statistics for one polling cycle."""
  35. feed_url: str
  36. fetched: int = 0 # total items fetched from the feed
  37. duplicate: int = 0 # unchanged hash → skipped entirely
  38. stale: int = 0 # older than retention window (dropped)
  39. seen: int = 0 # already in seen_articles table → skipped
  40. ingested: int = 0 # passed dedup + retention + seen, entered clustering
  41. enriched: int = 0 # newly LLM-enriched this cycle
  42. already_enriched: int = 0 # cache hit — already had entities+keywords
  43. failed: int = 0 # LLM enrichment failed after retries
  44. @dataclass
  45. class PollStats:
  46. """Aggregated statistics for one polling cycle."""
  47. started_at: str = ""
  48. feeds: list[FeedStats] = field(default_factory=list)
  49. total_clusters: int = 0
  50. total_newly_enriched: int = 0
  51. total_already_enriched: int = 0
  52. total_failed: int = 0
  53. def summary(self) -> dict:
  54. return {
  55. "started_at": self.started_at,
  56. "feeds": [
  57. {
  58. "feed_url": f.feed_url,
  59. "fetched": f.fetched,
  60. "duplicate": f.duplicate,
  61. "stale": f.stale,
  62. "seen": f.seen,
  63. "ingested": f.ingested,
  64. }
  65. for f in self.feeds
  66. ],
  67. "total_clusters": self.total_clusters,
  68. "total_newly_enriched": self.total_newly_enriched,
  69. "total_already_enriched": self.total_already_enriched,
  70. "total_failed": self.total_failed,
  71. }
  72. # --------------------------------------------------------------------------- #
  73. # Poller
  74. # --------------------------------------------------------------------------- #
  75. class ClusterPoller:
  76. """One polling cycle: fetch → dedup → cluster → enrich-once → store."""
  77. MAX_ENRICHMENT_RETRIES = 3
  78. def __init__(
  79. self,
  80. store: SQLiteClusterStore,
  81. logger: logging.Logger | None = None,
  82. ):
  83. self.store = store
  84. self.logger = logger or logging.getLogger("news_mcp.refresh")
  85. self.stats = PollStats()
  86. # ------------------------------------------------------------------ #
  87. # Public entry point
  88. # ------------------------------------------------------------------ #
  89. async def poll(self, topic_filter: str | None = None) -> PollStats:
  90. """Run one full polling cycle. Returns statistics."""
  91. self.stats = PollStats(started_at=datetime.now(timezone.utc).isoformat())
  92. # 1. Load configured + enabled feed URLs
  93. configured_urls = self._load_feed_urls()
  94. enabled_urls = self.store.get_enabled_feed_urls(configured_urls)
  95. self.logger.info("poll start: enabled_feeds=%d configured=%d", len(enabled_urls), len(configured_urls))
  96. # 2. Fetch articles from enabled feeds only, per-feed dedup
  97. feed_map, feed_stats = await self._fetch_feeds(enabled_urls)
  98. # Flatten all fresh articles (stats already tracked per-feed in feed_stats)
  99. all_fresh = [a for articles in feed_map.values() for a in articles]
  100. if not all_fresh:
  101. self.logger.info("poll: no fresh articles from any feed")
  102. self.stats.feeds = feed_stats
  103. self._save_feed_stats(feed_stats)
  104. self._prune_and_finalize(enabled_urls, feed_map)
  105. return self.stats
  106. # 3. Retention filter
  107. articles = self._apply_retention(all_fresh, feed_map)
  108. if not articles:
  109. self.logger.info("poll: all %d fresh articles dropped by retention", len(all_fresh))
  110. self.stats.feeds = feed_stats
  111. self._save_feed_stats(feed_stats)
  112. self._prune_and_finalize(enabled_urls, feed_map)
  113. return self.stats
  114. # 3b. Seen-articles filter: drop articles whose URL was already
  115. # processed with the same content. Three outcomes:
  116. # new → never seen, full clustering + enrichment
  117. # seen_unchanged → same URL, same content hash → skip entirely
  118. # seen_changed → same URL, different content → re-cluster to update
  119. # the existing cluster (triggers re-enrichment)
  120. new_articles, seen_unchanged, seen_changed = self.store.filter_already_seen(articles)
  121. changed_cluster_ids: set[str] = set()
  122. if seen_unchanged or seen_changed:
  123. for a in seen_unchanged:
  124. fu = a.get("feed_url", "")
  125. for fs in feed_stats:
  126. if fs.feed_url == fu:
  127. fs.seen += 1
  128. break
  129. for a in seen_changed:
  130. fu = a.get("feed_url", "")
  131. for fs in feed_stats:
  132. if fs.feed_url == fu:
  133. fs.seen += 1 # still "seen" (by URL), but content changed
  134. break
  135. self.logger.info(
  136. "seen_articles: total=%d new=%d unchanged=%d changed=%d",
  137. len(articles), len(new_articles), len(seen_unchanged), len(seen_changed),
  138. )
  139. # Merge changed articles with new ones for clustering
  140. articles = new_articles + seen_changed
  141. if not articles:
  142. self.logger.info("poll: all articles already seen (nothing new to cluster)")
  143. self.stats.feeds = feed_stats
  144. self._save_feed_stats(feed_stats)
  145. self._prune_and_finalize(enabled_urls, feed_map)
  146. return self.stats
  147. # 3c. For changed-content articles whose feed has re-enrichment enabled,
  148. # clear enriched_at in the cluster payload JSON so the next enrichment
  149. # cycle re-processes them with the updated article data.
  150. # Feeds with re-enrich disabled are left alone — their clusters keep
  151. # existing enrichment and the changed articles are silently skipped.
  152. if seen_changed:
  153. from news_mcp.article_identity import article_key as _ak
  154. import json as _json
  155. # Group changed articles by feed_url
  156. from collections import defaultdict as _dd
  157. changed_by_feed: dict[str, list] = _dd(list)
  158. for a in seen_changed:
  159. fu = a.get("feed_url", "")
  160. changed_by_feed[fu].append(a)
  161. # Only process feeds that have re-enrichment enabled
  162. re_enrich_feeds = set()
  163. for fu in changed_by_feed:
  164. if self.store.is_re_enrich_enabled(fu):
  165. re_enrich_feeds.add(fu)
  166. # Move disabled-feed articles from seen_changed → seen_unchanged
  167. truly_changed = []
  168. for a in seen_changed:
  169. fu = a.get("feed_url", "")
  170. if fu in re_enrich_feeds:
  171. truly_changed.append(a)
  172. else:
  173. # Re-enrich disabled → treat as unchanged (skip silently)
  174. seen_unchanged.append(a)
  175. fu_stats = [fs for fs in feed_stats if fs.feed_url == fu]
  176. if fu_stats:
  177. fu_stats[0].seen += 1
  178. seen_changed = truly_changed
  179. if seen_changed:
  180. changed_keys = {_ak(a) for a in seen_changed}
  181. with self.store._conn() as conn:
  182. for ak in changed_keys:
  183. row = conn.execute(
  184. "SELECT sa.cluster_id, c.payload FROM seen_articles sa "
  185. "JOIN clusters c ON c.cluster_id = sa.cluster_id "
  186. "WHERE sa.article_key=?",
  187. (ak,),
  188. ).fetchone()
  189. if row:
  190. changed_cluster_ids.add(row[0])
  191. payload = _json.loads(row[1])
  192. payload.pop("enriched_at", None)
  193. conn.execute(
  194. "UPDATE clusters SET payload=? WHERE cluster_id=?",
  195. (_json.dumps(payload, ensure_ascii=False), row[0]),
  196. )
  197. if changed_cluster_ids:
  198. self.logger.info(
  199. "content_changed: clusters=%d will re-enrich (feeds=%s)",
  200. len(changed_cluster_ids),
  201. ",".join(sorted(re_enrich_feeds)),
  202. )
  203. else:
  204. self.logger.info("content_changed: all %d changed articles on re-enrich disabled feeds → skipped", len(changed_by_feed) and sum(len(v) for v in changed_by_feed.values()))
  205. # 4. Pre-seed existing clusters for cross-cycle merging
  206. existing_clusters = self._preseed_clusters()
  207. # 5. Cluster (sync, may do concurrent embeddings internally)
  208. clustered_by_topic = await self._cluster(articles, existing_clusters)
  209. # 6. Enrich every cluster that needs it, store immediately
  210. await self._enrich_all(clustered_by_topic)
  211. # 7. Retry previously failed enrichments
  212. await self._retry_failed()
  213. # 8. Persist feed stats + prune
  214. self.stats.feeds = feed_stats
  215. self._save_feed_stats(feed_stats)
  216. self._prune_and_finalize(enabled_urls, feed_map)
  217. self.logger.info(
  218. "poll complete: clusters=%d newly_enriched=%d already_enriched=%d failed=%d",
  219. self.stats.total_clusters,
  220. self.stats.total_newly_enriched,
  221. self.stats.total_already_enriched,
  222. self.stats.total_failed,
  223. )
  224. return self.stats
  225. # ------------------------------------------------------------------ #
  226. # Phase 1: Load feed URLs
  227. # ------------------------------------------------------------------ #
  228. @staticmethod
  229. def _load_feed_urls() -> list[str]:
  230. urls = [u.strip() for u in NEWS_FEED_URLS.split(",") if u.strip()]
  231. if not urls:
  232. urls = [NEWS_FEED_URL]
  233. return urls
  234. # ------------------------------------------------------------------ #
  235. # Phase 2: Fetch + per-feed dedup
  236. # ------------------------------------------------------------------ #
  237. async def _fetch_feeds(
  238. self, feed_urls: list[str],
  239. ) -> tuple[dict[str, list[dict]], list[FeedStats]]:
  240. """Fetch all feeds concurrently. Returns {feed_url: fresh_articles}
  241. and per-feed stats. Unchanged feeds (same content hash) are dropped."""
  242. articles = await fetch_news_articles(limit=9999, url_list=feed_urls)
  243. # limit=9999 effectively means no per-feed cap — fetches everything
  244. # the feed gives us. fetch_news_articles applies max(1, limit).
  245. # Group by feed URL
  246. per_feed: dict[str, list[dict]] = defaultdict(list)
  247. for a in articles:
  248. fu = str(a.get("feed_url") or NEWS_FEED_URL).strip() or NEWS_FEED_URL
  249. per_feed[fu].append(a)
  250. # Per-feed content hash dedup
  251. feed_map: dict[str, list[dict]] = {}
  252. feed_stats_list: list[FeedStats] = []
  253. for feed_url in feed_urls:
  254. feed_articles = per_feed.get(feed_url, [])
  255. stats = FeedStats(feed_url=feed_url, fetched=len(feed_articles))
  256. if not feed_articles:
  257. self.logger.info("feed empty: feed_url=%s", feed_url)
  258. feed_stats_list.append(stats)
  259. continue
  260. material = "\n".join(
  261. f"{a.get('title','')}|{a.get('url','')}"
  262. for a in feed_articles
  263. )
  264. content_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
  265. prev_hash = self.store.get_feed_hash(feed_url)
  266. if prev_hash == content_hash:
  267. stats.duplicate = len(feed_articles)
  268. self.logger.info("feed unchanged: feed_url=%s items=%d", feed_url, len(feed_articles))
  269. feed_stats_list.append(stats)
  270. continue
  271. feed_map[feed_url] = feed_articles
  272. self.logger.info(
  273. "feed changed: feed_url=%s items=%d hash_prev=%s hash_now=%s",
  274. feed_url, len(feed_articles),
  275. (prev_hash or "-")[:12], content_hash[:12],
  276. )
  277. feed_stats_list.append(stats)
  278. return feed_map, feed_stats_list
  279. # ------------------------------------------------------------------ #
  280. # Phase 3: Retention filter
  281. # ------------------------------------------------------------------ #
  282. def _apply_retention(
  283. self, articles: list[dict], feed_map: dict[str, list[dict]],
  284. ) -> list[dict]:
  285. """Drop articles older than NEWS_RETENTION_DAYS. Updates FeedStats."""
  286. if NEWS_RETENTION_DAYS <= 0:
  287. return articles
  288. cutoff = datetime.now(timezone.utc) - timedelta(days=NEWS_RETENTION_DAYS)
  289. # Build a lookup: article_url → feed_url for stats
  290. article_feed: dict[str, str] = {}
  291. for fu, arts in feed_map.items():
  292. for a in arts:
  293. article_feed[a.get("url", "")] = fu
  294. fresh = []
  295. dropped = 0
  296. for a in articles:
  297. ts_str = a.get("timestamp", "")
  298. if not ts_str:
  299. fresh.append(a)
  300. continue
  301. dt = _parse_ts(ts_str)
  302. if dt is None or dt >= cutoff:
  303. fresh.append(a)
  304. else:
  305. dropped += 1
  306. fu = article_feed.get(a.get("url", ""), "")
  307. if fu:
  308. # Find matching FeedStats and increment stale
  309. for fs in self.stats.feeds:
  310. if fs.feed_url == fu:
  311. fs.stale += 1
  312. break
  313. if dropped:
  314. self.logger.info("retention: dropped=%d remaining=%d retention_days=%.0f", dropped, len(fresh), NEWS_RETENTION_DAYS)
  315. return fresh
  316. # ------------------------------------------------------------------ #
  317. # Phase 4: Pre-seed existing clusters
  318. # ------------------------------------------------------------------ #
  319. def _preseed_clusters(self) -> list[dict]:
  320. """Load recent clusters from DB for cross-cycle article merging."""
  321. max_age = NEWS_CLUSTER_MAX_AGE_HOURS
  322. if max_age == 0:
  323. return []
  324. lookback = max_age if max_age > 0 else 72
  325. all_recent = self.store.get_latest_clusters_all_topics(ttl_hours=lookback, limit=500)
  326. recent = [c for c in all_recent if _cluster_is_within_age_window(c, max_age_hours=max_age)]
  327. self.logger.info("pre-seeded: existing_clusters=%d max_age_h=%.1f", len(recent), max_age)
  328. return recent
  329. # ------------------------------------------------------------------ #
  330. # Phase 5: Clustering
  331. # ------------------------------------------------------------------ #
  332. async def _cluster(
  333. self, articles: list[dict], existing_clusters: list[dict],
  334. ) -> dict[str, list[dict]]:
  335. """Run dedup_and_cluster_articles. Returns {topic: [clusters]}."""
  336. self.logger.info("clustering: articles=%d existing_clusters=%d", len(articles), len(existing_clusters))
  337. clustered = await asyncio.to_thread(
  338. dedup_and_cluster_articles,
  339. articles,
  340. None, # default similarity_threshold
  341. existing_clusters=existing_clusters if existing_clusters else None,
  342. max_age_hours=NEWS_CLUSTER_MAX_AGE_HOURS,
  343. )
  344. self.logger.info("clustered: topics=%s", list(clustered.keys()))
  345. return clustered
  346. # ------------------------------------------------------------------ #
  347. # Phase 6: Enrich + store
  348. # ------------------------------------------------------------------ #
  349. async def _enrich_all(self, clustered_by_topic: dict[str, list[dict]]) -> None:
  350. """Enrich every cluster that needs it and store immediately."""
  351. semaphore = asyncio.Semaphore(llm_concurrency(NEWS_EXTRACT_PROVIDER))
  352. rate = llm_rate_limit(NEWS_EXTRACT_PROVIDER)
  353. self.logger.info(
  354. "enrich: semaphore_limit=%d rate_limit=%s/s provider=%s",
  355. llm_concurrency(NEWS_EXTRACT_PROVIDER), rate, NEWS_EXTRACT_PROVIDER,
  356. )
  357. # Flatten all clusters into one list with their topics
  358. all_targets: list[tuple[str, dict]] = []
  359. for topic, clusters in clustered_by_topic.items():
  360. for c in clusters:
  361. all_targets.append((topic, c))
  362. if not all_targets:
  363. return
  364. # Enrich concurrently
  365. tasks = [
  366. self._enrich_one(c, topic, semaphore, rate)
  367. for topic, c in all_targets
  368. ]
  369. results = await asyncio.gather(*tasks, return_exceptions=False)
  370. # Store each cluster individually, grouped by final topic
  371. by_final_topic: dict[str, list[dict]] = defaultdict(list)
  372. for c2, was_new in results:
  373. final_topic = str(c2.get("topic") or "other").strip().lower()
  374. if final_topic not in {t.lower() for t in DEFAULT_TOPICS}:
  375. final_topic = "other"
  376. by_final_topic[final_topic].append(c2)
  377. self.stats.total_clusters += 1
  378. if was_new:
  379. self.stats.total_newly_enriched += 1
  380. else:
  381. self.stats.total_already_enriched += 1
  382. for final_topic, group in by_final_topic.items():
  383. self.store.upsert_clusters(group, topic=final_topic)
  384. self.logger.info("stored: topic=%s clusters=%d", final_topic, len(group))
  385. async def _enrich_one(
  386. self,
  387. cluster: dict,
  388. topic: str,
  389. semaphore: asyncio.Semaphore,
  390. rate: float,
  391. ) -> tuple[dict, bool]:
  392. """Enrich a single cluster. Returns (cluster, was_newly_enriched).
  393. If the cluster already has entities AND keywords, skip LLM entirely.
  394. The data on the dict IS the cache — no timestamp or DB lookup needed.
  395. """
  396. c2 = enrich_cluster(cluster)
  397. c2.setdefault("topic", topic)
  398. llm_enabled = (not ENRICH_OTHER_TOPICS_ONLY) or (topic == "other")
  399. cluster_id = c2.get("cluster_id")
  400. if not llm_enabled or not cluster_id:
  401. return c2, False
  402. # Cache check: entities + keywords already present → skip
  403. # Exception: enriched_at missing means content changed → force re-enrich
  404. if c2.get("enriched_at") and (c2.get("entities") or []) and (c2.get("keywords") or []):
  405. self.logger.debug("enrich skip (cached): cluster=%s topic=%s", cluster_id, topic)
  406. return c2, False
  407. # Actually call the LLM
  408. last_err = ""
  409. for attempt in range(1 + self.MAX_ENRICHMENT_RETRIES):
  410. if attempt > 0:
  411. backoff = 2 ** attempt
  412. self.logger.info("retry: cluster=%s attempt=%d backoff=%.0fs", cluster_id, attempt, backoff)
  413. await asyncio.sleep(backoff)
  414. try:
  415. async with semaphore:
  416. c2 = await classify_cluster_llm(dict(c2))
  417. c2["enriched_at"] = datetime.now(timezone.utc).isoformat()
  418. return c2, True
  419. except Exception:
  420. last_err = str(sys.exc_info()[1])[:200] if sys.exc_info()[1] else "unknown"
  421. self.logger.warning(
  422. "enrich failed: cluster=%s attempt=%d err=%s",
  423. cluster_id, attempt, last_err,
  424. )
  425. # All retries exhausted
  426. prev_count = c2.get("enrichment_retry_count", 0)
  427. c2["enrichment_failed_at"] = datetime.now(timezone.utc).isoformat()
  428. c2["enrichment_retry_count"] = prev_count + 1
  429. self.logger.error("enrich exhausted: cluster=%s after %d retries", cluster_id, self.MAX_ENRICHMENT_RETRIES)
  430. self.stats.total_failed += 1
  431. return c2, True # was "newly" enriched (attempted), but failed
  432. # ------------------------------------------------------------------ #
  433. # Phase 7: Retry failed enrichments
  434. # ------------------------------------------------------------------ #
  435. async def _retry_failed(self) -> None:
  436. """Retry clusters whose previous enrichment failed."""
  437. failed = self.store.get_failed_enrichment_clusters(max_retries=3)
  438. if not failed:
  439. return
  440. self.logger.info("retry: failed_clusters=%d", len(failed))
  441. semaphore = asyncio.Semaphore(llm_concurrency(NEWS_EXTRACT_PROVIDER))
  442. rate = llm_rate_limit(NEWS_EXTRACT_PROVIDER)
  443. tasks = [
  444. self._enrich_one(c, str(c.get("topic") or "other"), semaphore, rate)
  445. for c in failed
  446. ]
  447. results = await asyncio.gather(*tasks, return_exceptions=False)
  448. by_topic: dict[str, list[dict]] = defaultdict(list)
  449. attempted = 0
  450. now_success = 0
  451. still_failed = 0
  452. for c2, was_new in results:
  453. if not was_new:
  454. continue
  455. attempted += 1
  456. # Clear failure marker on success
  457. if c2.get("enriched_at") and not c2.get("enrichment_failed_at"):
  458. c2.pop("enrichment_failed_at", None)
  459. c2.pop("enrichment_retry_count", None)
  460. now_success += 1
  461. else:
  462. still_failed += 1
  463. t = str(c2.get("topic") or "other").strip().lower()
  464. if t not in {x.lower() for x in DEFAULT_TOPICS}:
  465. t = "other"
  466. by_topic[t].append(c2)
  467. for t, group in by_topic.items():
  468. self.store.upsert_clusters(group, topic=t)
  469. self.logger.info("retry stored: topic=%s clusters=%d", t, len(group))
  470. if attempted:
  471. self.logger.info("retry done: attempted=%d recovered=%d still_failed=%d", attempted, now_success, still_failed)
  472. # ------------------------------------------------------------------ #
  473. # Phase 8: Feed stats + prune
  474. # ------------------------------------------------------------------ #
  475. def _save_feed_stats(self, feed_stats: list[FeedStats]) -> None:
  476. """Log per-feed statistics. ingested = fetched - duplicate - stale - seen."""
  477. for fs in feed_stats:
  478. fs.ingested = max(0, fs.fetched - fs.duplicate - fs.stale - fs.seen)
  479. self.logger.info(
  480. "feed stats: feed_url=%s fetched=%d duplicate=%d stale=%d seen=%d ingested=%d",
  481. fs.feed_url, fs.fetched, fs.duplicate, fs.stale, fs.seen, fs.ingested,
  482. )
  483. def _prune_and_finalize(
  484. self,
  485. enabled_urls: list[str],
  486. feed_map: dict[str, list[dict]],
  487. ) -> None:
  488. """Run pruning and update feed_state hashes + timestamps."""
  489. prune_result = self.store.prune_if_due(
  490. pruning_enabled=NEWS_PRUNING_ENABLED,
  491. retention_days=NEWS_RETENTION_DAYS,
  492. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  493. )
  494. # Update feed_state: hash + item_count for feeds that had changes
  495. for feed_url, feed_articles in feed_map.items():
  496. material = "\n".join(
  497. f"{a.get('title','')}|{a.get('url','')}"
  498. for a in feed_articles
  499. )
  500. content_hash = hashlib.sha1(material.encode("utf-8")).hexdigest()
  501. self.store.set_feed_state(feed_url, content_hash, len(feed_articles))
  502. # Drop legacy aggregate feed-state rows
  503. with self.store._conn() as conn:
  504. conn.execute("DELETE FROM feed_state WHERE feed_key LIKE 'newsfeeds:%'")
  505. self.store.set_meta("last_refresh_at", datetime.now(timezone.utc).isoformat())
  506. self.logger.info("prune: %s", prune_result)
  507. # --------------------------------------------------------------------------- #
  508. # Compatibility wrapper (used by background loop + tests)
  509. # --------------------------------------------------------------------------- #
  510. async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
  511. """Backward-compatible entry point. Delegates to ClusterPoller."""
  512. store = SQLiteClusterStore(DB_PATH)
  513. poller = ClusterPoller(store)
  514. await poller.poll(topic_filter=topic)