Skip to content

Commit

Permalink
cli: split fetch-recent-miner-measurements and `evaluate-measuremen…
Browse files Browse the repository at this point in the history
…ts` (#401)

* cli: split `fetch-recent-miner-measurements` and `evaluate-measurements`

* improve output file name logic

* fix evaluation file name

* add write machine-readable evaluation
  • Loading branch information
juliangruber authored Nov 14, 2024
1 parent 4019947 commit 9564c8c
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 89 deletions.
183 changes: 183 additions & 0 deletions bin/evaluate-measurements.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import fs from 'node:fs'
import { readFile } from 'node:fs/promises'
import { RoundData } from '../lib/round.js'
import { evaluate } from '../lib/evaluate.js'
import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import { fetchRoundDetails } from '../lib/spark-api.js'
import createDebug from 'debug'
import { Point } from '@influxdata/influxdb-client'
import { basename } from 'node:path'

const { KEEP_REJECTED } = process.env

const debug = createDebug('spark:bin')

const [nodePath, selfPath, measurementsPath] = process.argv

const USAGE = `
Usage:
${nodePath} ${selfPath} measurementsPath
`

if (!measurementsPath) {
console.error('Missing required argument: measurementsPath')
console.error(USAGE)
process.exit(1)
}

const keepRejected = isFlagEnabled(KEEP_REJECTED)

const rounds = new Map()
const measurementsFile = await readFile(measurementsPath, 'utf8')
for (const line of measurementsFile.split('\n').filter(Boolean)) {
const { roundIndex: _roundIndex, measurement } = JSON.parse(line)
const roundIndex = BigInt(_roundIndex)
if (!rounds.has(roundIndex)) rounds.set(roundIndex, [])
rounds.get(roundIndex).push(measurement)
}

const EVALUATION_TXT_FILE = `${basename(measurementsPath, '.ndjson')}.evaluation.txt`
const EVALUATION_NDJSON_FILE = `${basename(measurementsPath, '.ndjson')}.evaluation.ndjson`

const evaluationTxtWriter = fs.createWriteStream(EVALUATION_TXT_FILE)
const evaluationNdjsonWriter = fs.createWriteStream(EVALUATION_NDJSON_FILE)

evaluationTxtWriter.write(formatHeader({ includeFraudAssesment: keepRejected }) + '\n')

const resultCounts = {
total: 0
}

for (const [roundIndex, measurements] of rounds) {
await processRound(
roundIndex,
measurements,
resultCounts
)
}

console.log('Found %s accepted measurements.', resultCounts.total)
for (const [r, c] of Object.entries(resultCounts)) {
if (r === 'total') continue
console.log(' %s %s (%s%)',
r.padEnd(40),
String(c).padEnd(10),
Math.floor(c / resultCounts.total * 10000) / 100
)
}

console.error('Wrote human-readable evaluation to %s', EVALUATION_TXT_FILE)
console.error('Wrote machine-readable evaluation to %s', EVALUATION_NDJSON_FILE)

/**
* @param {bigint} roundIndex
* @param {object[]} measurements
* @param {Record<string, number>} resultCounts
*/
async function processRound (roundIndex, measurements, resultCounts) {
console.error(' → evaluating round %s', roundIndex)

const round = new RoundData(roundIndex)
round.measurements = measurements

const ieContract = {
async getAddress () {
return SparkImpactEvaluator.ADDRESS
}
}

await evaluate({
roundIndex: round.index,
round,
fetchRoundDetails,
recordTelemetry,
logger: { log: debug, error: debug },
ieContract,
setScores: async () => {},
prepareProviderRetrievalResultStats: async () => {}
})

for (const m of round.measurements) {
if (m.fraudAssessment !== 'OK') continue
resultCounts.total++
resultCounts[m.retrievalResult] = (resultCounts[m.retrievalResult] ?? 0) + 1
}

if (!keepRejected) {
round.measurements = round.measurements
// Keep accepted measurements only
.filter(m => m.fraudAssessment === 'OK')
// Remove the fraudAssessment field as all accepted measurements have the same 'OK' value
.map(m => ({ ...m, fraudAssessment: undefined }))
}

evaluationTxtWriter.write(
round.measurements
.map(m => formatMeasurement(m, { includeFraudAssesment: keepRejected }) + '\n')
.join('')
)
evaluationNdjsonWriter.write(
round.measurements
.map(m => JSON.stringify(m) + '\n')
.join('')
)
console.error(' → added %s accepted measurements from this round', round.measurements.length)
}

/**
* @param {string} measurementName
* @param {(point: Point) => void} fn
*/
function recordTelemetry (measurementName, fn) {
const point = new Point(measurementName)
fn(point)
debug('TELEMETRY %s %o', measurementName, point.fields)
}

/**
* @param {string | undefined} envVarValue
*/
function isFlagEnabled (envVarValue) {
return !!envVarValue && envVarValue.toLowerCase() !== 'false' && envVarValue !== '0'
}

/**
* @param {import('../lib/preprocess.js').Measurement} m
* @param {object} options
* @param {boolean} [options.includeFraudAssesment]
*/
function formatMeasurement (m, { includeFraudAssesment } = {}) {
const fields = [
new Date(m.finished_at).toISOString(),
(m.cid ?? '').padEnd(70),
(m.protocol ?? '').padEnd(10)
]

if (includeFraudAssesment) {
fields.push((m.fraudAssessment === 'OK' ? '🫡 ' : '🙅 '))
}

fields.push((m.retrievalResult ?? ''))

return fields.join(' ')
}

/**
* @param {object} options
* @param {boolean} [options.includeFraudAssesment]
*/
function formatHeader ({ includeFraudAssesment } = {}) {
const fields = [
'Timestamp'.padEnd(new Date().toISOString().length),
'CID'.padEnd(70),
'Protocol'.padEnd(10)
]

if (includeFraudAssesment) {
fields.push('🕵️ ')
}

fields.push('RetrievalResult')

return fields.join(' ')
}
113 changes: 24 additions & 89 deletions bin/fetch-recent-miner-measurements.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,8 @@ import { createContracts } from '../lib/contracts.js'
import { fetchMeasurements, preprocess } from '../lib/preprocess.js'
import { RoundData } from '../lib/round.js'
import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import { evaluate } from '../lib/evaluate.js'
import { fetchRoundDetails } from '../lib/spark-api.js'

const {
STORE_ALL_MINERS,
KEEP_REJECTED
} = process.env
const { STORE_ALL_MINERS } = process.env

Sentry.init({
dsn: 'https://[email protected]/4505906069766144',
Expand Down Expand Up @@ -83,17 +78,12 @@ console.error(' → found %s complete rounds', rounds.length)

const ALL_MEASUREMENTS_FILE = 'measurements-all.ndjson'
const MINER_DATA_FILE = `measurements-${minerId}.ndjson`
const MINER_SUMMARY_FILE = `measurements-${minerId}.txt`

const keepRejected = isFlagEnabled(KEEP_REJECTED)

const allMeasurementsWriter = isFlagEnabled(STORE_ALL_MINERS)
? fs.createWriteStream(ALL_MEASUREMENTS_FILE)
: undefined

const minerDataWriter = fs.createWriteStream(MINER_DATA_FILE)
const minerSummaryWriter = fs.createWriteStream(MINER_SUMMARY_FILE)
minerSummaryWriter.write(formatHeader({ includeFraudAssesment: keepRejected }) + '\n')

const abortController = new AbortController()
const signal = abortController.signal
Expand All @@ -118,7 +108,7 @@ if (signal.aborted) {
console.error('Interrupted, exiting. Output files contain partial data.')
}

console.log('Found %s accepted measurements.', resultCounts.total)
console.log('Found %s valid measurements.', resultCounts.total)
for (const [r, c] of Object.entries(resultCounts)) {
if (r === 'total') continue
console.log(' %s %s (%s%)',
Expand All @@ -132,7 +122,6 @@ if (allMeasurementsWriter) {
console.error('Wrote (ALL) raw measurements to %s', ALL_MEASUREMENTS_FILE)
}
console.error('Wrote (minerId=%s) raw measurements to %s', minerId, MINER_DATA_FILE)
console.error('Wrote human-readable summary for %s to %s', minerId, MINER_SUMMARY_FILE)

/**
* @param {string} contractAddress
Expand Down Expand Up @@ -200,52 +189,39 @@ async function processRound (roundIndex, measurementCids, resultCounts) {
)
signal.throwIfAborted()

const ieContract = {
async getAddress () {
return contractAddress
}
}

console.error(' → evaluating the round')
await evaluate({
roundIndex: round.index,
round,
fetchRoundDetails,
recordTelemetry,
logger: { log: debug, error: debug },
ieContract,
setScores: async () => {},
prepareProviderRetrievalResultStats: async () => {}
})

for (const m of round.measurements) {
if (m.minerId !== minerId || m.fraudAssessment !== 'OK') continue
if (m.minerId !== minerId) continue
resultCounts.total++
resultCounts[m.retrievalResult] = (resultCounts[m.retrievalResult] ?? 0) + 1
}

if (!keepRejected) {
round.measurements = round.measurements
// Keep accepted measurements only
.filter(m => m.fraudAssessment === 'OK')
// Remove the fraudAssessment field as all accepted measurements have the same 'OK' value
.map(m => ({ ...m, fraudAssessment: undefined }))
}

if (allMeasurementsWriter) {
allMeasurementsWriter.write(round.measurements.map(m => JSON.stringify(m) + '\n').join(''))
if (allMeasurementsWriter && round.measurements.length > 0) {
allMeasurementsWriter.write(
round.measurements
.map(measurement => ndJsonLine({ roundIndex: round.index.toString(), measurement }))
.join('')
)
}

const minerMeasurements = round.measurements.filter(m => m.minerId === minerId)
minerDataWriter.write(minerMeasurements.map(m => JSON.stringify(m) + '\n').join(''))
minerSummaryWriter.write(
minerMeasurements
.map(m => formatMeasurement(m, { includeFraudAssesment: keepRejected }) + '\n')
.join('')
)
if (minerMeasurements.length > 0) {
minerDataWriter.write(
minerMeasurements
.map(measurement => ndJsonLine({ roundIndex: round.index.toString(), measurement }))
.join('')
)
}
console.error(' → added %s new measurements from this round', minerMeasurements.length)
}

/**
* @param {*} obj
* @returns string
*/
function ndJsonLine (obj) {
return JSON.stringify(obj) + '\n'
}

/**
* @param {RoundData} round
* @param {string} cid
Expand Down Expand Up @@ -288,47 +264,6 @@ function recordTelemetry (measurementName, fn) {
debug('TELEMETRY %s %o', measurementName, point.fields)
}

/**
* @param {import('../lib/preprocess.js').Measurement} m
* @param {object} options
* @param {boolean} [options.includeFraudAssesment]
*/
function formatMeasurement (m, { includeFraudAssesment } = {}) {
const fields = [
new Date(m.finished_at).toISOString(),
(m.cid ?? '').padEnd(70),
(m.protocol ?? '').padEnd(10)
]

if (includeFraudAssesment) {
fields.push((m.fraudAssessment === 'OK' ? '🫡 ' : '🙅 '))
}

fields.push((m.retrievalResult ?? ''))

return fields.join(' ')
}

/**
* @param {object} options
* @param {boolean} [options.includeFraudAssesment]
*/
function formatHeader ({ includeFraudAssesment } = {}) {
const fields = [
'Timestamp'.padEnd(new Date().toISOString().length),
'CID'.padEnd(70),
'Protocol'.padEnd(10)
]

if (includeFraudAssesment) {
fields.push('🕵️ ')
}

fields.push('RetrievalResult')

return fields.join(' ')
}

/**
* @param {string | undefined} envVarValue
*/
Expand Down

0 comments on commit 9564c8c

Please sign in to comment.