From 9564c8c035489e5878c15a7a7c45f0a1ece06e95 Mon Sep 17 00:00:00 2001
From: Julian Gruber
Date: Thu, 14 Nov 2024 10:10:23 +0100
Subject: [PATCH] cli: split `fetch-recent-miner-measurements` and
 `evaluate-measurements` (#401)

* cli: split `fetch-recent-miner-measurements` and `evaluate-measurements`

* improve output file name logic

* fix evaluation file name

* add write machine-readable evaluation
---
 bin/evaluate-measurements.js           | 183 +++++++++++++++++++++++++
 bin/fetch-recent-miner-measurements.js | 113 ++++-----------
 2 files changed, 207 insertions(+), 89 deletions(-)
 create mode 100644 bin/evaluate-measurements.js

diff --git a/bin/evaluate-measurements.js b/bin/evaluate-measurements.js
new file mode 100644
index 00000000..deac23e9
--- /dev/null
+++ b/bin/evaluate-measurements.js
@@ -0,0 +1,183 @@
+import fs from 'node:fs'
+import { readFile } from 'node:fs/promises'
+import { RoundData } from '../lib/round.js'
+import { evaluate } from '../lib/evaluate.js'
+import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
+import { fetchRoundDetails } from '../lib/spark-api.js'
+import createDebug from 'debug'
+import { Point } from '@influxdata/influxdb-client'
+import { basename } from 'node:path'
+
+const { KEEP_REJECTED } = process.env
+
+const debug = createDebug('spark:bin')
+
+const [nodePath, selfPath, measurementsPath] = process.argv
+
+const USAGE = `
+Usage:
+  ${nodePath} ${selfPath} measurementsPath
+`
+
+if (!measurementsPath) {
+  console.error('Missing required argument: measurementsPath')
+  console.error(USAGE)
+  process.exit(1)
+}
+
+const keepRejected = isFlagEnabled(KEEP_REJECTED)
+
+const rounds = new Map()
+const measurementsFile = await readFile(measurementsPath, 'utf8')
+for (const line of measurementsFile.split('\n').filter(Boolean)) {
+  const { roundIndex: _roundIndex, measurement } = JSON.parse(line)
+  const roundIndex = BigInt(_roundIndex)
+  if (!rounds.has(roundIndex)) rounds.set(roundIndex, [])
+  rounds.get(roundIndex).push(measurement)
+}
+
+const EVALUATION_TXT_FILE = `${basename(measurementsPath, '.ndjson')}.evaluation.txt`
+const EVALUATION_NDJSON_FILE = `${basename(measurementsPath, '.ndjson')}.evaluation.ndjson`
+
+const evaluationTxtWriter = fs.createWriteStream(EVALUATION_TXT_FILE)
+const evaluationNdjsonWriter = fs.createWriteStream(EVALUATION_NDJSON_FILE)
+
+evaluationTxtWriter.write(formatHeader({ includeFraudAssesment: keepRejected }) + '\n')
+
+const resultCounts = {
+  total: 0
+}
+
+for (const [roundIndex, measurements] of rounds) {
+  await processRound(
+    roundIndex,
+    measurements,
+    resultCounts
+  )
+}
+
+console.log('Found %s accepted measurements.', resultCounts.total)
+for (const [r, c] of Object.entries(resultCounts)) {
+  if (r === 'total') continue
+  console.log(' %s %s (%s%)',
+    r.padEnd(40),
+    String(c).padEnd(10),
+    Math.floor(c / resultCounts.total * 10000) / 100
+  )
+}
+
+console.error('Wrote human-readable evaluation to %s', EVALUATION_TXT_FILE)
+console.error('Wrote machine-readable evaluation to %s', EVALUATION_NDJSON_FILE)
+
+/**
+ * @param {bigint} roundIndex
+ * @param {object[]} measurements
+ * @param {Record<string, number>} resultCounts
+ */
+async function processRound (roundIndex, measurements, resultCounts) {
+  console.error(' → evaluating round %s', roundIndex)
+
+  const round = new RoundData(roundIndex)
+  round.measurements = measurements
+
+  const ieContract = {
+    async getAddress () {
+      return SparkImpactEvaluator.ADDRESS
+    }
+  }
+
+  await evaluate({
+    roundIndex: round.index,
+    round,
+    fetchRoundDetails,
+    recordTelemetry,
+    logger: { log: debug, error: debug },
+    ieContract,
+    setScores: async () => {},
+    prepareProviderRetrievalResultStats: async () => {}
+  })
+
+  for (const m of round.measurements) {
+    if (m.fraudAssessment !== 'OK') continue
+    resultCounts.total++
+    resultCounts[m.retrievalResult] = (resultCounts[m.retrievalResult] ?? 0) + 1
+  }
+
+  if (!keepRejected) {
+    round.measurements = round.measurements
+      // Keep accepted measurements only
+      .filter(m => m.fraudAssessment === 'OK')
+      // Remove the fraudAssessment field as all accepted measurements have the same 'OK' value
+      .map(m => ({ ...m, fraudAssessment: undefined }))
+  }
+
+  evaluationTxtWriter.write(
+    round.measurements
+      .map(m => formatMeasurement(m, { includeFraudAssesment: keepRejected }) + '\n')
+      .join('')
+  )
+  evaluationNdjsonWriter.write(
+    round.measurements
+      .map(m => JSON.stringify(m) + '\n')
+      .join('')
+  )
+  console.error(' → added %s accepted measurements from this round', round.measurements.length)
+}
+
+/**
+ * @param {string} measurementName
+ * @param {(point: Point) => void} fn
+ */
+function recordTelemetry (measurementName, fn) {
+  const point = new Point(measurementName)
+  fn(point)
+  debug('TELEMETRY %s %o', measurementName, point.fields)
+}
+
+/**
+ * @param {string | undefined} envVarValue
+ */
+function isFlagEnabled (envVarValue) {
+  return !!envVarValue && envVarValue.toLowerCase() !== 'false' && envVarValue !== '0'
+}
+
+/**
+ * @param {import('../lib/preprocess.js').Measurement} m
+ * @param {object} options
+ * @param {boolean} [options.includeFraudAssesment]
+ */
+function formatMeasurement (m, { includeFraudAssesment } = {}) {
+  const fields = [
+    new Date(m.finished_at).toISOString(),
+    (m.cid ?? '').padEnd(70),
+    (m.protocol ?? '').padEnd(10)
+  ]
+
+  if (includeFraudAssesment) {
+    fields.push((m.fraudAssessment === 'OK' ? '🫡 ' : '🙅 '))
+  }
+
+  fields.push((m.retrievalResult ?? ''))
+
+  return fields.join(' ')
+}
+
+/**
+ * @param {object} options
+ * @param {boolean} [options.includeFraudAssesment]
+ */
+function formatHeader ({ includeFraudAssesment } = {}) {
+  const fields = [
+    'Timestamp'.padEnd(new Date().toISOString().length),
+    'CID'.padEnd(70),
+    'Protocol'.padEnd(10)
+  ]
+
+  if (includeFraudAssesment) {
+    fields.push('🕵️ ')
+  }
+
+  fields.push('RetrievalResult')
+
+  return fields.join(' ')
+}
diff --git a/bin/fetch-recent-miner-measurements.js b/bin/fetch-recent-miner-measurements.js
index 46d35cef..66a8c7b5 100644
--- a/bin/fetch-recent-miner-measurements.js
+++ b/bin/fetch-recent-miner-measurements.js
@@ -13,13 +13,8 @@ import { createContracts } from '../lib/contracts.js'
 import { fetchMeasurements, preprocess } from '../lib/preprocess.js'
 import { RoundData } from '../lib/round.js'
 import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
-import { evaluate } from '../lib/evaluate.js'
-import { fetchRoundDetails } from '../lib/spark-api.js'
 
-const {
-  STORE_ALL_MINERS,
-  KEEP_REJECTED
-} = process.env
+const { STORE_ALL_MINERS } = process.env
 
 Sentry.init({
   dsn: 'https://d0651617f9690c7e9421ab9c949d67a4@o1408530.ingest.sentry.io/4505906069766144',
@@ -83,17 +78,12 @@ console.error(' → found %s complete rounds', rounds.length)
 
 const ALL_MEASUREMENTS_FILE = 'measurements-all.ndjson'
 const MINER_DATA_FILE = `measurements-${minerId}.ndjson`
-const MINER_SUMMARY_FILE = `measurements-${minerId}.txt`
-
-const keepRejected = isFlagEnabled(KEEP_REJECTED)
 
 const allMeasurementsWriter = isFlagEnabled(STORE_ALL_MINERS)
   ? fs.createWriteStream(ALL_MEASUREMENTS_FILE)
   : undefined
 
 const minerDataWriter = fs.createWriteStream(MINER_DATA_FILE)
-const minerSummaryWriter = fs.createWriteStream(MINER_SUMMARY_FILE)
-minerSummaryWriter.write(formatHeader({ includeFraudAssesment: keepRejected }) + '\n')
 
 const abortController = new AbortController()
 const signal = abortController.signal
@@ -118,7 +108,7 @@ if (signal.aborted) {
   console.error('Interrupted, exiting. Output files contain partial data.')
 }
 
-console.log('Found %s accepted measurements.', resultCounts.total)
+console.log('Found %s valid measurements.', resultCounts.total)
 for (const [r, c] of Object.entries(resultCounts)) {
   if (r === 'total') continue
   console.log(' %s %s (%s%)',
@@ -132,7 +122,6 @@ if (allMeasurementsWriter) {
   console.error('Wrote (ALL) raw measurements to %s', ALL_MEASUREMENTS_FILE)
 }
 console.error('Wrote (minerId=%s) raw measurements to %s', minerId, MINER_DATA_FILE)
-console.error('Wrote human-readable summary for %s to %s', minerId, MINER_SUMMARY_FILE)
 
 /**
  * @param {string} contractAddress
@@ -200,52 +189,39 @@ async function processRound (roundIndex, measurementCids, resultCounts) {
   )
   signal.throwIfAborted()
 
-  const ieContract = {
-    async getAddress () {
-      return contractAddress
-    }
-  }
-
-  console.error(' → evaluating the round')
-  await evaluate({
-    roundIndex: round.index,
-    round,
-    fetchRoundDetails,
-    recordTelemetry,
-    logger: { log: debug, error: debug },
-    ieContract,
-    setScores: async () => {},
-    prepareProviderRetrievalResultStats: async () => {}
-  })
-
   for (const m of round.measurements) {
-    if (m.minerId !== minerId || m.fraudAssessment !== 'OK') continue
+    if (m.minerId !== minerId) continue
     resultCounts.total++
     resultCounts[m.retrievalResult] = (resultCounts[m.retrievalResult] ?? 0) + 1
   }
 
-  if (!keepRejected) {
-    round.measurements = round.measurements
-      // Keep accepted measurements only
-      .filter(m => m.fraudAssessment === 'OK')
-      // Remove the fraudAssessment field as all accepted measurements have the same 'OK' value
-      .map(m => ({ ...m, fraudAssessment: undefined }))
-  }
-
-  if (allMeasurementsWriter) {
-    allMeasurementsWriter.write(round.measurements.map(m => JSON.stringify(m) + '\n').join(''))
+  if (allMeasurementsWriter && round.measurements.length > 0) {
+    allMeasurementsWriter.write(
+      round.measurements
+        .map(measurement => ndJsonLine({ roundIndex: round.index.toString(), measurement }))
+        .join('')
+    )
   }
 
   const minerMeasurements = round.measurements.filter(m => m.minerId === minerId)
-  minerDataWriter.write(minerMeasurements.map(m => JSON.stringify(m) + '\n').join(''))
-  minerSummaryWriter.write(
-    minerMeasurements
-      .map(m => formatMeasurement(m, { includeFraudAssesment: keepRejected }) + '\n')
-      .join('')
-  )
+  if (minerMeasurements.length > 0) {
+    minerDataWriter.write(
+      minerMeasurements
+        .map(measurement => ndJsonLine({ roundIndex: round.index.toString(), measurement }))
+        .join('')
+    )
+  }
   console.error(' → added %s new measurements from this round', minerMeasurements.length)
 }
 
+/**
+ * @param {*} obj
+ * @returns string
+ */
+function ndJsonLine (obj) {
+  return JSON.stringify(obj) + '\n'
+}
+
 /**
  * @param {RoundData} round
  * @param {string} cid
@@ -288,47 +264,6 @@ function recordTelemetry (measurementName, fn) {
   debug('TELEMETRY %s %o', measurementName, point.fields)
 }
 
-/**
- * @param {import('../lib/preprocess.js').Measurement} m
- * @param {object} options
- * @param {boolean} [options.includeFraudAssesment]
- */
-function formatMeasurement (m, { includeFraudAssesment } = {}) {
-  const fields = [
-    new Date(m.finished_at).toISOString(),
-    (m.cid ?? '').padEnd(70),
-    (m.protocol ?? '').padEnd(10)
-  ]
-
-  if (includeFraudAssesment) {
-    fields.push((m.fraudAssessment === 'OK' ? '🫡 ' : '🙅 '))
-  }
-
-  fields.push((m.retrievalResult ?? ''))
-
-  return fields.join(' ')
-}
-
-/**
- * @param {object} options
- * @param {boolean} [options.includeFraudAssesment]
- */
-function formatHeader ({ includeFraudAssesment } = {}) {
-  const fields = [
-    'Timestamp'.padEnd(new Date().toISOString().length),
-    'CID'.padEnd(70),
-    'Protocol'.padEnd(10)
-  ]
-
-  if (includeFraudAssesment) {
-    fields.push('🕵️ ')
-  }
-
-  fields.push('RetrievalResult')
-
-  return fields.join(' ')
-}
-
 /**
  * @param {string | undefined} envVarValue
  */
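
With this split, downloading raw measurements and evaluating them are two separate commands: `fetch-recent-miner-measurements.js` now writes NDJSON lines of the shape `{"roundIndex": "...", "measurement": {...}}` (via `ndJsonLine` above), which is exactly the shape `evaluate-measurements.js` parses before running `evaluate()` per round and writing the `.evaluation.txt` and `.evaluation.ndjson` files. Below is a minimal sketch of the two-step workflow; the miner ID and the fetch script's argument list are illustrative assumptions, not taken from this diff.

// Sketch of the split workflow (assumptions noted above).
import { execFileSync } from 'node:child_process'

const minerId = 'f0example' // hypothetical miner ID

// Step 1: download raw measurements.
// Writes measurements-<minerId>.ndjson; with STORE_ALL_MINERS enabled it also
// writes measurements-all.ndjson.
execFileSync('node', ['bin/fetch-recent-miner-measurements.js', minerId], { stdio: 'inherit' })

// Step 2: evaluate the downloaded file.
// Writes measurements-<minerId>.evaluation.txt (human-readable) and
// measurements-<minerId>.evaluation.ndjson (machine-readable).
// KEEP_REJECTED keeps rejected measurements, including their fraudAssessment.
execFileSync('node', ['bin/evaluate-measurements.js', `measurements-${minerId}.ndjson`], {
  stdio: 'inherit',
  env: { ...process.env, KEEP_REJECTED: 'true' }
})

Both flags follow the `isFlagEnabled()` convention shown in the patch: unset, `'false'`, or `'0'` counts as disabled.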