From b1b8fd089947af1ec870fcea2a391c4425e45d60 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 12 Nov 2024 22:16:52 -0700 Subject: [PATCH] wip --- .github/workflows/test.yml | 2 +- binding.gyp | 19 ++++++--- etc/install-zstd.sh | 11 +++-- index.d.ts | 4 +- src/addon.cpp | 68 ++++++++++++++++++++++++++++--- src/compression_worker.h | 18 ++++++++- src/compressor.h | 43 ++++++++++++++++++++ src/decompressor.h | 43 ++++++++++++++++++++ src/frame_header.h | 82 -------------------------------------- src/napi_utils.h | 12 ++++++ test/index.test.js | 81 ++++++++++++++++++------------------- 11 files changed, 238 insertions(+), 145 deletions(-) create mode 100644 src/compressor.h create mode 100644 src/decompressor.h delete mode 100644 src/frame_header.h diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9c4c84f..adc8ffe 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -25,7 +25,7 @@ jobs: registry-url: 'https://registry.npmjs.org' - name: Build with Node.js ${{ matrix.node }} on ${{ matrix.os }} - run: npm install && npm run compile + run: npm install --ignore-scripts && bash etc/install-zstd.sh && npm run compile shell: bash - name: Test ${{ matrix.os }} diff --git a/binding.gyp b/binding.gyp index 71366ce..4128710 100644 --- a/binding.gyp +++ b/binding.gyp @@ -4,21 +4,25 @@ 'type': 'loadable_module', 'defines': ['ZSTD_STATIC_LINKING_ONLY'], 'include_dirs': [ + "<(module_root_dir)/deps/zstd/lib", " -export declare function decompress(data: Buffer): Promise +export declare function compress(data: Buffer, level?: number | undefined | null): Promise; +export declare function decompress(data: Buffer): Promise; diff --git a/src/addon.cpp b/src/addon.cpp index 5ce6a19..294a2db 100644 --- a/src/addon.cpp +++ b/src/addon.cpp @@ -1,14 +1,70 @@ #include +#include +#include + +#include "compression_worker.h" +#include "napi_utils.h" +#include "compressor.h" +#include "decompressor.h" + +#include "zstd.h" + using namespace Napi; -Napi::String Compress(const Napi::CallbackInfo& info) { - auto string = Napi::String::New(info.Env(), "compress()"); - return string; +Napi::Promise Compress(const Napi::CallbackInfo& info) { + auto number_of_args = info.Length(); + size_t compression_level; + Napi::Uint8Array to_compress; + + if (number_of_args == 0) { + std::string error_message = + "compress(uint8array) or compress(uint8array, compression level)"; + throw TypeError::New(info.Env(), error_message); + } else if (number_of_args == 1) { + to_compress = Uint8ArrayFromValue(info[0], "buffer"); + compression_level = 3; + } else if (number_of_args == 2) { + to_compress = Uint8ArrayFromValue(info[0], "buffer"); + if (!info[1].IsNumber()) { + throw TypeError::New(info.Env(), + std::string("if provided, compression_level must be a number.")); + } + compression_level = (size_t)info[1].ToNumber().Int32Value(); + } else { + std::string error_message = + "compress(uint8array) or compress(uint8array, compression level)"; + throw TypeError::New(info.Env(), error_message); + } + + Compressor compressor = Compressor::fromUint8Array(to_compress, compression_level); + Worker* worker = new Worker(info.Env(), std::move(compressor)); + + worker->Queue(); + + return worker->GetPromise(); } -Napi::String Decompress(const Napi::CallbackInfo& info) { - auto string = Napi::String::New(info.Env(), "decompress()"); - return string; + +Napi::Promise Decompress(const CallbackInfo& info) { + auto number_of_args = info.Length(); + Napi::Uint8Array compressed_data; + + if (number_of_args == 0) { + std::string error_message = "decompress(uint8array)"; + throw TypeError::New(info.Env(), error_message); + } else if (number_of_args == 1) { + compressed_data = Uint8ArrayFromValue(info[0], "buffer"); + } else { + std::string error_message = "decompress(uint8array)"; + throw TypeError::New(info.Env(), error_message); + } + + Decompressor decompressor = Decompressor::fromUint8Array(compressed_data); + Worker* worker = new Worker(info.Env(), decompressor); + + worker->Queue(); + + return worker->GetPromise(); } Napi::Object Init(Napi::Env env, Napi::Object exports) { diff --git a/src/compression_worker.h b/src/compression_worker.h index 239f5ef..b68f0fd 100644 --- a/src/compression_worker.h +++ b/src/compression_worker.h @@ -4,6 +4,10 @@ using namespace Napi; +/** + * @brief A class that represents the result of a compression operation. Once the MACOS_DEPLOYMENT_TARGET can be raised to 10.13 and use + * a c++17, we can remove this class and use a std::optional, std::string>>> instead. + */ struct CompressionResult { CompressionResult(std::string error, std::vector result, @@ -37,6 +41,11 @@ struct CompressionResult { bool initialized; }; +/** + * @brief An asynchronous Napi::Worker that can be with any functor that produces CompressionResults. + * + * @tparam TWorker - The functor to call asynchronously. + */ template class Worker : public Napi::AsyncWorker { public: @@ -59,15 +68,20 @@ class Worker : public Napi::AsyncWorker { if (!result.initialized) { m_deferred.Reject( Napi::Error::New( - Env(), "Decompression runtime error - did not receive a result or a value.") + Env(), "zstd runtime error - async worker finished without a compression or decompression result.") .Value()); } else if (result.hasError) { m_deferred.Reject(Napi::Error::New(Env(), result.error).Value()); - } else { + } else if (result.hasResult) { Uint8Array output = Uint8Array::New(m_deferred.Env(), result.result.size()); std::copy(result.result.begin(), result.result.end(), output.Data()); m_deferred.Resolve(output); + } else { + m_deferred.Reject( + Napi::Error::New( + Env(), "zstd runtime error - async worker finished without a compression or decompression result.") + .Value()); } } diff --git a/src/compressor.h b/src/compressor.h new file mode 100644 index 0000000..c3985da --- /dev/null +++ b/src/compressor.h @@ -0,0 +1,43 @@ + +#include +#include + +#include "zstd.h" + +using namespace Napi; + +struct Compressor { + std::vector data; + size_t compression_level; + + Compressor(std::vector data, size_t compression_level) + : data(data), compression_level(compression_level) {} + + CompressionResult operator()() { + size_t output_buffer_size = ZSTD_compressBound(data.size()); + std::vector output(output_buffer_size); + + size_t result_code = ZSTD_compress( + output.data(), output.size(), data.data(), data.size(), compression_level); + + if (ZSTD_isError(result_code)) { + std::string error(ZSTD_getErrorName(result_code)); + return CompressionResult::Error(error); + } + + output.resize(result_code); + + return CompressionResult::Ok(output); + } + + static Compressor fromUint8Array(const Uint8Array& to_compress, size_t compression_level) { + const uint8_t* input_data = to_compress.Data(); + size_t total = to_compress.ElementLength(); + + std::vector data(to_compress.ElementLength()); + + std::copy(input_data, input_data + total, data.data()); + + return Compressor(std::move(data), compression_level); + } +}; \ No newline at end of file diff --git a/src/decompressor.h b/src/decompressor.h new file mode 100644 index 0000000..1d81977 --- /dev/null +++ b/src/decompressor.h @@ -0,0 +1,43 @@ +#include + +#include + + + +#include "zstd.h" + +using namespace Napi; + +struct Decompressor { + std::vector data; + size_t buffer_size; + + Decompressor(std::vector data, size_t buffer_size) + : data(data), buffer_size(buffer_size) {} + + CompressionResult operator()() { + std::vector decompressed(buffer_size); + + size_t _result = + ZSTD_decompress(decompressed.data(), decompressed.size(), data.data(), data.size()); + + if (ZSTD_isError(_result)) { + std::string error(ZSTD_getErrorName(_result)); + return CompressionResult::Error(error); + } + + decompressed.resize(_result); + + return CompressionResult::Ok(decompressed); + } + + static Decompressor fromUint8Array(const Uint8Array& compressed_data) { + const uint8_t* input_data = compressed_data.Data(); + size_t total = compressed_data.ElementLength(); + + std::vector data(total); + std::copy(input_data, input_data + total, data.data()); + + return Decompressor(data, total * 1000); + } +}; diff --git a/src/frame_header.h b/src/frame_header.h deleted file mode 100644 index ed04f40..0000000 --- a/src/frame_header.h +++ /dev/null @@ -1,82 +0,0 @@ - - -#include - -#include "zstd.h" - -using namespace Napi; - -struct FrameHeader { - struct Deleter { - void operator()(ZSTD_frameHeader* header) noexcept { - free(header); - } - }; - - unsigned long long frameContentSize; - unsigned long long windowSize; - ZSTD_frameType_e frameType; - unsigned headerSize; - unsigned dictID; - unsigned checksumFlag; - - FrameHeader(Uint8Array array) : FrameHeader(parseFrameHeader(array)){}; - - private: - FrameHeader(std::unique_ptr ptr) - : frameContentSize(ptr->frameContentSize), - windowSize(ptr->windowSize), - frameType(ptr->frameType), - headerSize(ptr->headerSize), - dictID(ptr->dictID), - checksumFlag(ptr->checksumFlag){}; - - static std::unique_ptr parseFrameHeader( - Uint8Array buffer) { - std::unique_ptr ptr( - (ZSTD_frameHeader*)malloc(sizeof(ZSTD_frameHeader))); - auto result = ZSTD_getFrameHeader(ptr.get(), buffer.Data(), buffer.ByteLength()); - - if (ZSTD_isError(result)) { - std::string error_message = - std::string("Error parsing frame header: ") + ZSTD_getErrorName(result); - throw Error::New(buffer.Env(), error_message); - } - - return ptr; - } -}; - -/** useful for debugging */ -Value getFrameHeader(const CallbackInfo& info) { - if (info.Length() != 1) { - throw TypeError::New(info.Env(), "must provide a buffer."); - } - - Napi::Value arg_buffer = info[0]; - auto parse_arg = [](Napi::Value v) -> Uint8Array { - if (!v.IsTypedArray() || v.As().TypedArrayType() != napi_uint8_array) { - std::string error_message = "Parameter must be a Uint8Array."; - throw TypeError::New(v.Env(), error_message); - } - return v.As(); - }; - - Uint8Array buffer = parse_arg(arg_buffer); - FrameHeader header(buffer); - - Object return_value = Object::New(info.Env()); - - return_value["windowSize"] = BigInt::New(info.Env(), uint64_t(header.windowSize)); - return_value["frameContentSize"] = - header.frameContentSize == ZSTD_CONTENTSIZE_UNKNOWN - ? String::New(info.Env(), "unknown") - : BigInt::New(info.Env(), uint64_t(header.frameContentSize)); - return_value["headerSize"] = BigInt::New(info.Env(), uint64_t(header.headerSize)); - return_value["dictID"] = BigInt::New(info.Env(), uint64_t(header.dictID)); - return_value["checksumFlag"] = BigInt::New(info.Env(), uint64_t(header.checksumFlag)); - return_value["frameType"] = String::New( - info.Env(), header.frameType == ZSTD_frame ? "zstd frame" : "zstd skippable frame"); - - return return_value; -} \ No newline at end of file diff --git a/src/napi_utils.h b/src/napi_utils.h index 9e0e548..33cabf8 100644 --- a/src/napi_utils.h +++ b/src/napi_utils.h @@ -2,6 +2,10 @@ using namespace Napi; +/** + * @brief Given a T* source and a T* destination, copies count + * elements from source into destination. + */ template void copy_buffer_data(T* source, T* dest, size_t count) { for (size_t i = 0; i < count; ++i) { @@ -9,6 +13,14 @@ void copy_buffer_data(T* source, T* dest, size_t count) { } } +/** + * @brief Given an Napi;:Value, this function returns the value as a Uint8Array, if the + * Value is a Uint8Array. Otherwise, this function throws. + * + * @param v - An Napi::Value + * @param argument_name - the name of the value, to use when constructing an error message. + * @return Napi::Uint8Array + */ Uint8Array Uint8ArrayFromValue(Value v, std::string argument_name) { if (!v.IsTypedArray() || v.As().TypedArrayType() != napi_uint8_array) { std::string error_message = "Parameter `" + argument_name + "` must be a Uint8Array."; diff --git a/test/index.test.js b/test/index.test.js index 79bd296..35ff5f8 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -1,52 +1,47 @@ -const { describe, test } = require("mocha"); -const { compress, decompress } = require("bindings")("zstd"); +const { describe, test } = require('mocha'); +const { compress, decompress } = require('bindings')('zstd'); -const zstdLegacy = require("@mongodb-js/zstd"); -const { expect } = require("chai"); +const zstdLegacy = require('@mongodb-js/zstd'); +const { expect } = require('chai'); -describe("compress", function () { - - describe("compat tests", function () { - describe("old compress, new decompress", testSuite(zstdLegacy.decompress, compress)); - describe("new compress, decompress", testSuite(decompress, zstdLegacy.compress)); - describe("new compress, new decompress", testSuite(decompress, compress)); +describe('compress', function () { + describe('compat tests', function () { + describe('old compress, new decompress', testSuite(zstdLegacy.decompress, compress)); + describe('new compress, decompress', testSuite(decompress, zstdLegacy.compress)); + describe('new compress, new decompress', testSuite(decompress, compress)); }); }); -function testSuite( - decompress, compress -) { - test("empty", async function () { - const input = Buffer.from("", "utf8"); - const result = await decompress(await compress(input)); - expect(result.toString("utf8")).to.deep.equal(""); - }); +/** + * @param {import('../index').decompress} decompress + * @param {import('../index').compress} compress + */ +function testSuite(decompress, compress) { + return function () { + test('empty', async function () { + const input = Buffer.from('', 'utf8'); + const result = await decompress(await compress(input)); + expect(result.toString('utf8')).to.deep.equal(''); + }); - test("one element", async function () { - const input = Buffer.from("a", "utf8"); - const result = Buffer.from( - await decompress(await compress(input)), - ); - expect(result.toString("utf8")).to.deep.equal("a"); - }); + test('one element', async function () { + const input = Buffer.from('a', 'utf8'); + const result = Buffer.from(await decompress(await compress(input))); + expect(result.toString('utf8')).to.deep.equal('a'); + }); - test("typical length string", async function () { - const input = Buffer.from("hello, world! my name is bailey", "utf8"); - const result = Buffer.from( - await decompress(await compress(input)), - ); - expect(result.toString("utf8")).to.deep.equal( - "hello, world! my name is bailey", - ); - }); + test('typical length string', async function () { + const input = Buffer.from('hello, world! my name is bailey', 'utf8'); + const result = Buffer.from(await decompress(await compress(input))); + expect(result.toString('utf8')).to.deep.equal('hello, world! my name is bailey'); + }); - test("huge array", async function () { - const input_expected = Array.from({ length: 1_000 }, () => "a").join( - "", - ); - const input = Buffer.from(input_expected, "utf8"); + test('huge array', async function () { + const input_expected = Array.from({ length: 1_000 }, () => 'a').join(''); + const input = Buffer.from(input_expected, 'utf8'); - const result = Buffer.from(await decompress(await compress(input))); - expect(result.toString("utf8")).to.deep.equal(input_expected); - }); -} \ No newline at end of file + const result = Buffer.from(await decompress(await compress(input))); + expect(result.toString('utf8')).to.deep.equal(input_expected); + }); + }; +}