Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite delta bitpack reader #912

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions fastparquet/cencoding.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,30 @@ cpdef void read_rle_bit_packed_hybrid(NumpyIO io_obj, int32_t width, uint32_t le


cdef void delta_read_bitpacked(NumpyIO file_obj, uint8_t bitwidth,
NumpyIO o, uint64_t count, uint8_t itemsize=4):
NumpyIO o, uint64_t count, uint8_t longval=0):
cdef:
uint64_t data = 0
int8_t stop = -bitwidth
int8_t left = 0
int8_t right = 0
uint64_t mask = 0XFFFFFFFFFFFFFFFF >> (64 - bitwidth)
while count > 0:
if stop < 0:
data = ((data & 0X00FFFFFFFFFFFFFF) << 8) | file_obj.read_byte()
stop += 8
if (left - right) < bitwidth:
data = data | (<uint64_t>file_obj.read_byte() << left)
left += 8
elif right > 8:
data >>= 8
left -= 8
right -= 8
else:
o.write_int((data >> stop) & mask)
stop -= bitwidth
if longval:
o.write_long((data >> right) & mask)
else:
o.write_int((data >> right) & mask)
right += bitwidth
count -= 1


cpdef void delta_binary_unpack(NumpyIO file_obj, NumpyIO o):
cpdef void delta_binary_unpack(NumpyIO file_obj, NumpyIO o, uint8_t longval=0):
cdef:
uint64_t block_size = read_unsigned_var_int(file_obj)
uint64_t miniblock_per_block = read_unsigned_var_int(file_obj)
Expand All @@ -248,19 +256,27 @@ cpdef void delta_binary_unpack(NumpyIO file_obj, NumpyIO o):
temp = o.loc
if count > 1:
# no more diffs if on last value
delta_read_bitpacked(file_obj, bitwidth, o, values_per_miniblock, count)
delta_read_bitpacked(file_obj, bitwidth, o, values_per_miniblock, longval)
o.loc = temp
for j in range(values_per_miniblock):
temp = o.read_int()
o.loc -= 4
o.write_int(value)
if longval:
temp = o.read_long()
o.loc -= 8
o.write_long(value)
else:
temp = o.read_int()
o.loc -= 4
o.write_int(value)
value += min_delta + temp
count -= 1
if count <= 0:
return
else:
for j in range(values_per_miniblock):
o.write_int(value)
if longval:
o.write_long(value)
else:
o.write_int(value)
value += min_delta
count -= 1
if count <= 0:
Expand Down Expand Up @@ -372,6 +388,20 @@ cdef class NumpyIO(object):
(<int32_t*> self.get_pointer())[0] = i
self.loc += 4

cdef void write_long(self, int64_t i):
if self.nbytes - self.loc < 8:
return
(<int64_t*> self.get_pointer())[0] = i
self.loc += 8

cdef int64_t read_long(self):
cdef int64_t i
if self.nbytes - self.loc < 8:
return 0
i = (<int64_t*> self.get_pointer())[0]
self.loc += 8
return i

cdef void write_many(self, char b, int32_t count):
cdef int32_t i
for i in range(count):
Expand Down
5 changes: 3 additions & 2 deletions fastparquet/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ def read_data_page(f, helper, header, metadata, skip_nulls=False,
else:
values = np.zeros(nval, dtype=np.int8)
elif daph.encoding == parquet_thrift.Encoding.DELTA_BINARY_PACKED:
values = np.empty(daph.num_values - num_nulls, dtype=np.int32)
values = np.empty(daph.num_values - num_nulls,
dtype=np.int64 if metadata.type == 2 else np.int32)
o = encoding.NumpyIO(values.view('uint8'))
encoding.delta_binary_unpack(io_obj, o)
encoding.delta_binary_unpack(io_obj, o, longval=metadata.type == 2)
else:
raise NotImplementedError('Encoding %s' % daph.encoding)
return definition_levels, repetition_levels, values[:nval]
Expand Down
4 changes: 2 additions & 2 deletions fastparquet/test/test_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ def test_delta_from_def_2():
# one and only miniblock
cencoding.encode_unsigned_varint(zigzag(-2), o) # minimum delta (zigzag)
o.write_byte(2) # bit-width list (only one)
o.write_byte(0b00000011) # [0, 0, 0, 3]
o.write_byte(0b11111100) # [3, 3, 3, pad]
o.write_byte(0b11000000) # rev([0, 0, 0, 3])
o.write_byte(0b00111111) # rev([3, 3, 3, pad])

o.seek(0)

Expand Down
Loading