diff --git a/mdb_shard/src/shard_file_handle.rs b/mdb_shard/src/shard_file_handle.rs
index 816ac903..bc2a2a46 100644
--- a/mdb_shard/src/shard_file_handle.rs
+++ b/mdb_shard/src/shard_file_handle.rs
@@ -9,7 +9,7 @@ use crate::cas_structs::CASChunkSequenceHeader;
 use crate::error::{MDBShardError, Result};
 use crate::file_structs::{FileDataSequenceEntry, MDBFileInfo};
 use crate::shard_format::MDBShardInfo;
-use crate::utils::{parse_shard_filename, shard_file_name, temp_shard_file_name};
+use crate::utils::{parse_shard_filename, shard_file_name, temp_shard_file_name, truncate_hash};
 
 /// When a specific implementation of the
 #[derive(Debug, Clone, Default)]
@@ -50,16 +50,20 @@ impl MDBShardFile {
             hashed_write = HashedWrite::new(out_file);
 
             std::io::copy(reader, &mut hashed_write)?;
+            hashed_write.flush()?;
         }
 
         // Get the hash
-        hashed_write.flush()?;
         let shard_hash = hashed_write.hash();
 
         let full_file_name = target_directory.join(shard_file_name(&shard_hash));
 
         std::fs::rename(&temp_file_name, &full_file_name)?;
 
+        let si = MDBShardInfo::load_from_file(reader)?;
+
+        debug_assert_eq!(MDBShardInfo::load_from_file(&mut Cursor::new(&mut std::fs::read(&full_file_name)?))?, si);
+
         Self::new(shard_hash, full_file_name, MDBShardInfo::load_from_file(reader)?)
     }
 
@@ -141,7 +145,10 @@ impl MDBShardFile {
             include_chunk_lookup_table,
         )?;
 
-        Self::write_out_from_reader(target_directory, &mut Cursor::new(output_bytes))
+        let written_out = Self::write_out_from_reader(target_directory, &mut Cursor::new(output_bytes))?;
+        written_out.verify_shard_integrity_debug_only();
+
+        Ok(written_out)
     }
 
     #[inline]
@@ -255,6 +262,27 @@ impl MDBShardFile {
         }
 
         debug!("Integrity test passed for shard {:?}", &self.path);
-        // TODO: More parts; but this will at least succeed on the server end.
+        // Verify that the shard chunk lookup tables are correct.
+
+        // Read from the lookup table section.
+        let mut read_truncated_hashes = self.read_all_truncated_hashes().unwrap();
+
+        let mut truncated_hashes = Vec::new();
+
+        let cas_blocks = self.shard.read_all_cas_blocks_full(&mut self.get_reader().unwrap()).unwrap();
+
+        // Read from the cas blocks
+        let mut cas_index = 0;
+        for ci in cas_blocks {
+            for (i, chunk) in ci.chunks.iter().enumerate() {
+                truncated_hashes.push((truncate_hash(&chunk.chunk_hash), (cas_index as u32, i as u32)));
+            }
+            cas_index += 1 + ci.chunks.len();
+        }
+
+        read_truncated_hashes.sort_by_key(|s| s.0);
+        truncated_hashes.sort_by_key(|s| s.0);
+
+        assert_eq!(read_truncated_hashes, truncated_hashes);
     }
 }
diff --git a/mdb_shard/src/shard_file_manager.rs b/mdb_shard/src/shard_file_manager.rs
index 6e4a2f12..e2a1acc2 100644
--- a/mdb_shard/src/shard_file_manager.rs
+++ b/mdb_shard/src/shard_file_manager.rs
@@ -973,8 +973,15 @@ mod tests {
                 continue;
             }
 
-            // Do some repeat keys to make sure that path is tested as well.
-            let key: HMACKey = rng_hash((i % 6) as u64);
+            let key: HMACKey = {
+                if i == 1 {
+                    // This tests that the default route with no hmac translation is solid too.
+                    HMACKey::default()
+                } else {
+                    // Do some repeat keys to make sure that path is tested as well.
+                    rng_hash((i % 6) as u64)
+                }
+            };
 
             let shard = MDBShardFile::load_from_file(p)?;
 
@@ -989,7 +996,11 @@
                 include_info,
             )
             .unwrap();
-            assert_eq!(out.chunk_hmac_key(), Some(key));
+            if key != HMACKey::default() {
+                assert_eq!(out.chunk_hmac_key(), Some(key));
+            } else {
+                assert_eq!(out.chunk_hmac_key(), None);
+            }
         }
 
         // Now, verify that everything still works great.
diff --git a/mdb_shard/src/shard_format.rs b/mdb_shard/src/shard_format.rs
index ca5d5b51..68dcf15f 100644
--- a/mdb_shard/src/shard_format.rs
+++ b/mdb_shard/src/shard_format.rs
@@ -54,7 +54,7 @@ pub fn current_timestamp() -> u64 {
         .as_secs()
 }
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct MDBShardFileHeader {
     // Header to be determined? "XetHub MDB Shard File Version 1"
     pub tag: [u8; 32],
@@ -274,7 +274,7 @@ impl MDBShardFileFooter {
 /// in the CAS info section that is the start of the CAS block, and the subsequent u32 gives
 /// the offset index of the chunk in that CAS block.
 
-#[derive(Clone, Default, Debug)]
+#[derive(Clone, Default, Debug, PartialEq)]
 pub struct MDBShardInfo {
     pub header: MDBShardFileHeader,
     pub metadata: MDBShardFileFooter,
@@ -286,11 +286,11 @@ impl MDBShardInfo {
         // Move cursor to beginning of shard file.
         reader.rewind()?;
 
-        obj.header = MDBShardFileHeader::deserialize(reader).unwrap();
+        obj.header = MDBShardFileHeader::deserialize(reader)?;
 
         // Move cursor to end of shard file minus footer size.
         reader.seek(SeekFrom::End(-MDB_SHARD_FOOTER_SIZE))?;
-        obj.metadata = MDBShardFileFooter::deserialize(reader).unwrap();
+        obj.metadata = MDBShardFileFooter::deserialize(reader)?;
 
         Ok(obj)
     }
@@ -599,11 +599,15 @@ impl MDBShardInfo {
     pub fn read_all_cas_blocks_full<R: Read + Seek>(&self, reader: &mut R) -> Result<Vec<MDBCASInfo>> {
         let mut ret = Vec::with_capacity(self.num_cas_entries());
 
-        reader.seek(SeekFrom::Start(self.metadata.cas_info_offset))?;
+        let (cas_info_start, _cas_info_end) = self.cas_info_byte_range();
+
+        reader.seek(SeekFrom::Start(cas_info_start))?;
 
         while let Some(cas_info) = MDBCASInfo::deserialize(reader)? {
+            debug_assert!(reader.stream_position()? < _cas_info_end);
             ret.push(cas_info);
         }
+        debug_assert_eq!(reader.stream_position()?, _cas_info_end);
 
         Ok(ret)
     }
@@ -954,8 +958,7 @@ impl MDBShardInfo {
         Ok(ret)
     }
 
-    /// Export the current shard as an hmac keyed shard, returning the number of bytes written and the hash of the
-    /// resulting data.
+    /// Export the current shard as an hmac keyed shard, returning the number of bytes written.
     #[allow(clippy::too_many_arguments)]
     pub fn export_as_keyed_shard<R: Read + Seek, W: Write>(
         &self,
@@ -967,52 +970,151 @@ impl MDBShardInfo {
         include_cas_lookup_table: bool,
         include_chunk_lookup_table: bool,
     ) -> Result<usize> {
-        // Go through each of the sections, rewriting things as needed.
+        Self::export_as_keyed_shard_impl(
+            reader,
+            writer,
+            hmac_key,
+            key_valid_for,
+            include_file_info,
+            include_cas_lookup_table,
+            include_chunk_lookup_table,
+            Some(self),
+        )
+    }
+
+    /// Export the current shard as an hmac keyed shard, reading the source shard directly
+    /// from the reader.
+    #[allow(clippy::too_many_arguments)]
+    pub fn export_as_keyed_shard_streaming<R: Read + Seek, W: Write>(
+        reader: &mut R,
+        writer: &mut W,
+        hmac_key: HMACKey,
+        key_valid_for: std::time::Duration,
+        include_file_info: bool,
+        include_cas_lookup_table: bool,
+        include_chunk_lookup_table: bool,
+    ) -> Result<usize> {
+        Self::export_as_keyed_shard_impl(
+            reader,
+            writer,
+            hmac_key,
+            key_valid_for,
+            include_file_info,
+            include_cas_lookup_table,
+            include_chunk_lookup_table,
+            None,
+        )
+    }
+
+    /// Internal implementation of exporting the current shard as an hmac keyed shard.
+    #[allow(clippy::too_many_arguments)]
+    fn export_as_keyed_shard_impl<R: Read + Seek, W: Write>(
+        reader: &mut R,
+        writer: &mut W,
+        hmac_key: HMACKey,
+        key_valid_for: std::time::Duration,
+        include_file_info: bool,
+        include_cas_lookup_table: bool,
+        include_chunk_lookup_table: bool,
+        // Pass this in when we have it so we can use debug asserts for verification checking in tests.
+        self_verification: Option<&Self>,
+    ) -> Result<usize> {
+        // The footer at the end that will hold each of these sections.
         let mut out_footer = MDBShardFileFooter::default();
 
+        // Read in the header, verifying all the information.
+        let in_header = MDBShardFileHeader::deserialize(reader)?;
+
         // Dump out the header.
         let mut byte_pos = 0;
-        byte_pos += self.header.serialize(writer)?;
+        byte_pos += in_header.serialize(writer)?;
 
-        let (file_info_start, file_info_end) = self.file_info_byte_range();
+        // Read in all the file information.
         out_footer.file_info_offset = byte_pos as u64;
 
-        if include_file_info {
-            reader.seek(SeekFrom::Start(self.metadata.file_info_offset))?;
+        // Possibly save the lookup info here.
+        let mut file_lookup = Vec::<(u64, u32)>::new();
 
-            // Okay to just copy these values over as there is nothing different between the two shards
-            // up to this point.
-            byte_pos += copy(&mut reader.take(file_info_end - file_info_start), writer)? as usize;
-        } else {
-            // Serialize a single block of 00 bytes as a guard for sequential reading.
-            byte_pos += FileDataSequenceHeader::bookend().serialize(writer)?;
+        // Index of entry for lookup table.
+        let mut index: u32 = 0;
+
+        // Materialized bytes for later storage.
+        let mut materialized_bytes = 0;
+
+        loop {
+            let file_metadata = FileDataSequenceHeader::deserialize(reader)?;
+
+            if file_metadata.is_bookend() {
+                // Serialize the bookend struct and move on.
+                byte_pos += file_metadata.serialize(writer)?;
+                break;
+            }
+
+            let num_entries = file_metadata.num_entries as usize;
 
-            reader.seek(SeekFrom::Start(file_info_end))?;
+            let mut n_extended_bytes = 0;
+
+            if file_metadata.contains_verification() {
+                n_extended_bytes += num_entries * size_of::<FileVerificationEntry>();
+            }
+
+            if file_metadata.contains_metadata_ext() {
+                n_extended_bytes += size_of::<FileMetadataExt>();
+            }
+
+            if include_file_info {
+                byte_pos += file_metadata.serialize(writer)?;
+
+                // Need to read in the metadata values so we can calculate the materialized bytes.
+                for _ in 0..num_entries {
+                    let entry = FileDataSequenceEntry::deserialize(reader)?;
+                    materialized_bytes += entry.unpacked_segment_bytes as u64;
+                    byte_pos += entry.serialize(writer)?;
+                }
+
+                // Okay to just copy the rest of the values over, as there is nothing different
+                // between the two shards up to this point.
+                if n_extended_bytes != 0 {
+                    byte_pos += copy(&mut reader.take(n_extended_bytes as u64), writer)? as usize;
+                }
+
+                // Put in the lookup information.
+                file_lookup.push((truncate_hash(&file_metadata.file_hash), index));
+                index += (1 + num_entries + n_extended_bytes / MDB_FILE_INFO_ENTRY_SIZE) as u32;
+            } else {
+                // Discard values until the next reader break: the segment entries themselves
+                // plus any extended (verification / metadata ext) bytes.
+                let n_skip_bytes = (num_entries * MDB_FILE_INFO_ENTRY_SIZE + n_extended_bytes) as u64;
+                copy(&mut reader.take(n_skip_bytes), &mut std::io::sink())?;
+            }
         }
 
-        debug_assert_eq!(reader.stream_position()?, self.metadata.cas_info_offset);
+        if let Some(self_) = self_verification {
+            debug_assert_eq!(reader.stream_position()?, self_.metadata.cas_info_offset);
+        }
 
+        let mut cas_lookup = Vec::<(u64, u32)>::new();
+        let mut chunk_lookup = Vec::<(u64, (u32, u32))>::new();
+
+        // Now deal with all the cas information.
         out_footer.cas_info_offset = byte_pos as u64;
 
-        // Now, go through and convert all the CAS blocks, including rehashing the CAS block chunks.
-        debug_assert_eq!(reader.stream_position()?, self.metadata.cas_info_offset);
+        let mut cas_index = 0;
+        let mut stored_bytes_on_disk = 0;
+        let mut stored_bytes = 0;
 
-        let mut chunk_lookup = if include_chunk_lookup_table {
-            Vec::<(u64, (u32, u32))>::with_capacity(self.metadata.chunk_lookup_num_entry as usize)
-        } else {
-            Vec::<(u64, (u32, u32))>::new()
-        };
+        loop {
+            let cas_metadata = CASChunkSequenceHeader::deserialize(reader)?;
 
-        let mut cas_index = 0;
+            // All metadata gets serialized.
+            byte_pos += cas_metadata.serialize(writer)?;
 
-        let (cas_info_start, cas_info_end) = self.cas_info_byte_range();
-        debug_assert_eq!(reader.stream_position()?, cas_info_start);
+            if cas_metadata.is_bookend() {
+                break;
+            }
 
-        while reader.stream_position()? < cas_info_end {
-            let cas_header = CASChunkSequenceHeader::deserialize(reader)?;
-            byte_pos += cas_header.serialize(writer)?;
+            if include_cas_lookup_table {
+                cas_lookup.push((truncate_hash(&cas_metadata.cas_hash), cas_index));
+            }
 
-            for chunk_index in 0..cas_header.num_entries {
+            for chunk_index in 0..cas_metadata.num_entries {
                 let mut chunk = CASChunkSequenceEntry::deserialize(reader)?;
 
                 // Make sure we don't actually put things into an unusable state.
@@ -1026,45 +1128,59 @@ impl MDBShardInfo {
                 byte_pos += chunk.serialize(writer)?;
             }
-            cas_index += 1 + cas_header.num_entries;
+
+            cas_index += 1 + cas_metadata.num_entries;
+            stored_bytes_on_disk += cas_metadata.num_bytes_on_disk as u64;
+            stored_bytes += cas_metadata.num_bytes_in_cas as u64;
         }
 
-        // The file lookup section.
-        let (file_lookup_start, file_lookup_end) = self.file_lookup_byte_range();
-        debug_assert_eq!(reader.stream_position()?, file_lookup_start);
+        if let Some(self_) = self_verification {
+            debug_assert_eq!(reader.stream_position()?, self_.metadata.file_lookup_offset);
+        }
 
+        // Copy over all the file lookup information if that's appropriate.
         out_footer.file_lookup_offset = byte_pos as u64;
 
         if include_file_info {
-            byte_pos += copy(&mut reader.take(file_lookup_end - file_lookup_start), writer)? as usize;
+            if let Some(self_) = self_verification {
+                debug_assert_eq!(file_lookup.len(), self_.metadata.file_lookup_num_entry as usize);
+            }
+
+            for &(key, idx) in file_lookup.iter() {
+                write_u64(writer, key)?;
+                write_u32(writer, idx)?;
+            }
+
+            byte_pos += file_lookup.len() * (size_of::<u64>() + size_of::<u32>());
 
-            out_footer.file_lookup_num_entry = self.metadata.file_lookup_num_entry;
+            out_footer.file_lookup_num_entry = file_lookup.len() as u64;
         } else {
             out_footer.file_lookup_num_entry = 0;
-            reader.seek(SeekFrom::Start(file_lookup_end))?;
         }
 
         // CAS lookup section.
-        let (cas_lookup_start, cas_lookup_end) = self.cas_lookup_byte_range();
-        debug_assert_eq!(reader.stream_position()?, cas_lookup_start);
-
         out_footer.cas_lookup_offset = byte_pos as u64;
 
         if include_cas_lookup_table {
-            // The cas lookup table should be the same, so just copy it directly
-            byte_pos += copy(&mut reader.take(cas_lookup_end - cas_lookup_start), writer)? as usize;
+            if let Some(self_) = self_verification {
+                debug_assert_eq!(cas_lookup.len(), self_.metadata.cas_lookup_num_entry as usize);
+            }
+
+            for &(key, idx) in cas_lookup.iter() {
+                write_u64(writer, key)?;
+                write_u32(writer, idx)?;
+            }
 
-            out_footer.cas_lookup_num_entry = self.metadata.cas_lookup_num_entry;
+            byte_pos += cas_lookup.len() * (size_of::<u64>() + size_of::<u32>());
+
+            out_footer.cas_lookup_num_entry = cas_lookup.len() as u64;
         } else {
             out_footer.cas_lookup_num_entry = 0;
-
-            // Normally we would put this in here to skip reading this section, but after this
-            // we're done with the file, and this seek is unneeded.
-            // reader.seek(SeekFrom::Start(self.metadata.chunk_lookup_offset))?;
         }
 
         out_footer.chunk_lookup_offset = byte_pos as u64;
 
+        // Chunk lookup section.
         if include_chunk_lookup_table {
             // This one is different now that it's hmac keyed, so we need to rebuild it.
             chunk_lookup.sort_by_key(|s| s.0);
@@ -1076,6 +1192,7 @@
             }
 
             byte_pos += chunk_lookup.len() * (size_of::<u64>() + 2 * size_of::<u32>());
+
             out_footer.chunk_lookup_num_entry = chunk_lookup.len() as u64;
         } else {
             out_footer.chunk_lookup_num_entry = 0;
@@ -1085,6 +1202,7 @@
 
         // Add in the timestamps.
         let creation_time = std::time::SystemTime::now();
+        out_footer.shard_creation_timestamp = creation_time.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
         out_footer.shard_key_expiry = creation_time
             .add(key_valid_for)
             .duration_since(UNIX_EPOCH)
             .unwrap_or_default()
             .as_secs();
 
         // Copy over the stored information elsewhere
-        out_footer.materialized_bytes = self.metadata.materialized_bytes;
-        out_footer.stored_bytes = self.metadata.stored_bytes;
+        out_footer.materialized_bytes = materialized_bytes;
+        out_footer.stored_bytes_on_disk = stored_bytes_on_disk;
+        out_footer.stored_bytes = stored_bytes;
 
         // And we're done here!
         out_footer.footer_offset = byte_pos as u64;
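
For reference, a minimal sketch (not part of the patch) of how the streaming export added above might be invoked. The helper name, file paths, and expiry duration are illustrative assumptions; the call signature is the one introduced in this diff, and no pre-loaded MDBShardInfo is required since the streaming variant reads the header and each section directly from the reader in a single pass.

    use std::time::Duration;

    // Hypothetical caller: re-export an existing shard file as an hmac keyed shard.
    fn rekey_shard_file(in_path: &std::path::Path, out_path: &std::path::Path, key: HMACKey) -> Result<usize> {
        let mut reader = std::fs::File::open(in_path)?;
        let mut writer = std::io::BufWriter::new(std::fs::File::create(out_path)?);

        MDBShardInfo::export_as_keyed_shard_streaming(
            &mut reader,
            &mut writer,
            key,                       // HMACKey::default() means no hmac translation (see the test above)
            Duration::from_secs(3600), // key_valid_for: illustrative expiry window
            true,                      // include_file_info
            true,                      // include_cas_lookup_table
            true,                      // include_chunk_lookup_table
        )
    }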