diff --git a/packages/server/src/models/StreamSplitter.ts b/packages/server/src/models/StreamSplitter.ts
index db977a3f..97d79e09 100644
--- a/packages/server/src/models/StreamSplitter.ts
+++ b/packages/server/src/models/StreamSplitter.ts
@@ -44,18 +44,19 @@ export class StreamSplitter extends stream.Writable {
       await this._newChunk()
     }
 
-    const overflow = this.currentChunkSize + chunk.length - this.chunkSize
+    let overflow = this.currentChunkSize + chunk.length - this.chunkSize
+
     // The current chunk will be more than our defined part size if we would
     // write all of it to disk.
-    if (overflow > 0) {
+    while (overflow > 0) {
       // Only write to disk the up to our defined part size.
-      await this._writeChunk(chunk.slice(0, chunk.length - overflow))
+      await this._writeChunk(chunk.subarray(0, chunk.length - overflow))
       await this._finishChunk()
+
       // We still have some overflow left, so we write it to a new chunk.
       await this._newChunk()
-      await this._writeChunk(chunk.slice(chunk.length - overflow, chunk.length))
-      callback(null)
-      return
+      chunk = chunk.subarray(chunk.length - overflow, chunk.length)
+      overflow = this.currentChunkSize + chunk.length - this.chunkSize
     }
 
     // The chunk is smaller than our defined part size so we can just write it to disk.
diff --git a/packages/server/test/StreamSplitter.test.ts b/packages/server/test/StreamSplitter.test.ts
index 1518e35c..c9cc5907 100644
--- a/packages/server/test/StreamSplitter.test.ts
+++ b/packages/server/test/StreamSplitter.test.ts
@@ -4,6 +4,7 @@
 import stream from 'node:stream/promises'
 import {strict as assert} from 'node:assert'
 
 import {StreamSplitter} from '../src/models'
+import {Readable} from 'node:stream'
 
 const fileSize = 20_971_520
@@ -25,4 +26,29 @@ describe('StreamSplitter', () => {
     await stream.pipeline(readStream, splitterStream)
     assert.equal(offset, fileSize)
   })
+
+  it('should split to multiple chunks when single buffer exceeds chunk size', async () => {
+    const optimalChunkSize = 1024
+    const expectedChunks = 7
+
+    const readStream = Readable.from([Buffer.alloc(expectedChunks * optimalChunkSize)])
+
+    let chunksStarted = 0
+    let chunksFinished = 0
+    const splitterStream = new StreamSplitter({
+      chunkSize: optimalChunkSize,
+      directory: os.tmpdir(),
+    })
+      .on('chunkStarted', () => {
+        chunksStarted++
+      })
+      .on('chunkFinished', () => {
+        chunksFinished++
+      })
+
+    await stream.pipeline(readStream, splitterStream)
+
+    assert.equal(chunksStarted, expectedChunks)
+    assert.equal(chunksFinished, expectedChunks)
+  })
 })
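
Note on the fix: the patch turns the single if (overflow > 0) branch into a while loop, so a write() whose buffer is several times chunkSize is now carved into multiple full parts instead of at most two. The standalone sketch below is not part of the patch; splitBuffer is a hypothetical helper name used only to show the same overflow loop in isolation, under the same arithmetic the diff uses.

import {Buffer} from 'node:buffer'

// Split one Buffer into pieces of at most `chunkSize` bytes.
// `currentChunkSize` models bytes already written to the open chunk,
// mirroring this.currentChunkSize in StreamSplitter._write.
function splitBuffer(chunk: Buffer, chunkSize: number, currentChunkSize = 0): Buffer[] {
  const chunks: Buffer[] = []
  let overflow = currentChunkSize + chunk.length - chunkSize

  // Keep carving off pieces while the remainder would still exceed the part size.
  while (overflow > 0) {
    // The first piece fills whatever room is left in the open chunk.
    chunks.push(chunk.subarray(0, chunk.length - overflow))
    chunk = chunk.subarray(chunk.length - overflow, chunk.length)
    currentChunkSize = 0 // a fresh chunk starts empty
    overflow = currentChunkSize + chunk.length - chunkSize
  }

  // Whatever is left fits into the current chunk.
  chunks.push(chunk)
  return chunks
}

// Matches the new test case: a 7 KiB buffer with a 1 KiB part size yields 7 chunks.
// splitBuffer(Buffer.alloc(7 * 1024), 1024).length === 7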