diff --git a/observer/lib/observer.js b/observer/lib/observer.js
index 64142ab..2a1bbbe 100644
--- a/observer/lib/observer.js
+++ b/observer/lib/observer.js
@@ -106,6 +106,7 @@ export const observeRetrievalResultCodes = async (pgPoolStats, influxQueryApi) => {
   // Ref: https://github.com/filecoin-station/spark-stats/pull/244#discussion_r1824808007
   // Note: Having a bucket retention policy is important for this query not to
   // time out.
+  /** @type {{_time: string; _field: string; _value: number}[]} */
   const rows = await influxQueryApi.collectRows(`
     import "strings"
     from(bucket: "spark-evaluate")
@@ -116,16 +117,16 @@ export const observeRetrievalResultCodes = async (pgPoolStats, influxQueryApi) => {
       |> keep(columns: ["_value", "_time", "_field"])
      |> map(fn: (r) => ({ r with _field: strings.replace(v: r._field, t: "result_rate_", u: "", i: 1) }))
   `)
-  for (const row of rows) {
-    await pgPoolStats.query(`
-      INSERT INTO daily_retrieval_result_codes
-      (day, code, rate)
-      VALUES ($1, $2, $3)
-      ON CONFLICT (day, code) DO UPDATE SET rate = EXCLUDED.rate
-    `, [
-      row._time,
-      row._field,
-      row._value
-    ])
-  }
+  console.log('Inserting %s rows to daily_retrieval_result_codes ', rows.length)
+
+  await pgPoolStats.query(`
+    INSERT INTO daily_retrieval_result_codes
+    (day, code, rate)
+    VALUES (unnest($1::DATE[]), unnest($2::TEXT[]), unnest($3::NUMERIC[]))
+    ON CONFLICT (day, code) DO UPDATE SET rate = EXCLUDED.rate
+  `, [
+    rows.map(r => r._time),
+    rows.map(r => r._field),
+    rows.map(r => r._value)
+  ])
 }
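
For reviewers unfamiliar with this batch-insert pattern: passing the three JS arrays through `unnest()` lets PostgreSQL zip them positionally into one row set, so the whole batch lands in a single round trip instead of one query per row. A minimal sketch of how the expansion behaves, using made-up values (not data from this PR):

```sql
-- Parallel unnest() calls in a SELECT list are expanded in lockstep
-- (PostgreSQL 10+), pairing the i-th element of each array:
SELECT
  unnest(ARRAY['2024-11-01', '2024-11-01']::DATE[]) AS day,
  unnest(ARRAY['OK', 'TIMEOUT']::TEXT[])            AS code,
  unnest(ARRAY[0.9, 0.1]::NUMERIC[])                AS rate;

--     day     |  code   | rate
-- ------------+---------+------
--  2024-11-01 | OK      |  0.9
--  2024-11-01 | TIMEOUT |  0.1
```

The `ON CONFLICT (day, code) DO UPDATE` clause applies per produced row, so re-running the observer for the same day overwrites previously stored rates instead of failing on the unique constraint, same as the old per-row loop.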