From c6f47300bc722875ca9168acfe61688d2116ee88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Thu, 7 Dec 2023 15:22:00 +0100 Subject: [PATCH 1/2] refactor: types for raw and parsed measurements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- lib/evaluate.js | 8 ++++---- lib/preprocess.js | 12 ++++++++++-- lib/retrieval-stats.js | 6 +++--- lib/typings.d.ts | 15 +++++++-------- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/lib/evaluate.js b/lib/evaluate.js index 12b00ac1..14db81e4 100644 --- a/lib/evaluate.js +++ b/lib/evaluate.js @@ -154,7 +154,7 @@ export const evaluate = async ({ /** * @param {number} roundIndex - * @param {import('./typings').Measurement[]} measurements + * @param {import('./preprocess').Measurement[]} measurements * @param {import('./typings').RoundDetails} sparkRoundDetails * @returns {Promise} */ @@ -179,7 +179,7 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta // // 2. Reward only one participant in each inet group // - /** @type {Map} */ + /** @type {Map} */ const taskGroups = new Map() for (const m of measurements) { if (m.fraudAssessment) continue @@ -194,7 +194,7 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta group.push(m) } - const getHash = async (/** @type {import('./typings').Measurement} */ m) => { + const getHash = async (/** @type {import('./preprocess').Measurement} */ m) => { const bytes = await globalThis.crypto.subtle.digest('SHA-256', Buffer.from(new Date(m.finished_at).toISOString())) return Buffer.from(bytes).toString('hex') } @@ -257,7 +257,7 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta * rejected by our inet_group algorithm. Multiple measurements submitted for the same task * are considered as one measurement. * - * @param {Map} taskGroups + * @param {Map} taskGroups * @returns {import('./typings').GroupWinningStats} */ const calculateInetGroupSuccessRates = (taskGroups) => { diff --git a/lib/preprocess.js b/lib/preprocess.js index 90d62125..5c14dcf7 100644 --- a/lib/preprocess.js +++ b/lib/preprocess.js @@ -10,6 +10,11 @@ import { RoundData } from './round.js' const debug = createDebug('spark:preprocess') export class Measurement { + /** + * + * @param {import('./typings.js').RawMeasurement} m + * @param {(string) => string} pointerize + */ constructor (m, pointerize = (v) => v) { this.participantAddress = pointerize(parseParticipantAddress(m.participant_address)) this.retrievalResult = pointerize(getRetrievalResult(m)) @@ -47,7 +52,7 @@ export const preprocess = async ({ } const start = new Date() - /** @type import('./typings').Measurement[] */ + /** @type import('./typings').RawMeasurement[] */ const measurements = await fetchMeasurements(cid) const fetchDuration = new Date() - start const validMeasurements = measurements @@ -181,6 +186,9 @@ export const parseParticipantAddress = filWalletAddress => { } } +/** + * @param {Measurement} measurement + */ const assertValidMeasurement = measurement => { assert( typeof measurement === 'object' && measurement !== null, @@ -192,7 +200,7 @@ const assertValidMeasurement = measurement => { } /** - * @param {import('./typings').Measurement} measurement + * @param {import('./typings').RawMeasurement} measurement * @return {import('./typings').RetrievalResult} */ export const getRetrievalResult = measurement => { diff --git a/lib/retrieval-stats.js b/lib/retrieval-stats.js index f710508d..83e244d9 100644 --- a/lib/retrieval-stats.js +++ b/lib/retrieval-stats.js @@ -3,7 +3,7 @@ import createDebug from 'debug' const debug = createDebug('spark:retrieval-stats') /** - * @param {import('./typings').Measurement[]} measurements + * @param {import('./preprocess').Measurement[]} measurements * @param {import('./typings').Point} telemetryPoint */ export const buildRetrievalStats = (measurements, telemetryPoint) => { @@ -106,11 +106,11 @@ const addHistogramToPoint = (point, fieldNamePrefix, values) => { } /** - * @param {import('./typings').Measurement[]} measurements + * @param {import('./preprocess').Measurement[]} measurements * @returns {number} */ const countUniqueTasks = (measurements) => { - const getTaskId = (/** @type {import('./typings').Measurement} */m) => + const getTaskId = (/** @type {import('./preprocess').Measurement} */m) => `${m.cid}::${m.protocol}::${m.provider_address}` const uniqueTasks = new Set() diff --git a/lib/typings.d.ts b/lib/typings.d.ts index 1ff866a6..bbefe44f 100644 --- a/lib/typings.d.ts +++ b/lib/typings.d.ts @@ -44,20 +44,19 @@ export type RetrievalResult = | 'ERROR_500' | 'UNKNOWN_ERROR' -export interface Measurement { - participantAddress: string; - fraudAssessment?: FraudAssesment; - retrievalResult?: RetrievalResult; +// Data coming from spark-api and spark-publish +export interface RawMeasurement { + participant_address: string; cid: string; provider_address: string; protocol: string; inet_group: string; - start_at: number; - first_byte_at: number; - end_at: number; - finished_at: number; + start_at: string; + first_byte_at: string; + end_at: string; + finished_at: string; status_code: number | undefined | null; timeout: boolean; From d6a749cbea64dc11d24135c2641232b072b0febc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Thu, 7 Dec 2023 15:39:24 +0100 Subject: [PATCH 2/2] feat: more validation in pre-process step MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- lib/preprocess.js | 40 +++++++++++++++++++++++++++++++++------- lib/retrieval-stats.js | 16 +++------------- test/preprocess.js | 14 +++++++------- 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/lib/preprocess.js b/lib/preprocess.js index 5c14dcf7..6ed36b93 100644 --- a/lib/preprocess.js +++ b/lib/preprocess.js @@ -22,20 +22,46 @@ export class Measurement { this.spark_version = pointerize(m.spark_version) this.fraudAssessment = null this.inet_group = pointerize(m.inet_group) - this.finished_at = parseDateTime(m.finished_at) + this.finished_at = parseDateTime(m.finished_at, 'finished_at') this.provider_address = pointerize(m.provider_address) this.protocol = pointerize(m.protocol) - this.byte_length = m.byte_length - this.start_at = parseDateTime(m.start_at) - this.first_byte_at = parseDateTime(m.first_byte_at) - this.end_at = parseDateTime(m.end_at) + + if ( + m.byte_length !== undefined && + !(Number.isInteger(m.byte_length) && m.byte_length >= 0) + ) { + throw new Error(`Invalid byte_length value, expected non-negative integer: ${m.byte_length}`) + } + this.byteLength = m.byte_length + + const startAt = parseDateTime(m.start_at, 'start_at') + const firstByteAt = parseDateTime(m.first_byte_at, 'first_byte_at') + const endAt = parseDateTime(m.end_at, 'end_at') + + if (firstByteAt !== undefined && startAt === undefined) { + throw new Error('Invalid measurement: first_byte_at is set, but start_at is not.') + } + this.ttfb = firstByteAt - startAt + if (this.ttfb < 0) { + throw new Error('Invalid measurement: first_byte_at is before start_at.') + } + + if (endAt !== undefined && startAt === undefined) { + throw new Error('Invalid measurement: end_at is set, but start_at is not.') + } + this.duration = endAt - startAt + if (this.duration < 0) { + throw new Error('Invalid measurement: end_at is before start_at.') + } } } -const parseDateTime = (str) => { +const parseDateTime = (str, fieldName) => { if (!str) return undefined const value = new Date(str) - if (Number.isNaN(value.getTime())) return undefined + if (Number.isNaN(value.getTime())) { + throw new Error(`Invalid ${fieldName} value, expected a Date string: ${str}`) + } return value.getTime() } diff --git a/lib/retrieval-stats.js b/lib/retrieval-stats.js index 83e244d9..d93c756c 100644 --- a/lib/retrieval-stats.js +++ b/lib/retrieval-stats.js @@ -49,24 +49,14 @@ export const buildRetrievalStats = (measurements, telemetryPoint) => { participants.add(m.participantAddress) inetGroups.add(m.inet_group) - // don't trust the checker to submit a positive integers - // TODO: reject measurements with invalid values during the preprocess phase? - const byteLength = typeof m.byte_length === 'number' && m.byte_length >= 0 - ? m.byte_length - : undefined - const startAt = m.start_at - const firstByteAt = m.first_byte_at - const endAt = m.end_at - const ttfb = startAt && firstByteAt && (firstByteAt - startAt) - const duration = startAt && endAt && (endAt - startAt) - + const { byteLength, ttfb, duration } = m debug('size=%s ttfb=%s duration=%s valid? %s', byteLength, ttfb, duration, m.fraudAssessment === 'OK') if (byteLength !== undefined) { downloadBandwidth += byteLength sizeValues.push(byteLength) } - if (ttfb !== undefined && ttfb > 0 && m.status_code === 200) ttfbValues.push(ttfb) - if (duration !== undefined && duration > 0) durationValues.push(duration) + if (ttfb !== undefined && m.status_code === 200) ttfbValues.push(ttfb) + if (duration !== undefined) durationValues.push(duration) } const successRate = resultBreakdown.OK / totalCount diff --git a/test/preprocess.js b/test/preprocess.js index cab7646f..1255ec31 100644 --- a/test/preprocess.js +++ b/test/preprocess.js @@ -24,10 +24,10 @@ describe('preprocess', () => { participant_address: 'f410ftgmzttyqi3ti4nxbvixa4byql3o5d4eo3jtc43i', spark_version: '1.2.3', inet_group: 'ig1', - finished_at: '2023-11-01T09:00:00.000Z', - first_byte_at: '2023-11-01T09:00:01.000Z', - start_at: '2023-11-01T09:00:02.000Z', - end_at: '2023-11-01T09:00:03.000Z' + start_at: '2023-11-01T09:00:01.000Z', + first_byte_at: '2023-11-01T09:00:02.000Z', + end_at: '2023-11-01T09:00:03.000Z', + finished_at: '2023-11-01T09:00:04.000Z' }] const getCalls = [] const fetchMeasurements = async (cid) => { @@ -43,10 +43,10 @@ describe('preprocess', () => { participant_address: '0x999999cf1046e68e36E1aA2E0E07105eDDD1f08E', spark_version: '1.2.3', inet_group: 'ig1', - finished_at: '2023-11-01T09:00:00.000Z', - first_byte_at: '2023-11-01T09:00:01.000Z', - start_at: '2023-11-01T09:00:02.000Z', + start_at: '2023-11-01T09:00:01.000Z', + first_byte_at: '2023-11-01T09:00:02.000Z', end_at: '2023-11-01T09:00:03.000Z', + finished_at: '2023-11-01T09:00:04.000Z', retrievalResult: 'UNKNOWN_ERROR' }) ])