|
|
@@ -168,32 +168,69 @@ class ClusterPoller:
|
|
|
self._prune_and_finalize(enabled_urls, feed_map)
|
|
|
return self.stats
|
|
|
|
|
|
- # 3c. For changed-content articles, clear enriched_at in the cluster
|
|
|
- # payload JSON so the next enrichment cycle re-processes them with
|
|
|
- # the updated article data. (enriched_at is stored inside the JSON
|
|
|
- # payload, not as a separate SQL column.)
|
|
|
+ # 3c. For changed-content articles whose feed has re-enrichment enabled,
|
|
|
+ # clear enriched_at in the cluster payload JSON so the next enrichment
|
|
|
+ # cycle re-processes them with the updated article data.
|
|
|
+ # Feeds with re-enrich disabled are left alone — their clusters keep
|
|
|
+ # existing enrichment and the changed articles are silently skipped.
|
|
|
if seen_changed:
|
|
|
from news_mcp.article_identity import article_key as _ak
|
|
|
import json as _json
|
|
|
- changed_keys = {_ak(a) for a in seen_changed}
|
|
|
- with self.store._conn() as conn:
|
|
|
- for ak in changed_keys:
|
|
|
- row = conn.execute(
|
|
|
- "SELECT sa.cluster_id, c.payload FROM seen_articles sa "
|
|
|
- "JOIN clusters c ON c.cluster_id = sa.cluster_id "
|
|
|
- "WHERE sa.article_key=?",
|
|
|
- (ak,),
|
|
|
- ).fetchone()
|
|
|
- if row:
|
|
|
- changed_cluster_ids.add(row[0])
|
|
|
- payload = _json.loads(row[1])
|
|
|
- payload.pop("enriched_at", None)
|
|
|
- conn.execute(
|
|
|
- "UPDATE clusters SET payload=? WHERE cluster_id=?",
|
|
|
- (_json.dumps(payload, ensure_ascii=False), row[0]),
|
|
|
- )
|
|
|
- if changed_cluster_ids:
|
|
|
- self.logger.info("content_changed: clusters=%d will re-enrich", len(changed_cluster_ids))
|
|
|
+
|
|
|
+ # Group changed articles by feed_url
|
|
|
+ from collections import defaultdict as _dd
|
|
|
+ changed_by_feed: dict[str, list] = _dd(list)
|
|
|
+ for a in seen_changed:
|
|
|
+ fu = a.get("feed_url", "")
|
|
|
+ changed_by_feed[fu].append(a)
|
|
|
+
|
|
|
+ # Only process feeds that have re-enrichment enabled
|
|
|
+ re_enrich_feeds = set()
|
|
|
+ for fu in changed_by_feed:
|
|
|
+ if self.store.is_re_enrich_enabled(fu):
|
|
|
+ re_enrich_feeds.add(fu)
|
|
|
+
|
|
|
+ # Move disabled-feed articles from seen_changed → seen_unchanged
|
|
|
+ truly_changed = []
|
|
|
+ for a in seen_changed:
|
|
|
+ fu = a.get("feed_url", "")
|
|
|
+ if fu in re_enrich_feeds:
|
|
|
+ truly_changed.append(a)
|
|
|
+ else:
|
|
|
+ # Re-enrich disabled → treat as unchanged (skip silently)
|
|
|
+ seen_unchanged.append(a)
|
|
|
+ fu_stats = [fs for fs in feed_stats if fs.feed_url == fu]
|
|
|
+ if fu_stats:
|
|
|
+ fu_stats[0].seen += 1
|
|
|
+
|
|
|
+ seen_changed = truly_changed
|
|
|
+
|
|
|
+ if seen_changed:
|
|
|
+ changed_keys = {_ak(a) for a in seen_changed}
|
|
|
+ with self.store._conn() as conn:
|
|
|
+ for ak in changed_keys:
|
|
|
+ row = conn.execute(
|
|
|
+ "SELECT sa.cluster_id, c.payload FROM seen_articles sa "
|
|
|
+ "JOIN clusters c ON c.cluster_id = sa.cluster_id "
|
|
|
+ "WHERE sa.article_key=?",
|
|
|
+ (ak,),
|
|
|
+ ).fetchone()
|
|
|
+ if row:
|
|
|
+ changed_cluster_ids.add(row[0])
|
|
|
+ payload = _json.loads(row[1])
|
|
|
+ payload.pop("enriched_at", None)
|
|
|
+ conn.execute(
|
|
|
+ "UPDATE clusters SET payload=? WHERE cluster_id=?",
|
|
|
+ (_json.dumps(payload, ensure_ascii=False), row[0]),
|
|
|
+ )
|
|
|
+ if changed_cluster_ids:
|
|
|
+ self.logger.info(
|
|
|
+ "content_changed: clusters=%d will re-enrich (feeds=%s)",
|
|
|
+ len(changed_cluster_ids),
|
|
|
+ ",".join(sorted(re_enrich_feeds)),
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ self.logger.info("content_changed: all %d changed articles on re-enrich disabled feeds → skipped", len(changed_by_feed) and sum(len(v) for v in changed_by_feed.values()))
|
|
|
|
|
|
# 4. Pre-seed existing clusters for cross-cycle merging
|
|
|
existing_clusters = self._preseed_clusters()
|