From 294c131704026db8b5f4d641791b25c8a3ac7faa Mon Sep 17 00:00:00 2001
From: Enrico Minack
Date: Fri, 13 Dec 2024 15:34:41 +0100
Subject: [PATCH] Test writing multi-threaded

---
 .../dataset/file_parquet_encryption_test.cc  | 116 +++++++++++++-----
 1 file changed, 85 insertions(+), 31 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet_encryption_test.cc b/cpp/src/arrow/dataset/file_parquet_encryption_test.cc
index ae9f2e4522357..ccb916fa143be 100644
--- a/cpp/src/arrow/dataset/file_parquet_encryption_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_encryption_test.cc
@@ -120,21 +120,67 @@ class DatasetEncryptionTestBase : public testing::TestWithParam
+    auto concurrently = std::get<1>(GetParam());
     auto dataset = std::make_shared<InMemoryDataset>(table_);
     EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+    // ideally, we would have UseThreads(concurrently) here, but that is not working
+    // unless GH-26818 (https://github.com/apache/arrow/issues/26818) is fixed
+    ARROW_EXPECT_OK(scanner_builder->UseThreads(false));
     EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

-    FileSystemDatasetWriteOptions write_options;
-    write_options.file_write_options = parquet_file_write_options;
-    write_options.filesystem = file_system_;
-    write_options.base_dir = kBaseDir;
-    write_options.partitioning = partitioning_;
-    write_options.basename_template = "part{i}.parquet";
-    ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
+    if (concurrently) {
+      // have a notable number of threads to exhibit multi-threading issues
+      ASSERT_OK_AND_ASSIGN(auto pool, arrow::internal::ThreadPool::Make(16));
+      std::vector<Future<>> threads;
+
+      // write dataset above multiple times concurrently to see that is thread-safe.
+      for (size_t i = 1; i <= 100; ++i) {
+        FileSystemDatasetWriteOptions write_options;
+        write_options.file_write_options = parquet_file_write_options;
+        write_options.filesystem = file_system_;
+        write_options.base_dir = "thread-" + std::to_string(i);
+        write_options.partitioning = partitioning_;
+        write_options.basename_template = "part{i}.parquet";
+        threads.push_back(
+            DeferNotOk(pool->Submit(FileSystemDataset::Write, write_options, scanner))
+        );
+      }
+      pool->WaitForIdle();
+
+      // assert all jobs succeeded
+      for (auto& thread : threads) {
+        thread.Wait();
+        ASSERT_TRUE(thread.state() == FutureState::SUCCESS);
+      }
+    } else {
+      FileSystemDatasetWriteOptions write_options;
+      write_options.file_write_options = parquet_file_write_options;
+      write_options.filesystem = file_system_;
+      write_options.base_dir = kBaseDir;
+      write_options.partitioning = partitioning_;
+      write_options.basename_template = "part{i}.parquet";
+      ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
+    }
   }

   virtual void PrepareTableAndPartitioning() = 0;

