
Commit
address review comments
kszucs committed Feb 27, 2025
1 parent 863ff9e commit 4a6bbdc
Showing 5 changed files with 95 additions and 28 deletions.
41 changes: 18 additions & 23 deletions cpp/src/parquet/column_chunker.cc
@@ -592,9 +592,7 @@ void ContentDefinedChunker::Roll(const T value) {
auto bytes = reinterpret_cast<const uint8_t*>(&value);
for (size_t i = 0; i < BYTE_WIDTH; ++i) {
rolling_hash_ = (rolling_hash_ << 1) + GEAR_HASH_TABLE[nth_run_][bytes[i]];
- if ((rolling_hash_ & hash_mask_) == 0) {
-   has_matched_ = true;
- }
+ has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0);
}
}
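
For context, a minimal standalone sketch of the gear-style rolling hash rolled above (a sketch under assumptions: the table seeding and mask width are illustrative placeholders, not Parquet's actual constants). The branchless has_matched_ = has_matched_ || ... form introduced here latches any match until the chunker consumes it, instead of branching on every byte:

#include <array>
#include <cstddef>
#include <cstdint>
#include <random>

// Hypothetical stand-in for GEAR_HASH_TABLE: 256 random 64-bit constants.
std::array<uint64_t, 256> MakeGearTable(uint64_t seed) {
  std::mt19937_64 rng(seed);
  std::array<uint64_t, 256> table;
  for (auto& v : table) v = rng();
  return table;
}

// Roll the hash over a byte buffer; returns true if any position matched.
// A mask with N low bits set yields a match once every 2^N bytes on average.
bool RollBytes(const uint8_t* data, size_t len, uint64_t mask,
               const std::array<uint64_t, 256>& table, uint64_t& hash) {
  bool matched = false;
  for (size_t i = 0; i < len; ++i) {
    hash = (hash << 1) + table[data[i]];
    matched = matched || ((hash & mask) == 0);
  }
  return matched;
}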

@@ -608,15 +606,13 @@ void ContentDefinedChunker::Roll(std::string_view value) {
for (char c : value) {
rolling_hash_ =
(rolling_hash_ << 1) + GEAR_HASH_TABLE[nth_run_][static_cast<uint8_t>(c)];
- if ((rolling_hash_ & hash_mask_) == 0) {
-   has_matched_ = true;
- }
+ has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0);
}
}

- bool ContentDefinedChunker::Check() {
+ bool ContentDefinedChunker::NeedNewChunk() {
// decide whether to create a new chunk based on the rolling hash; has_matched_ is
- // set to true if we encountered a match since the last Check() call
+ // set to true if we encountered a match since the last NeedNewChunk() call
if (ARROW_PREDICT_FALSE(has_matched_)) {
has_matched_ = false;
// in order to have a normal distribution of chunk sizes, we only create a new chunk
@@ -631,7 +627,8 @@ bool ContentDefinedChunker::Check() {
}
if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) {
// we have a hard limit on the maximum chunk size, note that we don't reset the rolling
- // hash state here, so the next Check() call will continue from the current state
+ // hash state here, so the next NeedNewChunk() call will continue from the current
+ // state
chunk_size_ = 0;
return true;
}
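
To make the truncated control flow concrete, here is a simplified model of NeedNewChunk() (an assumption-flagged sketch: the folded lines above involve nth_run_ and the chunk-size-distribution logic, which is reduced here to a plain minimum-size gate):

#include <cstdint>

// Hypothetical simplified model, not the actual implementation: emit a
// boundary on a hash match once the chunk is large enough, or force one at
// the hard maximum (without resetting the rolling hash state, as noted above).
struct SimpleChunker {
  uint64_t chunk_size = 0;
  bool has_matched = false;
  uint64_t min_size;
  uint64_t max_size;

  bool NeedNewChunk() {
    if (has_matched) {
      has_matched = false;
      if (chunk_size >= min_size) {  // suppress boundaries in tiny chunks
        chunk_size = 0;
        return true;
      }
    }
    if (chunk_size >= max_size) {  // hard cap, mirroring the branch above
      chunk_size = 0;
      return true;
    }
    return false;
  }
};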
@@ -643,7 +640,7 @@ const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_lev
const int16_t* rep_levels,
int64_t num_levels,
const T& leaf_array) {
- std::vector<Chunk> result;
+ std::vector<Chunk> chunks;
bool has_def_levels = level_info_.def_level > 0;
bool has_rep_levels = level_info_.rep_level > 0;

@@ -654,13 +651,13 @@ const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_lev
while (offset < num_levels) {
Roll(leaf_array.GetView(offset));
++offset;
- if (Check()) {
-   result.emplace_back(prev_offset, prev_offset, offset - prev_offset);
+ if (NeedNewChunk()) {
+   chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset);
prev_offset = offset;
}
}
if (prev_offset < num_levels) {
- result.emplace_back(prev_offset, prev_offset, num_levels - prev_offset);
+ chunks.emplace_back(prev_offset, prev_offset, num_levels - prev_offset);
}
} else if (!has_rep_levels) {
// non-nested data with nulls
@@ -670,26 +667,25 @@ const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_lev
Roll(def_levels[offset]);
Roll(leaf_array.GetView(offset));
++offset;
- if (Check()) {
-   result.emplace_back(prev_offset, prev_offset, offset - prev_offset);
+ if (NeedNewChunk()) {
+   chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset);
prev_offset = offset;
}
}
if (prev_offset < num_levels) {
- result.emplace_back(prev_offset, prev_offset, num_levels - prev_offset);
+ chunks.emplace_back(prev_offset, prev_offset, num_levels - prev_offset);
}
} else {
// nested data with nulls
bool has_leaf_value;
bool is_record_boundary;
int16_t def_level;
int16_t rep_level;
- int64_t level_offset = 0;
int64_t value_offset = 0;
int64_t record_level_offset = 0;
int64_t record_value_offset = 0;

- while (level_offset < num_levels) {
+ for (int64_t level_offset = 0; level_offset < num_levels; ++level_offset) {
def_level = def_levels[level_offset];
rep_level = rep_levels[level_offset];

@@ -702,28 +698,27 @@ const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_lev
Roll(leaf_array.GetView(value_offset));
}

- if (is_record_boundary && Check()) {
+ if (is_record_boundary && NeedNewChunk()) {
auto levels_to_write = level_offset - record_level_offset;
if (levels_to_write > 0) {
- result.emplace_back(record_level_offset, record_value_offset, levels_to_write);
+ chunks.emplace_back(record_level_offset, record_value_offset, levels_to_write);
record_level_offset = level_offset;
record_value_offset = value_offset;
}
}

- ++level_offset;
if (has_leaf_value) {
++value_offset;
}
}

auto levels_to_write = num_levels - record_level_offset;
if (levels_to_write > 0) {
- result.emplace_back(record_level_offset, record_value_offset, levels_to_write);
+ chunks.emplace_back(record_level_offset, record_value_offset, levels_to_write);
}
}

- return result;
+ return chunks;
}
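
Judging by the emplace_back(level_offset, value_offset, levels) calls, each Chunk records where its levels and values start and how many levels it spans; the chunks tile [0, num_levels) without gaps, and for nested data a boundary only falls where rep_level == 0, so no record straddles two chunks. A hedged sketch of a consumer (field names are assumptions inferred from the argument order, not the actual struct):

#include <cstdint>
#include <vector>

// Assumed layout, inferred from the emplace_back(...) argument order above.
struct Chunk {
  int64_t level_offset;
  int64_t value_offset;
  int64_t levels;
};

// A writer would emit one run of pages per chunk.
void ConsumeChunks(const std::vector<Chunk>& chunks) {
  for (const Chunk& c : chunks) {
    // write def/rep levels [c.level_offset, c.level_offset + c.levels)
    // and the corresponding leaf values starting at c.value_offset
  }
}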

#define PRIMITIVE_CASE(TYPE_ID, ArrowType) \
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_chunker.h
@@ -54,7 +54,7 @@ class ContentDefinedChunker {
template <typename T>
void Roll(const T value);
void Roll(std::string_view value);
- inline bool Check();
+ inline bool NeedNewChunk();

template <typename T>
const std::vector<Chunk> Calculate(const int16_t* def_levels, const int16_t* rep_levels,
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_writer.cc
@@ -1337,7 +1337,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
bits_buffer_->ZeroPadding();
}

- if (this->properties_->cdc_enabled()) {
+ if (properties_->cdc_enabled()) {
ARROW_ASSIGN_OR_RAISE(auto boundaries,
content_defined_chunker_.GetBoundaries(
def_levels, rep_levels, num_levels, leaf_array));
6 changes: 3 additions & 3 deletions python/pyarrow/_parquet.pyx
@@ -2002,10 +2002,10 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
props.dictionary_pagesize_limit(dictionary_pagesize_limit)

# content defined chunking
- if cdc is False:
-     props.disable_cdc()
- elif cdc is True:
+ if cdc:
      props.enable_cdc()
+ else:
+     props.disable_cdc()

if cdc_size_range is not None:
min_size, max_size = cdc_size_range
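
For reference, the Python flag maps onto the C++ WriterProperties::Builder; a minimal sketch of the equivalent C++ call sites (enable_cdc() is the same builder method the Cython code invokes above; a cdc_size_range(min, max) builder method is an assumption based on the folded lines that follow):

#include <cstdint>
#include <memory>

#include "parquet/properties.h"

std::shared_ptr<parquet::WriterProperties> MakeCdcProperties(int64_t min_size,
                                                             int64_t max_size) {
  parquet::WriterProperties::Builder builder;
  builder.enable_cdc();                        // shown above via props.enable_cdc()
  builder.cdc_size_range(min_size, max_size);  // assumed from the cdc_size_range handling
  return builder.build();
}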
72 changes: 72 additions & 0 deletions python/run_test.sh
@@ -0,0 +1,72 @@
set -e

# -DARROW_USE_ASAN=OFF \
# -DARROW_USE_UBSAN=OFF \
# -DARROW_USE_TSAN=OFF \

SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)
ARROW_DIR=${SCRIPT_DIR}/..
export ARROW_BUILD_TYPE=${ARROW_BUILD_TYPE:-debug}
export ARROW_HOME=$CONDA_PREFIX
export PARQUET_TEST_DATA=${ARROW_DIR}/cpp/submodules/parquet-testing/data
export ARROW_TEST_DATA=${ARROW_DIR}/testing/data

export ARROW_HDFS_TEST_HOST=impala
export ARROW_HDFS_TEST_PORT=8020
export ARROW_HDFS_TEST_USER=hdfs

mkdir -p ${ARROW_DIR}/cpp/build
pushd ${ARROW_DIR}/cpp/build

cmake -GNinja \
-DARROW_BUILD_BENCHMARKS=OFF \
-DARROW_BUILD_STATIC=OFF \
-DARROW_BUILD_TESTS=ON \
-DARROW_USE_ASAN=OFF \
-DARROW_DATASET=ON \
-DARROW_EXTRA_ERROR_CONTEXT=ON \
-DARROW_BUILD_INTEGRATION=ON \
-DARROW_DEPENDENCY_SOURCE=CONDA \
-DARROW_FLIGHT=OFF \
-DARROW_GANDIVA=OFF \
-DARROW_JEMALLOC=ON \
-DARROW_MIMALLOC=ON \
-DARROW_WITH_SNAPPY=ON \
-DARROW_WITH_LZ4=ON \
-DARROW_WITH_ZSTD=ON \
-DARROW_COMPUTE=ON \
-DARROW_PARQUET=ON \
-DARROW_CSV=ON \
-DARROW_ORC=OFF \
-DARROW_USE_CCACHE=ON \
-DARROW_S3=ON \
-DARROW_TEST_MEMCHECK=OFF \
-DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
-DCMAKE_EXPORT_COMPILE_COMMANDS=YES \
-DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
..

ninja
ninja install

popd

export PYARROW_CMAKE_GENERATOR=Ninja
export PYARROW_BUILD_TYPE=debug
export PYARROW_WITH_PARQUET=1
# export PYARROW_WITH_HDFS=1
# export PYARROW_WITH_GANDIVA=0
export PYARROW_WITH_DATASET=1
# export PYARROW_WITH_FLIGHT=1
export PYARROW_WITH_S3=1
export PYARROW_PARALLEL=8
# export PYARROW_WITH_ORC=1

# # export DYLD_INSERT_LIBRARIES=/Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/lib/clang/12.0.0/lib/darwin/libclang_rt.asan_osx_dynamic.dylib
# # export DYLD_INSERT_LIBRARIES=/Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/lib/clang/12.0.0/lib/darwin/libclang_rt.tsan_osx_dynamic.dylib

pushd ${ARROW_DIR}/python
#python setup.py build_ext --inplace
python setup.py develop
popd
# pytest -sv "$@"
