diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs index 5400b00ca..627ba92e5 100644 --- a/ballista/core/src/serde/mod.rs +++ b/ballista/core/src/serde/mod.rs @@ -125,28 +125,29 @@ pub struct BallistaLogicalExtensionCodec { } impl BallistaLogicalExtensionCodec { - // looks for a codec which can operate on this node - // returns a position of codec in the list. - // - // position is important with encoding process - // as there is a need to remember which codec - // in the list was used to encode message, - // so we can use it for decoding as well - - fn try_any( + /// looks for a codec which can operate on this node + /// returns a position of codec in the list and result. + /// + /// position is important with encoding process + /// as position of used codecs is needed + /// so the same codec can be used for decoding + + fn try_any( &self, - mut f: impl FnMut(&dyn LogicalExtensionCodec) -> Result, - ) -> Result<(u8, T)> { + mut f: impl FnMut(&dyn LogicalExtensionCodec) -> Result, + ) -> Result<(u32, R)> { let mut last_err = None; for (position, codec) in self.file_format_codecs.iter().enumerate() { match f(codec.as_ref()) { - Ok(node) => return Ok((position as u8, node)), + Ok(result) => return Ok((position as u32, result)), Err(err) => last_err = Some(err), } } Err(last_err.unwrap_or_else(|| { - DataFusionError::NotImplemented("Empty list of composed codecs".to_owned()) + DataFusionError::Internal( + "List of provided extended logical codecs is empty".to_owned(), + ) })) } } @@ -155,10 +156,12 @@ impl Default for BallistaLogicalExtensionCodec { fn default() -> Self { Self { default_codec: Arc::new(DefaultLogicalExtensionCodec {}), + // Position in this list is important as it will be used for decoding. + // If new codec is added it should go to last position. file_format_codecs: vec![ + Arc::new(ParquetLogicalExtensionCodec {}), Arc::new(CsvLogicalExtensionCodec {}), Arc::new(JsonLogicalExtensionCodec {}), - Arc::new(ParquetLogicalExtensionCodec {}), Arc::new(ArrowLogicalExtensionCodec {}), Arc::new(AvroLogicalExtensionCodec {}), ], @@ -210,19 +213,17 @@ impl LogicalExtensionCodec for BallistaLogicalExtensionCodec { buf: &[u8], ctx: &datafusion::prelude::SessionContext, ) -> Result> { - if !buf.is_empty() { - // gets codec id from input buffer - let codec_number = buf[0]; - let codec = self.file_format_codecs.get(codec_number as usize).ok_or( - DataFusionError::NotImplemented("Can't find required codex".to_owned()), - )?; - - codec.try_decode_file_format(&buf[1..], ctx) - } else { - Err(DataFusionError::NotImplemented( - "File format blob should have more than 0 bytes".to_owned(), - )) - } + let proto = FileFormatProto::decode(buf) + .map_err(|e| DataFusionError::Internal(e.to_string()))?; + + let codec = self + .file_format_codecs + .get(proto.encoder_position as usize) + .ok_or(DataFusionError::Internal( + "Can't find required codec in file codec list".to_owned(), + ))?; + + codec.try_decode_file_format(&proto.blob, ctx) } fn try_encode_file_format( @@ -230,18 +231,17 @@ impl LogicalExtensionCodec for BallistaLogicalExtensionCodec { buf: &mut Vec, node: Arc, ) -> Result<()> { - let mut encoded_format = vec![]; - let (codec_number, _) = self.try_any(|codec| { - codec.try_encode_file_format(&mut encoded_format, node.clone()) - })?; - // we need to remember which codec in the list was used to - // encode this node. - buf.push(codec_number); - - // save actual encoded node - buf.append(&mut encoded_format); - - Ok(()) + let mut blob = vec![]; + let (encoder_position, _) = + self.try_any(|codec| codec.try_encode_file_format(&mut blob, node.clone()))?; + + let proto = FileFormatProto { + encoder_position, + blob, + }; + proto + .encode(buf) + .map_err(|e| DataFusionError::Internal(e.to_string())) } } @@ -429,6 +429,25 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { } } +/// FileFormatProto captures data encoded by file format codecs +/// +/// it captures position of codec used to encode FileFormat +/// and actual encoded value. +/// +/// capturing codec position is required, as same codec can decode +/// blobs encoded by different encoders (probability is low but it +/// happened in the past) +/// +#[derive(Clone, PartialEq, prost::Message)] +struct FileFormatProto { + /// encoder id used to encode blob + /// (to be used for decoding) + #[prost(uint32, tag = 1)] + pub encoder_position: u32, + #[prost(bytes, tag = 2)] + pub blob: Vec, +} + #[cfg(test)] mod test { use datafusion::{