Skip to content

Commit

Permalink
fix: adds more info on stream monitoring (#575)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos authored Oct 23, 2024
1 parent f0c6953 commit 6b052d8
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
19 changes: 14 additions & 5 deletions src/internal/streams/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,28 @@ import { Readable } from 'node:stream'
export function monitorStream(dataStream: Readable) {
const speedMonitor = monitorStreamSpeed(dataStream)
const byteCounter = createByteCounterStream()
const span = trace.getActiveSpan()

let measures: number[] = []
let measures: any[] = []

// Handle the 'speed' event to collect speed measurements
speedMonitor.on('speed', (bps) => {
measures.push(bps)
const span = trace.getActiveSpan()
span?.setAttributes({ 'stream.speed': measures, bytesRead: byteCounter.bytes })
measures.push({
'stream.speed': bps,
'stream.bytesRead': byteCounter.bytes,
'stream.status': {
paused: dataStream.isPaused(),
closed: dataStream.closed,
},
})

span?.setAttributes({
stream: JSON.stringify(measures),
})
})

speedMonitor.on('close', () => {
measures = []
const span = trace.getActiveSpan()
span?.setAttributes({ uploadRead: byteCounter.bytes })
})

Expand Down
23 changes: 10 additions & 13 deletions src/internal/streams/stream-speed.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,32 @@
import { Readable } from 'stream'
import { PassThrough } from 'node:stream'

/**
* Keep track of a stream's speed
* @param stream
* @param frequency
*/
/**
* Keep track of a stream's speed
* @param stream
* @param frequency
*/
export function monitorStreamSpeed(stream: Readable, frequency = 1000) {
let totalBytes = 0
const startTime = Date.now()
let lastIntervalBytes = 0

const passThrough = new PassThrough()

const interval = setInterval(() => {
const currentTime = Date.now()
const elapsedTime = (currentTime - startTime) / 1000
const currentSpeedBytesPerSecond = totalBytes / elapsedTime

const emitSpeed = () => {
const currentSpeedBytesPerSecond = lastIntervalBytes / (frequency / 1000)
passThrough.emit('speed', currentSpeedBytesPerSecond)
lastIntervalBytes = 0 // Reset for the next interval
}

const interval = setInterval(() => {
emitSpeed()
}, frequency)

passThrough.on('data', (chunk) => {
totalBytes += chunk.length
lastIntervalBytes += chunk.length // Increment bytes for the current interval
})

const cleanup = () => {
emitSpeed()
clearInterval(interval)
passThrough.removeAllListeners('speed')
}
Expand Down

0 comments on commit 6b052d8

Please sign in to comment.