|
|
@@ -195,12 +195,16 @@ def main() -> None:
|
|
|
|
|
|
with store._conn() as conn: # noqa: SLF001
|
|
|
for topic, bucket in groups:
|
|
|
- rep = bucket[0]
|
|
|
- rep_id = rep.get("cluster_id")
|
|
|
merged = _merge_payloads(bucket)
|
|
|
- store.upsert_clusters([merged], topic=topic)
|
|
|
+ cluster_id = merged.get("cluster_id")
|
|
|
+ payload = json.dumps(merged, ensure_ascii=False)
|
|
|
+ conn.execute(
|
|
|
+ "INSERT INTO clusters(cluster_id, topic, payload, updated_at) VALUES(?,?,?,?) "
|
|
|
+ "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, payload=excluded.payload, updated_at=excluded.updated_at",
|
|
|
+ (cluster_id, topic, payload, merged.get("last_updated") or merged.get("updated_at") or ""),
|
|
|
+ )
|
|
|
# Delete absorbed duplicates; keep the representative row.
|
|
|
- dup_ids = [c.get("cluster_id") for c in bucket[1:] if c.get("cluster_id")]
|
|
|
+ dup_ids = [c.get("cluster_id") for c in bucket if c.get("cluster_id") != cluster_id]
|
|
|
for cid in dup_ids:
|
|
|
conn.execute("DELETE FROM clusters WHERE cluster_id=?", (cid,))
|
|
|
conn.commit()
|