|
@@ -46,6 +46,7 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
|
|
|
changed_articles: list[dict[str, Any]] = []
|
|
changed_articles: list[dict[str, Any]] = []
|
|
|
changed_feed_urls: list[str] = []
|
|
changed_feed_urls: list[str] = []
|
|
|
for feed_url, feed_articles in per_feed.items():
|
|
for feed_url, feed_articles in per_feed.items():
|
|
|
|
|
+ logger.info("refresh feed batch start feed_url=%s count=%s", feed_url, len(feed_articles))
|
|
|
material = "\n".join(
|
|
material = "\n".join(
|
|
|
f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
|
|
f"{a.get('title','')}|{a.get('url','')}|{a.get('timestamp','')}"
|
|
|
for a in feed_articles
|
|
for a in feed_articles
|
|
@@ -59,6 +60,7 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
|
|
|
logger.info("refresh changed feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
|
|
logger.info("refresh changed feed_url=%s count=%s topic=%s", feed_url, len(feed_articles), topic)
|
|
|
changed_feed_urls.append(feed_url)
|
|
changed_feed_urls.append(feed_url)
|
|
|
changed_articles.extend(feed_articles)
|
|
changed_articles.extend(feed_articles)
|
|
|
|
|
+ logger.info("refresh feed batch complete feed_url=%s changed_total=%s", feed_url, len(changed_articles))
|
|
|
|
|
|
|
|
if not changed_articles:
|
|
if not changed_articles:
|
|
|
logger.info("refresh unchanged all feeds topic=%s", topic)
|
|
logger.info("refresh unchanged all feeds topic=%s", topic)
|
|
@@ -72,12 +74,14 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
articles = changed_articles
|
|
articles = changed_articles
|
|
|
|
|
+ logger.info("refresh clustering start articles=%s topic=%s", len(articles), topic)
|
|
|
clustered_by_topic = dedup_and_cluster_articles(articles)
|
|
clustered_by_topic = dedup_and_cluster_articles(articles)
|
|
|
logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
|
|
logger.info("refresh clustered topics=%s", list(clustered_by_topic.keys()))
|
|
|
|
|
|
|
|
for t, clusters in clustered_by_topic.items():
|
|
for t, clusters in clustered_by_topic.items():
|
|
|
if topic and t != topic:
|
|
if topic and t != topic:
|
|
|
continue
|
|
continue
|
|
|
|
|
+ logger.info("refresh topic phase start topic=%s clusters=%s", t, len(clusters))
|
|
|
enriched = []
|
|
enriched = []
|
|
|
|
|
|
|
|
# Determine how many clusters to LLM-enrich.
|
|
# Determine how many clusters to LLM-enrich.
|
|
@@ -89,11 +93,17 @@ async def refresh_clusters(topic: str | None = None, limit: int = 80) -> None:
|
|
|
(not ENRICH_OTHER_TOPICS_ONLY) or (t == "other")
|
|
(not ENRICH_OTHER_TOPICS_ONLY) or (t == "other")
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+ # Persist the raw clusters first so a slow enrichment pass does not
|
|
|
|
|
+ # leave the first bootstrap run with nothing stored.
|
|
|
|
|
+ store.upsert_clusters(clusters, topic=t)
|
|
|
|
|
+ logger.info("refresh stored raw topic=%s clusters=%s", t, len(clusters))
|
|
|
|
|
+
|
|
|
for idx, c in enumerate(clusters[:enrich_limit]):
|
|
for idx, c in enumerate(clusters[:enrich_limit]):
|
|
|
c2 = enrich_cluster(c)
|
|
c2 = enrich_cluster(c)
|
|
|
# Seed the heuristic topic on the payload so classify_cluster_llm
|
|
# Seed the heuristic topic on the payload so classify_cluster_llm
|
|
|
# has a sane fallback if the LLM omits or hallucinates one.
|
|
# has a sane fallback if the LLM omits or hallucinates one.
|
|
|
c2.setdefault("topic", t)
|
|
c2.setdefault("topic", t)
|
|
|
|
|
+ logger.info("refresh enrich cluster=%s topic=%s idx=%s/%s", c2.get("cluster_id"), t, idx + 1, enrich_limit)
|
|
|
|
|
|
|
|
if _llm_enabled_for_topic:
|
|
if _llm_enabled_for_topic:
|
|
|
# Cache: if we already have entities/sentiment for this cluster, skip LLM call.
|
|
# Cache: if we already have entities/sentiment for this cluster, skip LLM call.
|