+  Result<std::shared_ptr<Dataset>> CreateDataset(std::string_view base_dir, const std::shared_ptr<FileFormat>& file_format) {
+    // Get FileInfo objects for all files under the base directory
+    fs::FileSelector selector;
+    selector.base_dir = base_dir;
+    selector.recursive = true;
+
+    FileSystemFactoryOptions factory_options;
+    factory_options.partitioning = partitioning_;
+    factory_options.partition_base_dir = base_dir;
+    ARROW_ASSIGN_OR_RAISE(auto dataset_factory,
+                          FileSystemDatasetFactory::Make(file_system_, selector, file_format, factory_options));
+
+    // Create the dataset
+    return dataset_factory->Finish();
+  }
+
   void TestScanDataset() {
     // Create decryption properties.
     auto decryption_config =
@@ -152,23 +198,13 @@ class DatasetEncryptionTestBase : public testing::TestWithParam
     auto file_format = std::make_shared<ParquetFileFormat>();
     file_format->default_fragment_scan_options = std::move(parquet_scan_options);

-    // Get FileInfo objects for all files under the base directory
-    fs::FileSelector selector;
-    selector.base_dir = kBaseDir;
-    selector.recursive = true;
-
-    FileSystemFactoryOptions factory_options;
-    factory_options.partitioning = partitioning_;
-    factory_options.partition_base_dir = kBaseDir;
-    ASSERT_OK_AND_ASSIGN(auto dataset_factory,
-                         FileSystemDatasetFactory::Make(file_system_, selector,
-                                                        file_format, factory_options));
-
-    // Create the dataset
-    ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());
+    ASSERT_OK_AND_ASSIGN(auto expected_table, table_->CombineChunks());

     auto concurrently = std::get<1>(GetParam());
     if (concurrently) {
+      // Create the dataset
+      ASSERT_OK_AND_ASSIGN(auto dataset, CreateDataset("thread-1", file_format));
+
       // have a notable number of threads to exhibit multi-threading issues
       ASSERT_OK_AND_ASSIGN(auto pool, arrow::internal::ThreadPool::Make(16));
       std::vector<Future<std::shared_ptr<arrow::Table>>> threads;
@@ -183,13 +219,23 @@ class DatasetEncryptionTestBase : public testing::TestWithParam
         ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
         ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
+        ARROW_EXPECT_OK(scanner_builder->UseThreads(std::get<1>(GetParam())));
         ARROW_ASSIGN_OR_RAISE(auto read_table, scanner->ToTable());

         // Verify the data was read correctly
@@ -208,6 +255,7 @@ class DatasetEncryptionTestBase : public testing::TestWithParam
+  std::string base_dir_ = std::get<1>(GetParam()) ? "thread-1" : std::string(kBaseDir);
   std::shared_ptr<fs::FileSystem> file_system_;
   std::shared_ptr<Table> table_;
   std::shared_ptr<HivePartitioning> partitioning_;
@@ -252,7 +300,7 @@ TEST_P(DatasetEncryptionTest, WriteReadDatasetWithEncryption) {
 // Read a single parquet file with and without decryption properties.
 TEST_P(DatasetEncryptionTest, ReadSingleFile) {
   // Open the Parquet file.
-  ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile("part=a/part0.parquet"));
+  ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(base_dir_ + "/part=a/part0.parquet"));

   // Try to read metadata without providing decryption properties
   // when the footer is encrypted.
@@ -288,32 +336,38 @@ INSTANTIATE_TEST_SUITE_P(DatasetEncryptionTestThreaded, DatasetEncryptionTest,

 // GH-39444: This test covers the case where parquet dataset scanner crashes when
 // processing encrypted datasets over 2^15 rows in multi-threaded mode.
-class LargeRowEncryptionTest : public DatasetEncryptionTestBase {
+class LargeRowCountEncryptionTest : public DatasetEncryptionTestBase {
  public:
   // The dataset is partitioned using a Hive partitioning scheme.
   void PrepareTableAndPartitioning() override {
     // Specifically chosen to be greater than batch size for triggering prefetch.
     constexpr int kRowCount = 32769;
+    // Number of batches
+    constexpr int kBatchCount = 10;

-    // Create a random floating-point array with large number of rows.
+    // Create multiple random floating-point arrays with large number of rows.
     arrow::random::RandomArrayGenerator rand_gen(0);
-    auto array = rand_gen.Float32(kRowCount, 0.0, 1.0, false);
+    auto arrays = std::vector<std::shared_ptr<Array>>();
+    for (int i = 0; i < kBatchCount; i++) {
+      arrays.push_back(rand_gen.Float32(kRowCount, 0.0, 1.0, false));
+    }
+    ASSERT_OK_AND_ASSIGN(auto column, ChunkedArray::Make(arrays, float32()));

     auto table_schema = schema({field("a", float32())});
     // Prepare table and partitioning.
-    table_ = arrow::Table::Make(table_schema, {array});
+    table_ = arrow::Table::Make(table_schema, {column});
     partitioning_ = std::make_shared<HivePartitioning>(arrow::schema({}));
   }
 };

 // Test for writing and reading encrypted dataset with large row count.
-TEST_P(LargeRowEncryptionTest, ReadEncryptLargeRows) {
+TEST_P(LargeRowCountEncryptionTest, ReadEncryptLargeRowCount) {
   ASSERT_NO_FATAL_FAILURE(TestScanDataset());
 }

-INSTANTIATE_TEST_SUITE_P(LargeRowEncryptionTest, LargeRowEncryptionTest,
+INSTANTIATE_TEST_SUITE_P(LargeRowCountEncryptionTest, LargeRowCountEncryptionTest,
                          ::testing::Values(std::tuple(COLUMN_KEY, false),
                                            std::tuple(UNIFORM, false)));
-INSTANTIATE_TEST_SUITE_P(LargeRowEncryptionTestThreaded, LargeRowEncryptionTest,
+INSTANTIATE_TEST_SUITE_P(LargeRowCountEncryptionTestThreaded, LargeRowCountEncryptionTest,
                          ::testing::Values(std::tuple(COLUMN_KEY, true),
                                            std::tuple(UNIFORM, true)));

 }  // namespace dataset