perf: optimise SQL query in observeRetrievalResultCodes
Rework `observeRetrievalResultCodes` to execute a single SQL query
to update all daily codes in one go.

Before this change, we ran ~1k individual queries, which consumed a lot
of CPU and triggered a CPU throttling alert.

As part of this change, I am also adding a console log to tell us how
many rows - tuples (day, code) - we are updating in each loop iteration.

Signed-off-by: Miroslav Bajtoš <[email protected]>
bajtos committed Feb 11, 2025
1 parent f177799 commit 2f06697
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions observer/lib/observer.js
@@ -106,6 +106,7 @@ export const observeRetrievalResultCodes = async (pgPoolStats, influxQueryApi) =
 // Ref: https://github.com/filecoin-station/spark-stats/pull/244#discussion_r1824808007
 // Note: Having a bucket retention policy is important for this query not to
 // time out.
+/** @type {{_time: string; _field: string; _value: number}[]} */
 const rows = await influxQueryApi.collectRows(`
 import "strings"
 from(bucket: "spark-evaluate")
@@ -116,16 +117,16 @@ export const observeRetrievalResultCodes = async (pgPoolStats, influxQueryApi) =
 |> keep(columns: ["_value", "_time", "_field"])
 |> map(fn: (r) => ({ r with _field: strings.replace(v: r._field, t: "result_rate_", u: "", i: 1) }))
 `)
-for (const row of rows) {
-await pgPoolStats.query(`
-INSERT INTO daily_retrieval_result_codes
-(day, code, rate)
-VALUES ($1, $2, $3)
-ON CONFLICT (day, code) DO UPDATE SET rate = EXCLUDED.rate
-`, [
-row._time,
-row._field,
-row._value
-])
-}
+console.log('Inserting %s rows to daily_retrieval_result_codes ', rows.length)
+
+await pgPoolStats.query(`
+INSERT INTO daily_retrieval_result_codes
+(day, code, rate)
+VALUES (unnest($1::DATE[]), unnest($2::TEXT[]), unnest($3::NUMERIC[]))
+ON CONFLICT (day, code) DO UPDATE SET rate = EXCLUDED.rate
+`, [
+rows.map(r => r._time),
+rows.map(r => r._field),
+rows.map(r => r._value)
+])
 }
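
The batching technique in this diff can be illustrated in isolation. The sketch below is not part of the commit: `toColumnArrays` and `upsertDailyRates` are hypothetical names, and `client` is assumed to be any object exposing a pg-style `query(text, values)` method, like `pgPoolStats` above. It shows how the row objects are transposed into one array per column, so that PostgreSQL can expand them back into rows with `unnest` and the whole batch costs a single round trip.

```javascript
// Hypothetical helper (not in the commit): transpose row objects into
// one array per column, in the order (day, code, rate).
function toColumnArrays (rows) {
  return [
    rows.map(r => r._time),
    rows.map(r => r._field),
    rows.map(r => r._value)
  ]
}

// Sketch of the batched upsert. `client` is an assumption: anything with
// a pg-style `query(text, values)` method returning a promise.
async function upsertDailyRates (client, rows) {
  const [days, codes, rates] = toColumnArrays(rows)
  // One statement for the whole batch: each array parameter is expanded
  // by unnest(), producing one inserted row per array index.
  return await client.query(`
    INSERT INTO daily_retrieval_result_codes (day, code, rate)
    VALUES (unnest($1::DATE[]), unnest($2::TEXT[]), unnest($3::NUMERIC[]))
    ON CONFLICT (day, code) DO UPDATE SET rate = EXCLUDED.rate
  `, [days, codes, rates])
}
```

The three arrays must have the same length for the rows to line up, which holds here because all three are derived from the same `rows` array. If two entries in one batch shared the same `(day, code)` pair, PostgreSQL would reject the statement, since `ON CONFLICT DO UPDATE` cannot update the same row twice in a single command; this sketch assumes the InfluxDB query yields at most one value per day and code.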
