From 2f0669769e24aa323452eba416ac4b9eccd36832 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 11 Feb 2025 16:31:52 +0100 Subject: [PATCH] perf: optimise SQL query in observeRetrievalResultCodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rework `observeRetrievalResultCodes` to execute a single SQL query to update all daily codes in one go. Before this change, we would run ~1k individual queries, consume a lot of CPU and trigger a CPU throttling alert. As part of this change, I am also adding a console log to tell us how many rows - tuples (day, code) - we are updating in each loop iteration. Signed-off-by: Miroslav Bajtoš --- observer/lib/observer.js | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/observer/lib/observer.js b/observer/lib/observer.js index 64142ab..2a1bbbe 100644 --- a/observer/lib/observer.js +++ b/observer/lib/observer.js @@ -106,6 +106,7 @@ export const observeRetrievalResultCodes = async (pgPoolStats, influxQueryApi) = // Ref: https://github.com/filecoin-station/spark-stats/pull/244#discussion_r1824808007 // Note: Having a bucket retention policy is important for this query not to // time out. 
+ /** @type {{_time: string; _field: string; _value: number}[]} */ const rows = await influxQueryApi.collectRows(` import "strings" from(bucket: "spark-evaluate") @@ -116,16 +117,16 @@ export const observeRetrievalResultCodes = async (pgPoolStats, influxQueryApi) = |> keep(columns: ["_value", "_time", "_field"]) |> map(fn: (r) => ({ r with _field: strings.replace(v: r._field, t: "result_rate_", u: "", i: 1) })) `) - for (const row of rows) { - await pgPoolStats.query(` - INSERT INTO daily_retrieval_result_codes - (day, code, rate) - VALUES ($1, $2, $3) - ON CONFLICT (day, code) DO UPDATE SET rate = EXCLUDED.rate - `, [ - row._time, - row._field, - row._value - ]) - } + console.log('Inserting %s rows to daily_retrieval_result_codes ', rows.length) + + await pgPoolStats.query(` + INSERT INTO daily_retrieval_result_codes + (day, code, rate) + VALUES (unnest($1::DATE[]), unnest($2::TEXT[]), unnest($3::NUMERIC[])) + ON CONFLICT (day, code) DO UPDATE SET rate = EXCLUDED.rate + `, [ + rows.map(r => r._time), + rows.map(r => r._field), + rows.map(r => r._value) + ]) }