diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 1cf27424963..580e5662d5f 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1141,6 +1141,7 @@ mod tests { use crate::basic::Encoding; use crate::data_type::AsBytes; + use crate::encryption::encryption::FileEncryptionProperties; use crate::file::metadata::ParquetMetaData; use crate::file::page_index::index::Index; use crate::file::page_index::index_reader::read_offset_indexes; @@ -3546,8 +3547,14 @@ mod tests { let file = tempfile::tempfile().unwrap(); // todo: add encryption + let key_code: &[u8] = "0123456789012345".as_bytes(); + let file_encryption_properties = FileEncryptionProperties::builder(key_code.to_vec()) + .build() + .unwrap(); + let props = WriterProperties::builder() .set_max_row_group_size(200) + .with_file_encryption_properties(file_encryption_properties) .build(); let mut writer = diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 50156a26e27..5ac4e458492 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -1535,6 +1535,8 @@ mod tests { page::PageReader, reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl}, }; + #[cfg(feature = "encryption")] + use crate::encryption::encryption::FileEncryptionProperties; use crate::file::writer::TrackedWrite; use crate::file::{ properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter, @@ -3379,6 +3381,31 @@ mod tests { ); } + #[cfg(feature = "encryption")] + #[test] + fn test_encryption_writer() { + let message_type = " + message test_schema { + OPTIONAL BYTE_ARRAY a (UTF8); + } + "; + let schema = Arc::new(parse_message_type(message_type).unwrap()); + let file: File = tempfile::tempfile().unwrap(); + + let builder = WriterProperties::builder(); + let key_code: &[u8] = "0123456789012345".as_bytes(); + let file_encryption_properties = 
FileEncryptionProperties::builder(key_code.to_vec()) + .build() + .unwrap(); + + let props = Arc::new( + builder + .with_file_encryption_properties(file_encryption_properties) + .build(), + ); + let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); + } + #[test] fn test_increment_max_binary_chars() { let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]); diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs index 8385932671f..e8856e5db1d 100644 --- a/parquet/src/encryption/ciphers.rs +++ b/parquet/src/encryption/ciphers.rs @@ -16,9 +16,11 @@ // under the License. use crate::errors::Result; -use ring::aead::{Aad, LessSafeKey, UnboundKey, AES_128_GCM}; +use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM}; +use ring::rand::{SecureRandom, SystemRandom}; use std::fmt::Debug; +const RIGHT_TWELVE: u128 = 0x0000_0000_ffff_ffff_ffff_ffff_ffff_ffff; const NONCE_LEN: usize = 12; const TAG_LEN: usize = 16; const SIZE_LEN: usize = 4; @@ -60,3 +62,79 @@ impl BlockDecryptor for RingGcmBlockDecryptor { Ok(result) } } + +pub trait BlockEncryptor: Debug + Send + Sync { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec; +} + +#[derive(Debug, Clone)] +struct CounterNonce { + start: u128, + counter: u128, +} + +impl CounterNonce { + pub fn new(rng: &SystemRandom) -> Self { + let mut buf = [0; 16]; + rng.fill(&mut buf).unwrap(); + + // Since this is a random seed value, endianness doesn't matter at all, + // and we can use whatever is platform-native. 
+ let start = u128::from_ne_bytes(buf) & RIGHT_TWELVE; + let counter = start.wrapping_add(1); + + Self { start, counter } + } + + /// One accessor for the nonce bytes to avoid potentially flipping endianness + #[inline] + pub fn get_bytes(&self) -> [u8; NONCE_LEN] { + self.counter.to_le_bytes()[0..NONCE_LEN].try_into().unwrap() + } +} + +impl NonceSequence for CounterNonce { + fn advance(&mut self) -> Result { + // If we've wrapped around, we've exhausted this nonce sequence + if (self.counter & RIGHT_TWELVE) == (self.start & RIGHT_TWELVE) { + Err(ring::error::Unspecified) + } else { + // Otherwise, just advance and return the new value + let buf: [u8; NONCE_LEN] = self.get_bytes(); + self.counter = self.counter.wrapping_add(1); + Ok(ring::aead::Nonce::assume_unique_for_key(buf)) + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct RingGcmBlockEncryptor { + key: LessSafeKey, + nonce_sequence: CounterNonce, +} + +impl RingGcmBlockEncryptor { + // todo TBD: some KMS systems produce data keys, need to be able to pass them to Encryptor. + // todo TBD: for other KMSs, we will create data keys inside arrow-rs, making sure to use SystemRandom + /// Create a new `RingGcmBlockEncryptor` with a given key and random nonce. + /// The nonce will advance appropriately with each block encryption and + /// return an error if it wraps around. 
+ pub(crate) fn new(key_bytes: &[u8]) -> Self { + let rng = SystemRandom::new(); + + // todo support other key sizes + let key = UnboundKey::new(&AES_128_GCM, key_bytes.as_ref()).unwrap(); + let nonce = CounterNonce::new(&rng); + + Self { + key: LessSafeKey::new(key), + nonce_sequence: nonce, + } + } +} + +impl BlockEncryptor for RingGcmBlockEncryptor { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec { + todo!() + } +} diff --git a/parquet/src/encryption/encryption.rs b/parquet/src/encryption/encryption.rs new file mode 100644 index 00000000000..f48672d4c62 --- /dev/null +++ b/parquet/src/encryption/encryption.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; +use std::sync::Arc; +use crate::encryption::ciphers::{RingGcmBlockEncryptor, BlockEncryptor}; + +#[derive(Debug, Clone)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: Vec, + column_keys: Option, Vec>>, + aad_prefix: Option>, +} + +impl FileEncryptionProperties { + pub fn builder(footer_key: Vec) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } +} + +pub struct EncryptionPropertiesBuilder { + footer_key: Vec, + column_keys: Option, Vec>>, + aad_prefix: Option>, +} + +impl EncryptionPropertiesBuilder { + pub fn new(footer_key: Vec) -> EncryptionPropertiesBuilder { + Self { + footer_key, + column_keys: None, + aad_prefix: None, + } + } + + pub fn build(self) -> crate::errors::Result { + Ok(FileEncryptionProperties { + encrypt_footer: true, + footer_key: self.footer_key, + column_keys: self.column_keys, + aad_prefix: self.aad_prefix, + }) + } +} + +#[derive(Clone, Debug)] +pub struct FileEncryptor { + encryption_properties: FileEncryptionProperties, + footer_encryptor: Option>, + file_aad: Vec, +} + +impl FileEncryptor { + pub(crate) fn new( + encryption_properties: FileEncryptionProperties, + aad_file_unique: Vec, + aad_prefix: Vec, + ) -> Self { + let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat(); + let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key); + Self { + encryption_properties, + footer_encryptor: Some(Arc::new(footer_encryptor)), + file_aad, + } + } +} diff --git a/parquet/src/encryption/mod.rs b/parquet/src/encryption/mod.rs index 1e33bf4fbd6..f30464f0243 100644 --- a/parquet/src/encryption/mod.rs +++ b/parquet/src/encryption/mod.rs @@ -20,4 +20,5 @@ pub mod ciphers; pub mod decryption; +pub mod encryption; pub mod modules; diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index a4ffb162d09..549cc91a8fb 100644 --- a/parquet/src/file/properties.rs +++ 
b/parquet/src/file/properties.rs @@ -18,6 +18,8 @@ //! Configuration via [`WriterProperties`] and [`ReaderProperties`] use crate::basic::{Compression, Encoding}; use crate::compression::{CodecOptions, CodecOptionsBuilder}; +#[cfg(feature = "encryption")] +use crate::encryption::encryption::FileEncryptionProperties; use crate::file::metadata::KeyValue; use crate::format::SortingColumn; use crate::schema::types::ColumnPath; @@ -372,6 +374,11 @@ impl WriterProperties { .and_then(|c| c.bloom_filter_properties()) .or_else(|| self.default_column_properties.bloom_filter_properties()) } + + #[cfg(feature = "encryption")] + pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> { + self.file_encryption_properties.as_ref() + } } /// Builder for [`WriterProperties`] parquet writer configuration. @@ -394,6 +401,8 @@ pub struct WriterPropertiesBuilder { column_index_truncate_length: Option, statistics_truncate_length: Option, coerce_types: bool, + #[cfg(feature = "encryption")] + file_encryption_properties: Option, } impl WriterPropertiesBuilder { @@ -416,6 +425,8 @@ impl WriterPropertiesBuilder { column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH, coerce_types: DEFAULT_COERCE_TYPES, + #[cfg(feature = "encryption")] + file_encryption_properties: None, } } @@ -438,6 +449,8 @@ impl WriterPropertiesBuilder { column_index_truncate_length: self.column_index_truncate_length, statistics_truncate_length: self.statistics_truncate_length, coerce_types: self.coerce_types, + #[cfg(feature = "encryption")] + file_encryption_properties: self.file_encryption_properties, } } @@ -810,6 +823,16 @@ impl WriterPropertiesBuilder { self.coerce_types = coerce_types; self } + + /// Sets FileEncryptionProperties. 
+ #[cfg(feature = "encryption")] + pub fn with_file_encryption_properties( + mut self, + file_encryption_properties: FileEncryptionProperties, + ) -> Self { + self.file_encryption_properties = Some(file_encryption_properties); + self + } } /// Controls the level of statistics to be computed by the writer and stored in diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 2fa2d2dcf91..d3c5f26159d 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -33,6 +33,8 @@ use crate::column::{ writer::{get_column_writer, ColumnWriter}, }; use crate::data_type::DataType; +use crate::encryption::ciphers::RingGcmBlockEncryptor; +use crate::encryption::encryption::FileEncryptor; use crate::errors::{ParquetError, Result}; use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr}; use crate::file::reader::ChunkReader; @@ -523,6 +525,9 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { ) -> Result, { self.assert_previous_writer_closed()?; + let file_encryption_properties = self.props.file_encryption_properties(); + let file_encryptor = + FileEncryptor::new(file_encryption_properties.unwrap().clone(), vec![], vec![]); Ok(match self.next_column_desc() { Some(column) => { let props = self.props.clone();