Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 21, 2025
1 parent caa514d commit 56e9e07
Showing 1 changed file with 83 additions and 85 deletions.
168 changes: 83 additions & 85 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2177,6 +2177,89 @@ mod tests {
assert_eq!(input_batch, output_batch);
}

#[test]
fn test_unaligned() {
let batch = RecordBatch::try_from_iter(vec![(
"i32",
Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
)])
.unwrap();

let gen = IpcDataGenerator {};
#[allow(deprecated)]
let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
let (_, encoded) = gen
.encoded_batch(&batch, &mut dict_tracker, &Default::default())
.unwrap();

let message = root_as_message(&encoded.ipc_message).unwrap();

// Construct an unaligned buffer
let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
buffer.push(0_u8);
buffer.extend_from_slice(&encoded.arrow_data);
let b = Buffer::from(buffer).slice(1);
assert_ne!(b.as_ptr().align_offset(8), 0);

let ipc_batch = message.header_as_record_batch().unwrap();
let roundtrip = ArrayReader::try_new(
&b,
ipc_batch,
batch.schema(),
&Default::default(),
&message.version(),
)
.unwrap()
.with_require_alignment(false)
.read_record_batch()
.unwrap();
assert_eq!(batch, roundtrip);
}

#[test]
fn test_unaligned_throws_error_with_require_alignment() {
let batch = RecordBatch::try_from_iter(vec![(
"i32",
Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
)])
.unwrap();

let gen = IpcDataGenerator {};
#[allow(deprecated)]
let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
let (_, encoded) = gen
.encoded_batch(&batch, &mut dict_tracker, &Default::default())
.unwrap();

let message = root_as_message(&encoded.ipc_message).unwrap();

// Construct an unaligned buffer
let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
buffer.push(0_u8);
buffer.extend_from_slice(&encoded.arrow_data);
let b = Buffer::from(buffer).slice(1);
assert_ne!(b.as_ptr().align_offset(8), 0);

let ipc_batch = message.header_as_record_batch().unwrap();
let result = ArrayReader::try_new(
&b,
ipc_batch,
batch.schema(),
&Default::default(),
&message.version(),
)
.unwrap()
.with_require_alignment(true)
.read_record_batch();

let error = result.unwrap_err();
assert_eq!(
error.to_string(),
"Invalid argument error: Misaligned buffers[0] in array of type Int32, \
offset from expected alignment of 4 by 1"
);
}

#[test]
fn test_file_with_massive_column_count() {
// 499_999 is upper limit for default settings (1_000_000)
Expand Down Expand Up @@ -2332,89 +2415,4 @@ mod tests {
assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
});
}

#[test]
fn test_unaligned() {
let batch = RecordBatch::try_from_iter(vec![(
"i32",
Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
)])
.unwrap();

let gen = IpcDataGenerator {};
#[allow(deprecated)]
let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
let (_, encoded) = gen
.encoded_batch(&batch, &mut dict_tracker, &Default::default())
.unwrap();

let message = root_as_message(&encoded.ipc_message).unwrap();

// Construct an unaligned buffer
let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
buffer.push(0_u8);
buffer.extend_from_slice(&encoded.arrow_data);
let b = Buffer::from(buffer).slice(1);
assert_ne!(b.as_ptr().align_offset(8), 0);

let ipc_batch = message.header_as_record_batch().unwrap();

let roundtrip = ArrayReader::try_new(
&b,
ipc_batch,
batch.schema(),
&Default::default(),
&message.version(),
)
.unwrap()
.with_require_alignment(false)
.read_record_batch()
.unwrap();
assert_eq!(batch, roundtrip);
}

#[test]
fn test_unaligned_throws_error_with_require_alignment() {
let batch = RecordBatch::try_from_iter(vec![(
"i32",
Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
)])
.unwrap();

let gen = IpcDataGenerator {};
#[allow(deprecated)]
let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
let (_, encoded) = gen
.encoded_batch(&batch, &mut dict_tracker, &Default::default())
.unwrap();

let message = root_as_message(&encoded.ipc_message).unwrap();

// Construct an unaligned buffer
let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
buffer.push(0_u8);
buffer.extend_from_slice(&encoded.arrow_data);
let b = Buffer::from(buffer).slice(1);
assert_ne!(b.as_ptr().align_offset(8), 0);

let ipc_batch = message.header_as_record_batch().unwrap();

let result = ArrayReader::try_new(
&b,
ipc_batch,
batch.schema(),
&Default::default(),
&message.version(),
)
.unwrap()
.with_require_alignment(true)
.read_record_batch();

let error = result.unwrap_err();
assert_eq!(
error.to_string(),
"Invalid argument error: Misaligned buffers[0] in array of type Int32, \
offset from expected alignment of 4 by 1"
);
}
}

0 comments on commit 56e9e07

Please sign in to comment.