diff --git a/src/bin/laminarmq_server.rs b/src/bin/laminarmq_server.rs index d84c80cfc..e8651b880 100644 --- a/src/bin/laminarmq_server.rs +++ b/src/bin/laminarmq_server.rs @@ -13,7 +13,6 @@ const THREAD_NAME: &str = "laminarmq_server_thread_0"; struct State; -#[cfg(not(tarpaulin_include))] #[instrument(skip(_shared_state))] async fn request_handler( _shared_state: Rc, @@ -37,7 +36,6 @@ async fn request_handler( } } -#[cfg(not(tarpaulin_include))] #[cfg(target_os = "linux")] fn main() { let fmt_subscriber = FmtSubscriber::builder() diff --git a/src/common/split.rs b/src/common/split.rs index a3d3414be..c9e918fd5 100644 --- a/src/common/split.rs +++ b/src/common/split.rs @@ -8,7 +8,6 @@ pub trait SplitAt: Deref + Sized { fn split_at(self, at: usize) -> Option<(Self, Self)>; } -#[cfg(not(tarpaulin_include))] impl SplitAt for Vec { fn split_at(mut self, at: usize) -> Option<(Self, Self)> { if at > self.len() { diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index e26c11e52..fcab4aae4 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -70,7 +70,7 @@ impl PersistentSizedRecord::from_usize(REPR_SIZE) @@ -79,7 +79,7 @@ impl PersistentSizedRecord Result> { - let index_record_storage_size = storage + let index_storage_size = storage .size() .to_usize() .ok_or(IndexError::IncompatibleSizeType)?; - let estimated_index_records_len = index_record_storage_size - .saturating_sub(INDEX_BASE_MARKER_LENGTH) - / INDEX_RECORD_LENGTH; + + let estimated_index_records_len = + index_storage_size.saturating_sub(INDEX_BASE_MARKER_LENGTH) / INDEX_RECORD_LENGTH; Ok(estimated_index_records_len) } @@ -258,9 +258,9 @@ where ) -> Result, IndexError> { let mut position = INDEX_BASE_MARKER_LENGTH as u64; - let mut index_records = Vec::::with_capacity( - Self::estimated_index_records_len_in_storage(storage)?, - ); + let estimated_index_records_len = Self::estimated_index_records_len_in_storage(storage)?; + + let mut index_records = Vec::::with_capacity(estimated_index_records_len); while let Ok(index_record) = PersistentSizedRecord::::read_at( @@ -275,8 +275,6 @@ where index_records.shrink_to_fit(); - let estimated_index_records_len = Self::estimated_index_records_len_in_storage(storage)?; - if index_records.len() != estimated_index_records_len { Err(IndexError::InconsistentIndexSize) } else { @@ -390,12 +388,9 @@ impl Sizable for Index { impl Index { #[inline] - fn underlying_storage_position( - normalized_index: usize, - ) -> Result> { - let storage_position = - (INDEX_BASE_MARKER_LENGTH + INDEX_RECORD_LENGTH * normalized_index) as u64; - u64_as_position!(storage_position, S::Position) + fn index_record_position(normalized_index: usize) -> Result> { + let position = (INDEX_BASE_MARKER_LENGTH + INDEX_RECORD_LENGTH * normalized_index) as u64; + u64_as_position!(position, S::Position) } } @@ -442,10 +437,9 @@ where .ok_or(IndexError::IndexGapEncountered) .map(|&x| x) } else { - let position = Self::underlying_storage_position(normalized_index)?; PersistentSizedRecord::::read_at( &self.storage, - &position, + &Self::index_record_position(normalized_index)?, ) .await .map(|x| x.into_inner()) @@ -496,7 +490,7 @@ where let normalized_index = self.internal_normalized_index(idx)?; self.storage - .truncate(&Self::underlying_storage_position(normalized_index)?) + .truncate(&Self::index_record_position(normalized_index)?) .await .map_err(IndexError::StorageError)?; @@ -539,7 +533,7 @@ pub(crate) mod test { }; use futures_lite::StreamExt; use num::{CheckedSub, FromPrimitive, ToPrimitive, Unsigned, Zero}; - use std::{future::Future, hash::Hasher, marker::PhantomData, ops::Deref}; + use std::{future::Future, hash::Hasher, marker::PhantomData}; fn _test_records_provider<'a, const N: usize>( record_source: &'a [&'a [u8; N]], @@ -556,9 +550,9 @@ pub(crate) mod test { where H: Hasher + Default, { - record_source - .map(|x| RecordHeader::compute::(x.deref())) - .scan((0, 0), |(index, position), record_header| { + record_source.map(|x| RecordHeader::compute::(x)).scan( + (0, 0), + |(index, position), record_header| { let index_record = IndexRecord::with_position_and_record_header::(*position, record_header) .unwrap(); @@ -567,7 +561,8 @@ pub(crate) mod test { *position += record_header.length as u32; Some(index_record) - }) + }, + ) } async fn _test_index_contains_records( diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 9bd0d3e52..957adbedf 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -163,29 +163,28 @@ where read_segment_base_indices.len(), ); - let index_cache_read_segments = config.num_index_cached_read_segments.is_none(); - for segment_base_index in read_segment_base_indices { read_segments.push( - Segment::with_segment_storage_provider_config_and_base_index( + Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut segment_storage_provider, config.segment_config, segment_base_index, - index_cache_read_segments, + config.num_index_cached_read_segments.is_none(), ) .await .map_err(SegmentedLogError::SegmentError)?, ); } - let write_segment = Segment::with_segment_storage_provider_config_and_base_index( - &mut segment_storage_provider, - config.segment_config, - write_segment_base_index, - true, // write segment is always cached - ) - .await - .map_err(SegmentedLogError::SegmentError)?; + let write_segment = + Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( + &mut segment_storage_provider, + config.segment_config, + write_segment_base_index, + true, // write segment is always cached + ) + .await + .map_err(SegmentedLogError::SegmentError)?; let cache = match config.num_index_cached_read_segments { Some(cache_capacity) => { @@ -213,7 +212,7 @@ where macro_rules! new_write_segment { ($segmented_log:ident, $base_index:ident) => { - Segment::with_segment_storage_provider_config_and_base_index( + Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut $segmented_log.segment_storage_provider, $segmented_log.config.segment_config, $base_index, @@ -1011,7 +1010,7 @@ pub(crate) mod test { let record_count = segmented_log_stream .zip(futures_lite::stream::iter(expected_records)) .map(|(record, expected_record_value)| { - assert_eq!(record.value.deref(), expected_record_value.deref()); + assert_eq!(record.value.deref(), expected_record_value); Some(()) }) .count() @@ -1030,8 +1029,8 @@ pub(crate) mod test { .zip(segmented_log_stream_bounded) .zip(futures_lite::stream::iter(expected_records)) .map(|((record_x, record_y), expected_record_value)| { - assert_eq!(record_x.value.deref(), expected_record_value.deref()); - assert_eq!(record_y.value.deref(), expected_record_value.deref()); + assert_eq!(record_x.value.deref(), expected_record_value); + assert_eq!(record_y.value.deref(), expected_record_value); Some(()) }) .count() diff --git a/src/storage/commit_log/segmented_log/segment.rs b/src/storage/commit_log/segmented_log/segment.rs index 7dced5ac8..d9348d442 100644 --- a/src/storage/commit_log/segmented_log/segment.rs +++ b/src/storage/commit_log/segmented_log/segment.rs @@ -84,6 +84,7 @@ pub enum SegmentError { RecordMetadataNotFound, InvalidAppendIdx, InvalidIndexRecordGenerated, + UsizeU32Inconvertible, SegmentMaxed, } @@ -149,17 +150,21 @@ where .map_err(SegmentError::StoreError)?; let metadata_bytes_len_bytes_len = - SERP::serialized_size(&0_usize).map_err(SegmentError::SerializationError)?; + SERP::serialized_size(&0_u32).map_err(SegmentError::SerializationError)?; let (metadata_bytes_len_bytes, metadata_with_value) = record_content .split_at(metadata_bytes_len_bytes_len) .ok_or(SegmentError::RecordMetadataNotFound)?; - let metadata_size = SERP::deserialize(&metadata_bytes_len_bytes) + let metadata_bytes_len: u32 = SERP::deserialize(&metadata_bytes_len_bytes) .map_err(SegmentError::SerializationError)?; + let metadata_bytes_len: usize = metadata_bytes_len + .try_into() + .map_err(|_| SegmentError::UsizeU32Inconvertible)?; + let (metadata_bytes, value) = metadata_with_value - .split_at(metadata_size) + .split_at(metadata_bytes_len) .ok_or(SegmentError::RecordMetadataNotFound)?; let metadata = @@ -229,8 +234,13 @@ where let metadata_bytes = SERP::serialize(&metadata).map_err(SegmentError::SerializationError)?; + let metadata_bytes_len: u32 = metadata_bytes + .len() + .try_into() + .map_err(|_| SegmentError::UsizeU32Inconvertible)?; + let metadata_bytes_len_bytes = - SERP::serialize(&metadata_bytes.len()).map_err(SegmentError::SerializationError)?; + SERP::serialize(&metadata_bytes_len).map_err(SegmentError::SerializationError)?; enum SBuf { XBuf(XBuf), @@ -295,8 +305,13 @@ where let metadata_bytes = SERP::serialize(&metadata).map_err(SegmentError::SerializationError)?; + let metadata_bytes_len: u32 = metadata_bytes + .len() + .try_into() + .map_err(|_| SegmentError::UsizeU32Inconvertible)?; + let metadata_bytes_len_bytes = - SERP::serialize(&metadata_bytes.len()).map_err(SegmentError::SerializationError)?; + SERP::serialize(&metadata_bytes_len).map_err(SegmentError::SerializationError)?; let stream = futures_lite::stream::iter([ Ok::<&[u8], Infallible>(metadata_bytes_len_bytes.deref()), @@ -413,11 +428,11 @@ where Idx: Unsigned + FromPrimitive + Copy + Eq, SERP: SerializationProvider, { - pub async fn with_segment_storage_provider_config_and_base_index( + pub async fn with_segment_storage_provider_config_base_index_and_cache_index_records_flag( segment_storage_provider: &mut SSP, config: Config, base_index: Idx, - cache_index_records: bool, + cache_index_records_flag: bool, ) -> Result> where SSP: SegmentStorageProvider, @@ -427,7 +442,7 @@ where .await .map_err(SegmentError::StorageError)?; - let index = if cache_index_records { + let index = if cache_index_records_flag { Index::with_storage_and_base_index(segment_storage.index, base_index).await } else { Index::with_storage_index_records_option_and_validated_base_index( @@ -505,7 +520,7 @@ pub(crate) mod test { Size: FromPrimitive, SERP: SerializationProvider, { - let metadata_len_serialized_size = SERP::serialized_size(&0_usize).ok()?; + let metadata_len_serialized_size = SERP::serialized_size(&0_u32).ok()?; let metadata_serialized_size = SERP::serialized_size(&MetaWithIdx { metadata: M::default(), @@ -547,7 +562,7 @@ pub(crate) mod test { let config = _segment_config::(_RECORDS[0].len(), _RECORDS.len()).unwrap(); - let mut segment = Segment::::with_segment_storage_provider_config_and_base_index( + let mut segment = Segment::::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut _segment_storage_provider, config, segment_base_index, @@ -578,7 +593,7 @@ pub(crate) mod test { segment.close().await.unwrap(); - let mut segment = Segment::::with_segment_storage_provider_config_and_base_index( + let mut segment = Segment::::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut _segment_storage_provider, config, segment_base_index, @@ -654,7 +669,7 @@ pub(crate) mod test { segment.remove().await.unwrap(); - let segment = Segment::::with_segment_storage_provider_config_and_base_index( + let segment = Segment::::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut _segment_storage_provider, config, segment_base_index,