diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bc45c4e3..736494bd 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -14,7 +14,6 @@ jobs: strategy: matrix: node_version: - - 14 - 16 - 18 diff --git a/lib/client/bounded-buffer-readable-stream.js b/lib/client/bounded-buffer-readable-stream.js new file mode 100644 index 00000000..c353464e --- /dev/null +++ b/lib/client/bounded-buffer-readable-stream.js @@ -0,0 +1,122 @@ +/** + * Pinpoint Node.js Agent + * Copyright 2021-present NAVER Corp. + * Apache License v2.0 + */ + +'use strict' + +const { Readable } = require('node:stream') +const log = require('../utils/logger') + +class BoundedBufferReadableStream { + constructor(constructorOptions) { + this.buffer = [] + this.options = constructorOptions || {} + this.readableStream = this.makeReadableSteam() + this.maxBufferSize = this.options.maxBuffer || 100 + } + + makeReadableSteam() { + const readableStream = new Readable(Object.assign({ + read: () => { + this.readStart() + } + }, this.options)) + + readableStream.on('error', () => { + // https://nodejs.org/api/stream.html#readablepipedestination-options + // `One important caveat is that if the Readable stream emits an error during processing, + // the Writable destination is not closed automatically. + // If an error occurs, it will be necessary to manually close each stream + // in order to prevent memory leaks.` + // for readable steam error memory leak prevention + if (this.writableSteam && typeof this.writableSteam.end === 'function') { + this.writableSteam.end() + } + }) + + readableStream.on('close', () => { + if (!this.writableFactory) { + return + } + + this.readableStream = this.makeReadableSteam() + this._pipe() + }) + + return readableStream + } + + push(data) { + if (this.buffer.length < this.maxBufferSize) { + this.buffer.push(data) + } + + if (this.canStart()) { + this.readStart() + } + } + + canStart() { + return this.readable + } + + readStart() { + this.readable = true + + const length = this.buffer.length + for (let index = 0; index < length; index++) { + if (!this.readableStream.push(this.buffer.shift())) { + return this.readStop() + } + } + } + + readStop() { + this.readable = false + } + + end() { + this.readableStream.end() + } + + pipe(writableFactory) { + this.writableFactory = writableFactory + this._pipe() + } + + _pipe() { + if (typeof this.writableFactory !== 'function') { + return + } + + const writableSteam = this.writableFactory() + if (!writableSteam) { + return + } + + writableSteam.on('error', (error) => { + if (error) { + log.error('writable steam error', error) + } + }) + + writableSteam.on('unpipe', () => { + this.readStop() + }) + + writableSteam.on('close', () => { + + }) + + this.readableStream.pipe(writableSteam) + this.writableSteam = writableSteam + } + + setEncoding(encoding) { + this.readableStream.setEncoding(encoding) + } +} + +module.exports = BoundedBufferReadableStream \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 4feebe50..2c292dc8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "pinpoint-node-agent", - "version": "0.9.0-next.4", + "version": "1.0.0-next.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "pinpoint-node-agent", - "version": "0.9.0-next.4", + "version": "1.0.0-next.1", "license": "Apache-2.0", "dependencies": { "@grpc/grpc-js": "^1.2.3", @@ -22,7 +22,7 @@ "devDependencies": { "@types/semver": "^7.3.13", "@types/shimmer": "^1.0.2", - "axios": "^1.6.2", + "axios": "^1.6.8", "eslint": "^8.43.0", "eslint-config-prettier": "^3.6.0", "eslint-plugin-import": "^2.25.2", @@ -47,7 +47,7 @@ "typescript": "^4.8.3" }, "engines": { - "node": ">=14.0" + "node": ">=16.0" } }, "node_modules/@aashutoshrathi/word-wrap": { @@ -770,9 +770,9 @@ } }, "node_modules/@types/dockerode": { - "version": "3.3.19", - "resolved": "https://registry.npmjs.org/@types/dockerode/-/dockerode-3.3.19.tgz", - "integrity": "sha512-7CC5yIpQi+bHXwDK43b/deYXteP3Lem9gdocVVHJPSRJJLMfbiOchQV3rDmAPkMw+n3GIVj7m1six3JW+VcwwA==", + "version": "3.3.26", + "resolved": "https://registry.npmjs.org/@types/dockerode/-/dockerode-3.3.26.tgz", + "integrity": "sha512-/K+I9bGhRO2SvyIHisGeOsy/ypxnWLz8+Rde9S2tNNEKa3r91e0XMYIEq2D+kb7srm7xrmpAR0CDKfXoZOr4OA==", "dev": true, "dependencies": { "@types/docker-modem": "*", @@ -938,16 +938,16 @@ } }, "node_modules/archiver": { - "version": "5.3.1", - "resolved": "https://registry.npmjs.org/archiver/-/archiver-5.3.1.tgz", - "integrity": "sha512-8KyabkmbYrH+9ibcTScQ1xCJC/CGcugdVIwB+53f5sZziXgwUh3iXlAlANMxcZyDEfTHMe6+Z5FofV8nopXP7w==", + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/archiver/-/archiver-5.3.2.tgz", + "integrity": "sha512-+25nxyyznAXF7Nef3y0EbBeqmGZgeN/BxHX29Rs39djAfaFalmQ89SE6CWyDCHzGL0yt/ycBtNOmGTW0FyGWNw==", "dev": true, "dependencies": { "archiver-utils": "^2.1.0", - "async": "^3.2.3", + "async": "^3.2.4", "buffer-crc32": "^0.2.1", "readable-stream": "^3.6.0", - "readdir-glob": "^1.0.0", + "readdir-glob": "^1.1.2", "tar-stream": "^2.2.0", "zip-stream": "^4.1.0" }, @@ -1147,9 +1147,9 @@ "dev": true }, "node_modules/async-lock": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/async-lock/-/async-lock-1.4.0.tgz", - "integrity": "sha512-coglx5yIWuetakm3/1dsX9hxCNox22h7+V80RQOu2XUUMidtArxKoZoOtHUPuR84SycKTXzgGzAUR5hJxujyJQ==", + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/async-lock/-/async-lock-1.4.1.tgz", + "integrity": "sha512-Az2ZTpuytrtqENulXwO3GGv1Bztugx6TT37NIo7imr/Qo0gsYiGtSdBa2B6fsXhTpVZDNfu1Qn3pk531e3q+nQ==", "dev": true }, "node_modules/asynckit": { @@ -1171,12 +1171,12 @@ } }, "node_modules/axios": { - "version": "1.6.2", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.6.2.tgz", - "integrity": "sha512-7i24Ri4pmDRfJTR7LDBhsOTtcm+9kjX5WiY1X3wIisx6G9So3pfMkEiU7emUBe46oceVImccTEM3k6C5dbVW8A==", + "version": "1.6.8", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.6.8.tgz", + "integrity": "sha512-v/ZHtJDU39mDpyBoFVkETcd/uNdxrWRrg3bKpOKzXFA6Bvqopts6ALSMU3y6ijYxbw2B+wPrIv46egTzJXCLGQ==", "dev": true, "dependencies": { - "follow-redirects": "^1.15.0", + "follow-redirects": "^1.15.6", "form-data": "^4.0.0", "proxy-from-env": "^1.1.0" } @@ -1570,9 +1570,9 @@ } }, "node_modules/cluster-key-slot": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.0.tgz", - "integrity": "sha512-2Nii8p3RwAPiFwsnZvukotvow2rIHM+yQ6ZcBXGHdniadkYGZYiGmkHJIbZPIV9nfv7m/U1IPMVVcAhoWFeklw==", + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", "dev": true, "engines": { "node": ">=0.10.0" @@ -2081,9 +2081,9 @@ } }, "node_modules/docker-compose": { - "version": "0.24.2", - "resolved": "https://registry.npmjs.org/docker-compose/-/docker-compose-0.24.2.tgz", - "integrity": "sha512-2/WLvA7UZ6A2LDLQrYW0idKipmNBWhtfvrn2yzjC5PnHDzuFVj1zAZN6MJxVMKP0zZH8uzAK6OwVZYHGuyCmTw==", + "version": "0.24.7", + "resolved": "https://registry.npmjs.org/docker-compose/-/docker-compose-0.24.7.tgz", + "integrity": "sha512-CdHl9n0S4+bl4i6MaxDQHNjqB1FdvuDirrDTzPKmdiMpheQqCjgsny0GZ2VhvN7qHTY0833lRlKWZgrkn1i6cg==", "dev": true, "dependencies": { "yaml": "^2.2.2" @@ -3297,9 +3297,9 @@ } }, "node_modules/follow-redirects": { - "version": "1.15.5", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.5.tgz", - "integrity": "sha512-vSFWUON1B+yAw1VN4xMfxgn5fTUiaOzAJCKBwIIgT/+7CuGy9+r+5gITvP62j3RmaD5Ph65UaERdOSRGUzZtgw==", + "version": "1.15.6", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz", + "integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==", "dev": true, "funding": [ { @@ -8473,9 +8473,9 @@ } }, "@types/dockerode": { - "version": "3.3.19", - "resolved": "https://registry.npmjs.org/@types/dockerode/-/dockerode-3.3.19.tgz", - "integrity": "sha512-7CC5yIpQi+bHXwDK43b/deYXteP3Lem9gdocVVHJPSRJJLMfbiOchQV3rDmAPkMw+n3GIVj7m1six3JW+VcwwA==", + "version": "3.3.26", + "resolved": "https://registry.npmjs.org/@types/dockerode/-/dockerode-3.3.26.tgz", + "integrity": "sha512-/K+I9bGhRO2SvyIHisGeOsy/ypxnWLz8+Rde9S2tNNEKa3r91e0XMYIEq2D+kb7srm7xrmpAR0CDKfXoZOr4OA==", "dev": true, "requires": { "@types/docker-modem": "*", @@ -8617,16 +8617,16 @@ } }, "archiver": { - "version": "5.3.1", - "resolved": "https://registry.npmjs.org/archiver/-/archiver-5.3.1.tgz", - "integrity": "sha512-8KyabkmbYrH+9ibcTScQ1xCJC/CGcugdVIwB+53f5sZziXgwUh3iXlAlANMxcZyDEfTHMe6+Z5FofV8nopXP7w==", + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/archiver/-/archiver-5.3.2.tgz", + "integrity": "sha512-+25nxyyznAXF7Nef3y0EbBeqmGZgeN/BxHX29Rs39djAfaFalmQ89SE6CWyDCHzGL0yt/ycBtNOmGTW0FyGWNw==", "dev": true, "requires": { "archiver-utils": "^2.1.0", - "async": "^3.2.3", + "async": "^3.2.4", "buffer-crc32": "^0.2.1", "readable-stream": "^3.6.0", - "readdir-glob": "^1.0.0", + "readdir-glob": "^1.1.2", "tar-stream": "^2.2.0", "zip-stream": "^4.1.0" }, @@ -8788,9 +8788,9 @@ "dev": true }, "async-lock": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/async-lock/-/async-lock-1.4.0.tgz", - "integrity": "sha512-coglx5yIWuetakm3/1dsX9hxCNox22h7+V80RQOu2XUUMidtArxKoZoOtHUPuR84SycKTXzgGzAUR5hJxujyJQ==", + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/async-lock/-/async-lock-1.4.1.tgz", + "integrity": "sha512-Az2ZTpuytrtqENulXwO3GGv1Bztugx6TT37NIo7imr/Qo0gsYiGtSdBa2B6fsXhTpVZDNfu1Qn3pk531e3q+nQ==", "dev": true }, "asynckit": { @@ -8806,12 +8806,12 @@ "dev": true }, "axios": { - "version": "1.6.2", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.6.2.tgz", - "integrity": "sha512-7i24Ri4pmDRfJTR7LDBhsOTtcm+9kjX5WiY1X3wIisx6G9So3pfMkEiU7emUBe46oceVImccTEM3k6C5dbVW8A==", + "version": "1.6.8", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.6.8.tgz", + "integrity": "sha512-v/ZHtJDU39mDpyBoFVkETcd/uNdxrWRrg3bKpOKzXFA6Bvqopts6ALSMU3y6ijYxbw2B+wPrIv46egTzJXCLGQ==", "dev": true, "requires": { - "follow-redirects": "^1.15.0", + "follow-redirects": "^1.15.6", "form-data": "^4.0.0", "proxy-from-env": "^1.1.0" } @@ -9091,9 +9091,9 @@ } }, "cluster-key-slot": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.0.tgz", - "integrity": "sha512-2Nii8p3RwAPiFwsnZvukotvow2rIHM+yQ6ZcBXGHdniadkYGZYiGmkHJIbZPIV9nfv7m/U1IPMVVcAhoWFeklw==", + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", "dev": true }, "co": { @@ -9467,9 +9467,9 @@ "dev": true }, "docker-compose": { - "version": "0.24.2", - "resolved": "https://registry.npmjs.org/docker-compose/-/docker-compose-0.24.2.tgz", - "integrity": "sha512-2/WLvA7UZ6A2LDLQrYW0idKipmNBWhtfvrn2yzjC5PnHDzuFVj1zAZN6MJxVMKP0zZH8uzAK6OwVZYHGuyCmTw==", + "version": "0.24.7", + "resolved": "https://registry.npmjs.org/docker-compose/-/docker-compose-0.24.7.tgz", + "integrity": "sha512-CdHl9n0S4+bl4i6MaxDQHNjqB1FdvuDirrDTzPKmdiMpheQqCjgsny0GZ2VhvN7qHTY0833lRlKWZgrkn1i6cg==", "dev": true, "requires": { "yaml": "^2.2.2" @@ -10440,9 +10440,9 @@ "integrity": "sha512-PIaqOGvVH5P+R92Ywy5PumsNEHvondVQh42SGOmkA9A0ZTFbfguzZpjZ/Gy3WVRUqT9Ia8k5tWlJeiZQzRHA7g==" }, "follow-redirects": { - "version": "1.15.5", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.5.tgz", - "integrity": "sha512-vSFWUON1B+yAw1VN4xMfxgn5fTUiaOzAJCKBwIIgT/+7CuGy9+r+5gITvP62j3RmaD5Ph65UaERdOSRGUzZtgw==", + "version": "1.15.6", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz", + "integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==", "dev": true }, "for-each": { diff --git a/package.json b/package.json index 7185cfd4..066b46af 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "pinpoint-node-agent", - "version": "0.9.0-next.4", + "version": "1.0.0-next.1", "main": "index.js", "types": "index.d.ts", "type": "commonjs", @@ -23,7 +23,7 @@ }, "homepage": "https://github.com/pinpoint-apm/pinpoint-node-agent", "engines": { - "node": ">=14.0" + "node": ">=16.0" }, "keywords": [ "pinpoint", @@ -64,7 +64,7 @@ "devDependencies": { "@types/semver": "^7.3.13", "@types/shimmer": "^1.0.2", - "axios": "^1.6.2", + "axios": "^1.6.8", "eslint": "^8.43.0", "eslint-config-prettier": "^3.6.0", "eslint-plugin-import": "^2.25.2", diff --git a/test/agent.test.js b/test/agent.test.js index 21f66879..92bce2a0 100644 --- a/test/agent.test.js +++ b/test/agent.test.js @@ -14,5 +14,5 @@ test('Should initialize agent', function (t) { const agent = require('./support/agent-singleton-mock') t.ok(agent) - t.equal(agent.pinpointClient.agentInfo.agentVersion, '0.9.0-next.4', 'agent version from package.json') + t.equal(agent.pinpointClient.agentInfo.agentVersion, '1.0.0-next.1', 'agent version from package.json') }) \ No newline at end of file diff --git a/test/client/bounded-buffer-readable-stream.test.js b/test/client/bounded-buffer-readable-stream.test.js new file mode 100644 index 00000000..70d507d7 --- /dev/null +++ b/test/client/bounded-buffer-readable-stream.test.js @@ -0,0 +1,225 @@ +/** + * Pinpoint Node.js Agent + * Copyright 2021-present NAVER Corp. + * Apache License v2.0 + */ + +'use strict' + +const test = require('tape') +const BoundedBufferReadableStream = require('../../lib/client/bounded-buffer-readable-stream') +const { Writable } = require('node:stream') +const semver = require('semver') + +test('no piped readable steam', (t) => { + const readable = new BoundedBufferReadableStream() + readable.push('test1') + readable.push('test2') + readable.push(null) + + t.equal(readable.readableStream.readable, true, 'no writable stream piped readable steam not started') + t.equal(readable.buffer.length, 3, 'no writable stream piped readable steam buffer is not empty') + t.end() +}) + +test('piped readable steam', (t) => { + const readable = new BoundedBufferReadableStream({ encoding: 'utf8' }) + readable.push('test1') + readable.push('test2') + readable.push(null) + + readable.pipe(() => { + const actualSteams = [] + const writable = new Writable({ + write(chunk, encoding, callback) { + actualSteams.push(chunk.toString()) + callback() + } + }) + writable.on('finish', () => { + t.equal(readable.readableStream.readable, false, 'piped readable steam finished') + t.equal(readable.buffer.length, 0, 'piped readable steam buffer is empty') + + t.equal(actualSteams.length, 2, 'piped readable steam') + t.equal(actualSteams[0], 'test1', 'piped readable steam test1') + t.equal(actualSteams[1], 'test2', 'piped readable steam test2') + + t.end() + }) + return writable + }) +}) + +test('reconnect writable stream on piped readable stream', (t) => { + t.plan(10) + const readable = new BoundedBufferReadableStream({ encoding: 'utf8' }) + readable.push('test1') + readable.push('test2') + + readable.readableStream.on('error', () => { + t.fail('piped readable steam error event is not called') + }) + + readable.readableStream.on('end', () => { + t.fail('piped readable steam end event is not called') + }) + + readable.readableStream.on('close', () => { + t.fail('piped readable steam close event is not called') + }) + + readable.readableStream.on('unpipe', () => { + t.fail('piped readable steam unpipe event is not called') + }) + + readable.pipe(() => { + let count = 0 + const actualSteams = [] + const writableStream = new Writable({ + write(chunk, encoding, callback) { + actualSteams.push(chunk.toString()) + callback() + count++ + + if (count === 2) { + // Writable.prototype.pipe is only public method for errorOrDestroy + // Writable.prototype.pipe = function() { + // errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); + // }; + writableStream.pipe() + } + } + }) + writableStream.on('error', (error) => { + t.equal(error.message, 'Cannot pipe, not readable', 'piped writable steam error') + t.equal(readable.readableStream.readable, true, 'piped readable steam is readable on close event') + t.equal(readable.buffer.length, 0, 'piped readable steam buffer is empty') + + t.equal(actualSteams.length, 2, 'piped readable steam') + t.equal(actualSteams[0], 'test1', 'piped readable steam test1') + t.equal(actualSteams[1], 'test2', 'piped readable steam test2') + + t.false(readable.readable, 'piped readable steam is unpiped, so is not readable') + }) + + writableStream.on('finish', () => { + t.fail('piped writable steam finish event is not called') + }) + + writableStream.on('close', () => { + t.equal(readable.readableStream.readable, true, 'piped readable steam is no readable on close event') + t.equal(writableStream.destroyed, true, 'piped writable steam is destroyed on close event') + }) + + writableStream.on('unpipe', () => { + t.true(true, 'piped readable steam unpipe event is not called') + }) + + return writableStream + }) +}) + +test('If the Readable stream emits an error during processing, the writable destination is not closed automatically. If an error occurs, Close each streams, ', async (t) => { + if (semver.satisfies(process.versions.node, '<17.0')) { + t.plan(16) + } else { + t.plan(19) + } + + const readable = new BoundedBufferReadableStream() + readable.push('test1') + readable.push('test2') + readable.readableStream.on('error', (error) => { + t.equal(error.message, 'The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received type number (1)', 'readable steam error message check') + t.equal(readable.buffer.length, 0, 'readable steam buffer is empty') + t.equal(readable.readableStream.readable, false, 'the readable of readable steam is false on error event') + if (semver.satisfies(process.versions.node, '>17.0')) { + t.true(readable.readableStream.closed, 'closed property is true on error event of readable steam') + } + t.true(readable.readableStream.destroyed, 'destroyed property is true on error event of readable steam') + }) + readable.readableStream.on('end', () => { + t.fail('readable steam end event is not called') + }) + readable.readableStream.on('close', function () { + t.false(this.readable, 'readable property is false on close event of readable steam') + if (semver.satisfies(process.versions.node, '>17.0')) { + t.true(this.closed, 'closed property is true on close event of readable steam') + } + t.true(this.destroyed, 'destroyed property is true on close event of readable steam') + + // recovery readable steam + t.true(readable.readableStream.readable, 'readable property is true on close event of recovery readable steam after error') + }) + + t.equal(readable.buffer[0], 'test1', 'buffer[0] is test1') + + readable.pipe(() => { + const actualSteams = [] + const writableStream = new Writable({ + write(chunk, encoding, callback) { + actualSteams.push(chunk.toString()) + callback() + } + }) + + writableStream.on('error', () => { + t.fail('If readable stream occur an error, piped writable stream error event is not called') + }) + + writableStream.on('finish', function () { + t.comment('If readable stream occur an error, piped writable steam is manually must be close') + t.false(this.closed, 'closed property is false on the finish event of writable steam') + t.false(this.destroyed, 'destroyed property is false on the finish event of writable steam') + t.false(this.writable, 'writable property is false on the finish event of writable steam') + }) + + writableStream.on('close', function () { + if (semver.satisfies(process.versions.node, '>17.0')) { + t.true(this.closed, 'closed property is true on the close event of writable steam') + } + t.true(this.destroyed, 'destroyed property is true on the close event of writable steam') + t.false(this.writable, 'writable property is false on the close event of writable steam') + }) + + writableStream.on('unpipe', function () { + t.comment('If readable stream occur an error, piped writable steam unpipe event is must be called') + t.false(this.closed, 'closed property is false on the unpipe event of writable steam') + t.false(this.destroyed, 'destroyed property is false on the unpipe event of writable steam') + t.false(this.writable, 'writable property is false on the unpipe event of writable steam') + }) + + return writableStream + }) + process.nextTick(() => { + const originalReadStart = readable.readStart + // readable stream has only errorOrDestroy method on _read method + // try { + // this._read(state.highWaterMark); + // } catch (err) { + // errorOrDestroy(this, err); + // } + readable.readStart = () => { + process.nextTick(() => { + readable.readStart = originalReadStart + }) + return readable.readableStream.push(1) + } + + readable.readableStream.push('test3') + }) +}) + +test('Max buffer size', (t) => { + const dut = new BoundedBufferReadableStream({ maxBuffer: 2 }) + dut.push('test1') + dut.push('test2') + t.true(dut.buffer.length === 2, 'buffer size is 2') + + dut.push('test3') + t.true(dut.buffer.length === 2, 'buffer size is 2') + t.equal(dut.buffer[0], 'test1', 'buffer[0] is test1') + t.equal(dut.buffer[1], 'test2', 'buffer[1] is test2') + + t.end() +}) \ No newline at end of file