diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 3d86800fc282..4e91685519f3 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -387,7 +387,7 @@ pub struct ArrowReaderOptions { pub(crate) page_index_policy: PageIndexPolicy, /// If encryption is enabled, the file decryption properties can be provided #[cfg(feature = "encryption")] - pub(crate) file_decryption_properties: Option, + pub(crate) file_decryption_properties: Option>, } impl ArrowReaderOptions { @@ -508,7 +508,7 @@ impl ArrowReaderOptions { #[cfg(feature = "encryption")] pub fn with_file_decryption_properties( self, - file_decryption_properties: FileDecryptionProperties, + file_decryption_properties: Arc, ) -> Self { Self { file_decryption_properties: Some(file_decryption_properties), @@ -528,7 +528,7 @@ impl ArrowReaderOptions { /// This can be set via /// [`file_decryption_properties`][Self::with_file_decryption_properties]. #[cfg(feature = "encryption")] - pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> { + pub fn file_decryption_properties(&self) -> Option<&Arc> { self.file_decryption_properties.as_ref() } } @@ -572,8 +572,9 @@ impl ArrowReaderMetadata { let metadata = ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy); #[cfg(feature = "encryption")] - let metadata = - metadata.with_decryption_properties(options.file_decryption_properties.as_ref()); + let metadata = metadata.with_decryption_properties( + options.file_decryption_properties.as_ref().map(Arc::clone), + ); let metadata = metadata.parse_and_finish(reader)?; Self::try_new(Arc::new(metadata), options) } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index fe6414705282..27f90e3d7bc6 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -181,7 +181,7 @@ impl AsyncFileReader for T { #[cfg(feature = "encryption")] let metadata_reader = metadata_reader.with_decryption_properties( - options.and_then(|o| o.file_decryption_properties.as_ref()), + options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)), ); let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?; diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 74cf169c61a0..5ac21567a12d 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -206,8 +206,9 @@ impl AsyncFileReader for ParquetObjectReader { #[cfg(feature = "encryption")] if let Some(options) = options { - metadata = metadata - .with_decryption_properties(options.file_decryption_properties.as_ref()); + metadata = metadata.with_decryption_properties( + options.file_decryption_properties.as_ref().map(Arc::clone), + ); } let metadata = if let Some(file_size) = self.file_size { diff --git a/parquet/src/encryption/decrypt.rs b/parquet/src/encryption/decrypt.rs index ad05c72aa0be..b5374066dfc3 100644 --- a/parquet/src/encryption/decrypt.rs +++ b/parquet/src/encryption/decrypt.rs @@ -438,16 +438,16 @@ impl DecryptionPropertiesBuilder { } /// Finalize the builder and return created [`FileDecryptionProperties`] - pub fn build(self) -> Result { + pub fn build(self) -> Result> { let keys = DecryptionKeys::Explicit(ExplicitDecryptionKeys { footer_key: self.footer_key, column_keys: self.column_keys, }); - Ok(FileDecryptionProperties { + Ok(Arc::new(FileDecryptionProperties { keys, aad_prefix: self.aad_prefix, footer_signature_verification: self.footer_signature_verification, - }) + })) } /// Specify the expected AAD prefix to be used for decryption. @@ -509,13 +509,13 @@ impl DecryptionPropertiesBuilderWithRetriever { } /// Finalize the builder and return created [`FileDecryptionProperties`] - pub fn build(self) -> Result { + pub fn build(self) -> Result> { let keys = DecryptionKeys::ViaRetriever(self.key_retriever); - Ok(FileDecryptionProperties { + Ok(Arc::new(FileDecryptionProperties { keys, aad_prefix: self.aad_prefix, footer_signature_verification: self.footer_signature_verification, - }) + })) } /// Specify the expected AAD prefix to be used for decryption. @@ -536,7 +536,7 @@ impl DecryptionPropertiesBuilderWithRetriever { #[derive(Clone, Debug)] pub(crate) struct FileDecryptor { - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, footer_decryptor: Arc, file_aad: Vec, } @@ -549,7 +549,7 @@ impl PartialEq for FileDecryptor { impl FileDecryptor { pub(crate) fn new( - decryption_properties: &FileDecryptionProperties, + decryption_properties: &Arc, footer_key_metadata: Option<&[u8]>, aad_file_unique: Vec, aad_prefix: Vec, @@ -565,7 +565,7 @@ impl FileDecryptor { Ok(Self { footer_decryptor: Arc::new(footer_decryptor), - decryption_properties: decryption_properties.clone(), + decryption_properties: Arc::clone(decryption_properties), file_aad, }) } diff --git a/parquet/src/encryption/encrypt.rs b/parquet/src/encryption/encrypt.rs index 80832d7303b5..f6ae9004f164 100644 --- a/parquet/src/encryption/encrypt.rs +++ b/parquet/src/encryption/encrypt.rs @@ -27,6 +27,7 @@ use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; use ring::rand::{SecureRandom, SystemRandom}; use std::collections::{HashMap, HashSet}; use std::io::Write; +use std::sync::Arc; #[derive(Debug, Clone, PartialEq)] struct EncryptionKey { @@ -288,13 +289,13 @@ impl EncryptionPropertiesBuilder { #[derive(Debug)] /// The encryption configuration for a single Parquet file pub(crate) struct FileEncryptor { - properties: FileEncryptionProperties, + properties: Arc, aad_file_unique: Vec, file_aad: Vec, } impl FileEncryptor { - pub(crate) fn new(properties: FileEncryptionProperties) -> Result { + pub(crate) fn new(properties: Arc) -> Result { // Generate unique AAD for file let rng = SystemRandom::new(); let mut aad_file_unique = vec![0u8; 8]; diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 03975d6394c9..763025fe142b 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -1877,6 +1877,7 @@ mod tests { #[cfg(not(feature = "encryption"))] let base_expected_size = 2248; #[cfg(feature = "encryption")] + // Not as accurate as it should be: https://github.com/apache/arrow-rs/issues/8472 let base_expected_size = 2416; assert_eq!(parquet_meta.memory_size(), base_expected_size); @@ -1908,6 +1909,7 @@ mod tests { #[cfg(not(feature = "encryption"))] let bigger_expected_size = 2674; #[cfg(feature = "encryption")] + // Not as accurate as it should be: https://github.com/apache/arrow-rs/issues/8472 let bigger_expected_size = 2842; // more set fields means more memory usage diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs index 5929e6440c52..cb7c67e9bf19 100644 --- a/parquet/src/file/metadata/parser.rs +++ b/parquet/src/file/metadata/parser.rs @@ -73,7 +73,7 @@ mod inner { ) -> Result { if encrypted_footer || self.file_decryption_properties.is_some() { crate::file::metadata::thrift::encryption::parquet_metadata_with_encryption( - self.file_decryption_properties.as_deref(), + self.file_decryption_properties.as_ref(), encrypted_footer, buf, ) diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 231741de03ae..15b7264c90f4 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -180,9 +180,9 @@ impl ParquetMetaDataReader { #[cfg(feature = "encryption")] pub fn with_decryption_properties( mut self, - properties: Option<&FileDecryptionProperties>, + properties: Option>, ) -> Self { - self.file_decryption_properties = properties.cloned().map(std::sync::Arc::new); + self.file_decryption_properties = properties; self } @@ -1245,7 +1245,7 @@ mod async_tests { // just make sure the metadata is properly decrypted and read let expected = ParquetMetaDataReader::new() - .with_decryption_properties(Some(&decryption_properties)) + .with_decryption_properties(Some(decryption_properties)) .load_via_suffix_and_finish(input) .await .unwrap(); diff --git a/parquet/src/file/metadata/thrift/encryption.rs b/parquet/src/file/metadata/thrift/encryption.rs index ae7d5bbc2b1d..9744f0f7a6b5 100644 --- a/parquet/src/file/metadata/thrift/encryption.rs +++ b/parquet/src/file/metadata/thrift/encryption.rs @@ -17,8 +17,6 @@ //! Encryption support for Thrift serialization -use std::io::Write; - use crate::{ encryption::decrypt::{FileDecryptionProperties, FileDecryptor}, errors::{ParquetError, Result}, @@ -35,6 +33,8 @@ use crate::{ }, thrift_struct, thrift_union, }; +use std::io::Write; +use std::sync::Arc; thrift_struct!( pub(crate) struct AesGcmV1 { @@ -210,7 +210,7 @@ fn row_group_from_encrypted_thrift( /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata /// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/ pub(crate) fn parquet_metadata_with_encryption( - file_decryption_properties: Option<&FileDecryptionProperties>, + file_decryption_properties: Option<&Arc>, encrypted_footer: bool, buf: &[u8], ) -> Result { @@ -310,7 +310,7 @@ pub(crate) fn parquet_metadata_with_encryption( fn get_file_decryptor( encryption_algorithm: EncryptionAlgorithm, footer_key_metadata: Option<&[u8]>, - file_decryption_properties: &FileDecryptionProperties, + file_decryption_properties: &Arc, ) -> Result { match encryption_algorithm { EncryptionAlgorithm::AES_GCM_V1(algo) => { diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs index 332f7689911d..ba707baa766f 100644 --- a/parquet/src/file/metadata/thrift/mod.rs +++ b/parquet/src/file/metadata/thrift/mod.rs @@ -356,6 +356,7 @@ fn validate_column_metadata(mask: u16) -> Result<()> { if mask & COL_META_ENCODINGS == 0 { return Err(general_err!("Required field encodings is missing")); } + if mask & COL_META_CODEC == 0 { return Err(general_err!("Required field codec is missing")); } diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 2f766e25e7f8..d7771b42e24a 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -169,7 +169,7 @@ pub struct WriterProperties { statistics_truncate_length: Option, coerce_types: bool, #[cfg(feature = "encryption")] - pub(crate) file_encryption_properties: Option, + pub(crate) file_encryption_properties: Option>, } impl Default for WriterProperties { @@ -432,7 +432,7 @@ impl WriterProperties { /// /// For more details see [`WriterPropertiesBuilder::with_file_encryption_properties`] #[cfg(feature = "encryption")] - pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> { + pub fn file_encryption_properties(&self) -> Option<&Arc> { self.file_encryption_properties.as_ref() } } @@ -506,7 +506,7 @@ impl WriterPropertiesBuilder { statistics_truncate_length: self.statistics_truncate_length, coerce_types: self.coerce_types, #[cfg(feature = "encryption")] - file_encryption_properties: self.file_encryption_properties, + file_encryption_properties: self.file_encryption_properties.map(Arc::new), } } @@ -965,7 +965,7 @@ impl From for WriterPropertiesBuilder { statistics_truncate_length: props.statistics_truncate_length, coerce_types: props.coerce_types, #[cfg(feature = "encryption")] - file_encryption_properties: props.file_encryption_properties, + file_encryption_properties: props.file_encryption_properties.map(Arc::unwrap_or_clone), } } } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index a77ce266d14a..f85ca67412ac 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -213,12 +213,12 @@ impl SerializedFileWriter { properties: &WriterPropertiesPtr, schema_descriptor: &SchemaDescriptor, ) -> Result>> { - if let Some(file_encryption_properties) = &properties.file_encryption_properties { + if let Some(file_encryption_properties) = properties.file_encryption_properties() { file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?; - Ok(Some(Arc::new(FileEncryptor::new( - file_encryption_properties.clone(), - )?))) + Ok(Some(Arc::new(FileEncryptor::new(Arc::clone( + file_encryption_properties, + ))?))) } else { Ok(None) } @@ -318,7 +318,7 @@ impl SerializedFileWriter { /// Writes magic bytes at the beginning of the file. #[cfg(feature = "encryption")] fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite) -> Result<()> { - let magic = get_file_magic(properties.file_encryption_properties.as_ref()); + let magic = get_file_magic(properties.file_encryption_properties.as_deref()); buf.write_all(magic)?; Ok(()) diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 042ef6a1e704..f999abab95de 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -89,8 +89,8 @@ fn test_plaintext_footer_signature_verification() { .build() .unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let result = ArrowReaderMetadata::load(&file, options.clone()); assert!(result.is_err()); assert!( @@ -148,8 +148,8 @@ fn test_non_uniform_encryption_disabled_aad_storage() { .unwrap(); let file = File::open(path).unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let result = ArrowReaderMetadata::load(&file, options.clone()); assert!(result.is_err()); assert_eq!( @@ -279,8 +279,8 @@ fn test_uniform_encryption_plaintext_footer_with_key_retriever() { .build() .unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap(); // Write data into temporary file with plaintext footer and footer key metadata @@ -320,8 +320,8 @@ fn test_uniform_encryption_plaintext_footer_with_key_retriever() { .build() .unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let _ = ArrowReaderMetadata::load(&temp_file, options.clone()).unwrap(); // Read temporary file with plaintext metadata using key retriever with invalid key @@ -334,8 +334,8 @@ fn test_uniform_encryption_plaintext_footer_with_key_retriever() { let decryption_properties = FileDecryptionProperties::with_key_retriever(key_retriever) .build() .unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let result = ArrowReaderMetadata::load(&temp_file, options.clone()); assert!(result.is_err()); assert!( @@ -672,7 +672,7 @@ fn test_write_uniform_encryption_plaintext_footer() { // Try writing plaintext footer and then reading it with the correct footer key read_and_roundtrip_to_encrypted_file( &file, - decryption_properties.clone(), + Arc::clone(&decryption_properties), file_encryption_properties.clone(), ); @@ -928,8 +928,8 @@ fn test_write_encrypted_struct_field() { .with_column_key("struct_col.float64_col", column_key_2) .build() .unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(temp_file, options).unwrap(); @@ -1036,7 +1036,7 @@ fn test_decrypt_page_index_non_uniform() { fn test_decrypt_page_index( path: &str, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, ) -> Result<(), ParquetError> { let file = File::open(path)?; let options = ArrowReaderOptions::default() diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index ccbb2b0bff77..bd8877559405 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -433,7 +433,7 @@ async fn test_decrypt_page_index_non_uniform() { async fn test_decrypt_page_index( path: &str, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, ) -> Result<(), ParquetError> { let mut file = File::open(&path).await?; @@ -450,7 +450,7 @@ async fn test_decrypt_page_index( async fn verify_encryption_test_file_read_async( file: &mut tokio::fs::File, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, ) -> Result<(), ParquetError> { let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); @@ -470,14 +470,14 @@ async fn verify_encryption_test_file_read_async( async fn read_and_roundtrip_to_encrypted_file_async( path: &str, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, encryption_properties: FileEncryptionProperties, ) -> Result<(), ParquetError> { let temp_file = tempfile::tempfile().unwrap(); let mut file = File::open(&path).await.unwrap(); - let options = - ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties.clone()); + let options = ArrowReaderOptions::new() + .with_file_decryption_properties(Arc::clone(&decryption_properties)); let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?; let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata( file.try_clone().await?, @@ -759,7 +759,7 @@ async fn test_multi_threaded_encrypted_writing() { .unwrap(); let (record_batches, metadata) = - read_encrypted_file(&file, decryption_properties.clone()).unwrap(); + read_encrypted_file(&file, Arc::clone(&decryption_properties)).unwrap(); let schema = metadata.schema().clone(); let props = Arc::new( @@ -830,7 +830,7 @@ async fn test_multi_threaded_encrypted_writing() { // Check that the file was written correctly let (read_record_batches, read_metadata) = - read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap(); + read_encrypted_file(&temp_file, decryption_properties).unwrap(); verify_encryption_test_data(read_record_batches, read_metadata.metadata()); // Check that file was encrypted @@ -860,7 +860,7 @@ async fn test_multi_threaded_encrypted_writing_deprecated() { .unwrap(); let (record_batches, metadata) = - read_encrypted_file(&file, decryption_properties.clone()).unwrap(); + read_encrypted_file(&file, Arc::clone(&decryption_properties)).unwrap(); let to_write: Vec<_> = record_batches .iter() .flat_map(|rb| rb.columns().to_vec()) @@ -920,7 +920,7 @@ async fn test_multi_threaded_encrypted_writing_deprecated() { // Check that the file was written correctly let (read_record_batches, read_metadata) = - read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap(); + read_encrypted_file(&temp_file, decryption_properties).unwrap(); verify_encryption_double_test_data(read_record_batches, read_metadata.metadata()); // Check that file was encrypted diff --git a/parquet/tests/encryption/encryption_util.rs b/parquet/tests/encryption/encryption_util.rs index 9c28e7f05f1e..345078d28d93 100644 --- a/parquet/tests/encryption/encryption_util.rs +++ b/parquet/tests/encryption/encryption_util.rs @@ -28,7 +28,7 @@ use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use std::collections::HashMap; use std::fs::File; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; pub(crate) fn verify_encryption_double_test_data( record_batches: Vec, @@ -219,10 +219,10 @@ pub(crate) fn verify_column_indexes(metadata: &ParquetMetaData) { pub(crate) fn read_encrypted_file( file: &File, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, ) -> std::result::Result<(Vec, ArrowReaderMetadata), ParquetError> { - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let metadata = ArrowReaderMetadata::load(file, options.clone())?; let builder = @@ -234,11 +234,12 @@ pub(crate) fn read_encrypted_file( pub(crate) fn read_and_roundtrip_to_encrypted_file( file: &File, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, encryption_properties: FileEncryptionProperties, ) { // read example data - let (batches, metadata) = read_encrypted_file(file, decryption_properties.clone()).unwrap(); + let (batches, metadata) = + read_encrypted_file(file, Arc::clone(&decryption_properties)).unwrap(); // write example data to a temporary file let temp_file = tempfile::tempfile().unwrap(); @@ -264,7 +265,7 @@ pub(crate) fn read_and_roundtrip_to_encrypted_file( pub(crate) fn verify_encryption_test_file_read( file: File, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, ) { let options = ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);