From c3a8ff33e9fe4d3432bc607865b42916f17c3769 Mon Sep 17 00:00:00 2001 From: Arindam Das <dasarindam.mails@gmail.com> Date: Sat, 7 Oct 2023 20:29:54 +0530 Subject: [PATCH 01/10] style: remove redundant .deref() invocations --- src/storage/commit_log/segmented_log/index.rs | 4 ++-- src/storage/commit_log/segmented_log/mod.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index e26c11e52..175c16784 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -539,7 +539,7 @@ pub(crate) mod test { }; use futures_lite::StreamExt; use num::{CheckedSub, FromPrimitive, ToPrimitive, Unsigned, Zero}; - use std::{future::Future, hash::Hasher, marker::PhantomData, ops::Deref}; + use std::{future::Future, hash::Hasher, marker::PhantomData}; fn _test_records_provider<'a, const N: usize>( record_source: &'a [&'a [u8; N]], @@ -557,7 +557,7 @@ pub(crate) mod test { H: Hasher + Default, { record_source - .map(|x| RecordHeader::compute::<H>(x.deref())) + .map(|x| RecordHeader::compute::<H>(x)) .scan((0, 0), |(index, position), record_header| { let index_record = IndexRecord::with_position_and_record_header::<u32>(*position, record_header) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 9bd0d3e52..4a2ae9d58 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -1011,7 +1011,7 @@ pub(crate) mod test { let record_count = segmented_log_stream .zip(futures_lite::stream::iter(expected_records)) .map(|(record, expected_record_value)| { - assert_eq!(record.value.deref(), expected_record_value.deref()); + assert_eq!(record.value.deref(), expected_record_value); Some(()) }) .count() @@ -1030,8 +1030,8 @@ pub(crate) mod test { .zip(segmented_log_stream_bounded) .zip(futures_lite::stream::iter(expected_records)) .map(|((record_x, record_y), expected_record_value)| { - assert_eq!(record_x.value.deref(), expected_record_value.deref()); - assert_eq!(record_y.value.deref(), expected_record_value.deref()); + assert_eq!(record_x.value.deref(), expected_record_value); + assert_eq!(record_y.value.deref(), expected_record_value); Some(()) }) .count() From c8b5e849eddb588fef2166b528883382ba849f6f Mon Sep 17 00:00:00 2001 From: Arindam Das <dasarindam.mails@gmail.com> Date: Sat, 7 Oct 2023 20:33:10 +0530 Subject: [PATCH 02/10] style: correct local variable name to better reflect purpose --- src/storage/commit_log/segmented_log/index.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index 175c16784..ee2bc2f7c 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -70,7 +70,7 @@ impl<SR: SizedRecord, const REPR_SIZE: usize> PersistentSizedRecord<SR, REPR_SIZ where S: Storage, { - let index_record_bytes = source + let record_bytes = source .read( position, &<S::Size as FromPrimitive>::from_usize(REPR_SIZE) @@ -79,7 +79,7 @@ impl<SR: SizedRecord, const REPR_SIZE: usize> PersistentSizedRecord<SR, REPR_SIZ .await .map_err(IndexError::StorageError)?; - let mut cursor = Cursor::new(index_record_bytes.deref()); + let mut cursor = Cursor::new(record_bytes.deref()); SR::read(&mut cursor).map(Self).map_err(IndexError::IoError) } @@ -556,9 +556,9 @@ pub(crate) mod test { where H: Hasher + Default, { - record_source - .map(|x| RecordHeader::compute::<H>(x)) - .scan((0, 0), |(index, position), record_header| { + record_source.map(|x| RecordHeader::compute::<H>(x)).scan( + (0, 0), + |(index, position), record_header| { let index_record = IndexRecord::with_position_and_record_header::<u32>(*position, record_header) .unwrap(); @@ -567,7 +567,8 @@ pub(crate) mod test { *position += record_header.length as u32; Some(index_record) - }) + }, + ) } async fn _test_index_contains_records<S, Idx, I>( From 5455dfce4f1b84af3f5a3e9196f74ad9a1c56f5f Mon Sep 17 00:00:00 2001 From: Arindam Das <dasarindam.mails@gmail.com> Date: Sun, 8 Oct 2023 20:40:37 +0530 Subject: [PATCH 03/10] style: renamed local var to reflect purpose better --- src/storage/commit_log/segmented_log/index.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index ee2bc2f7c..9793f7520 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -228,13 +228,13 @@ where pub fn estimated_index_records_len_in_storage( storage: &S, ) -> Result<usize, IndexError<S::Error>> { - let index_record_storage_size = storage + let index_storage_size = storage .size() .to_usize() .ok_or(IndexError::IncompatibleSizeType)?; - let estimated_index_records_len = index_record_storage_size - .saturating_sub(INDEX_BASE_MARKER_LENGTH) - / INDEX_RECORD_LENGTH; + + let estimated_index_records_len = + index_storage_size.saturating_sub(INDEX_BASE_MARKER_LENGTH) / INDEX_RECORD_LENGTH; Ok(estimated_index_records_len) } From 6949f229705873f01cb1db0ba933a008da001961 Mon Sep 17 00:00:00 2001 From: Arindam Das <dasarindam.mails@gmail.com> Date: Sun, 8 Oct 2023 22:01:10 +0530 Subject: [PATCH 04/10] style: shortened internal identifiers to ease reading --- src/storage/commit_log/segmented_log/index.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index 9793f7520..3ef1440bf 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -390,12 +390,9 @@ impl<S: Storage, Idx> Sizable for Index<S, Idx> { impl<S: Storage, Idx> Index<S, Idx> { #[inline] - fn underlying_storage_position( - normalized_index: usize, - ) -> Result<S::Position, IndexError<S::Error>> { - let storage_position = - (INDEX_BASE_MARKER_LENGTH + INDEX_RECORD_LENGTH * normalized_index) as u64; - u64_as_position!(storage_position, S::Position) + fn index_record_position(normalized_index: usize) -> Result<S::Position, IndexError<S::Error>> { + let position = (INDEX_BASE_MARKER_LENGTH + INDEX_RECORD_LENGTH * normalized_index) as u64; + u64_as_position!(position, S::Position) } } @@ -442,10 +439,9 @@ where .ok_or(IndexError::IndexGapEncountered) .map(|&x| x) } else { - let position = Self::underlying_storage_position(normalized_index)?; PersistentSizedRecord::<IndexRecord, INDEX_RECORD_LENGTH>::read_at( &self.storage, - &position, + &Self::index_record_position(normalized_index)?, ) .await .map(|x| x.into_inner()) @@ -496,7 +492,7 @@ where let normalized_index = self.internal_normalized_index(idx)?; self.storage - .truncate(&Self::underlying_storage_position(normalized_index)?) + .truncate(&Self::index_record_position(normalized_index)?) .await .map_err(IndexError::StorageError)?; From b7c167dcb9f2d504b936057b0b6c3c928c5624fc Mon Sep 17 00:00:00 2001 From: Arindam Das <dasarindam.mails@gmail.com> Date: Mon, 9 Oct 2023 18:26:52 +0530 Subject: [PATCH 05/10] refactor: avoid reading storage size multiple times --- src/storage/commit_log/segmented_log/index.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index 3ef1440bf..fcab4aae4 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -258,9 +258,9 @@ where ) -> Result<Vec<IndexRecord>, IndexError<S::Error>> { let mut position = INDEX_BASE_MARKER_LENGTH as u64; - let mut index_records = Vec::<IndexRecord>::with_capacity( - Self::estimated_index_records_len_in_storage(storage)?, - ); + let estimated_index_records_len = Self::estimated_index_records_len_in_storage(storage)?; + + let mut index_records = Vec::<IndexRecord>::with_capacity(estimated_index_records_len); while let Ok(index_record) = PersistentSizedRecord::<IndexRecord, INDEX_RECORD_LENGTH>::read_at( @@ -275,8 +275,6 @@ where index_records.shrink_to_fit(); - let estimated_index_records_len = Self::estimated_index_records_len_in_storage(storage)?; - if index_records.len() != estimated_index_records_len { Err(IndexError::InconsistentIndexSize) } else { From 6a8273f2c5c33151addcd1b4fa4e8c773efaba81 Mon Sep 17 00:00:00 2001 From: Arindam Das <dasarindam.mails@gmail.com> Date: Fri, 13 Oct 2023 21:02:30 +0530 Subject: [PATCH 06/10] chore: removes unecessary tarpaulin cfgs --- src/bin/laminarmq_server.rs | 2 -- src/common/split.rs | 1 - 2 files changed, 3 deletions(-) diff --git a/src/bin/laminarmq_server.rs b/src/bin/laminarmq_server.rs index d84c80cfc..e8651b880 100644 --- a/src/bin/laminarmq_server.rs +++ b/src/bin/laminarmq_server.rs @@ -13,7 +13,6 @@ const THREAD_NAME: &str = "laminarmq_server_thread_0"; struct State; -#[cfg(not(tarpaulin_include))] #[instrument(skip(_shared_state))] async fn request_handler( _shared_state: Rc<State>, @@ -37,7 +36,6 @@ async fn request_handler( } } -#[cfg(not(tarpaulin_include))] #[cfg(target_os = "linux")] fn main() { let fmt_subscriber = FmtSubscriber::builder() diff --git a/src/common/split.rs b/src/common/split.rs index a3d3414be..c9e918fd5 100644 --- a/src/common/split.rs +++ b/src/common/split.rs @@ -8,7 +8,6 @@ pub trait SplitAt<T>: Deref<Target = [T]> + Sized { fn split_at(self, at: usize) -> Option<(Self, Self)>; } -#[cfg(not(tarpaulin_include))] impl<T> SplitAt<T> for Vec<T> { fn split_at(mut self, at: usize) -> Option<(Self, Self)> { if at > self.len() { From fd06c290f7c517d7abfffcfc73946631a8d8c0a9 Mon Sep 17 00:00:00 2001 From: Arindam Das <dasarindam.mails@gmail.com> Date: Fri, 13 Oct 2023 22:27:06 +0530 Subject: [PATCH 07/10] feat: represents metadata_bytes_len with a u32 in the serialized bytes layout --- .../commit_log/segmented_log/segment.rs | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/storage/commit_log/segmented_log/segment.rs b/src/storage/commit_log/segmented_log/segment.rs index 7dced5ac8..dce193428 100644 --- a/src/storage/commit_log/segmented_log/segment.rs +++ b/src/storage/commit_log/segmented_log/segment.rs @@ -84,6 +84,7 @@ pub enum SegmentError<StorageError, SerDeError> { RecordMetadataNotFound, InvalidAppendIdx, InvalidIndexRecordGenerated, + UsizeU32Inconvertible, SegmentMaxed, } @@ -149,17 +150,21 @@ where .map_err(SegmentError::StoreError)?; let metadata_bytes_len_bytes_len = - SERP::serialized_size(&0_usize).map_err(SegmentError::SerializationError)?; + SERP::serialized_size(&0_u32).map_err(SegmentError::SerializationError)?; let (metadata_bytes_len_bytes, metadata_with_value) = record_content .split_at(metadata_bytes_len_bytes_len) .ok_or(SegmentError::RecordMetadataNotFound)?; - let metadata_size = SERP::deserialize(&metadata_bytes_len_bytes) + let metadata_bytes_len: u32 = SERP::deserialize(&metadata_bytes_len_bytes) .map_err(SegmentError::SerializationError)?; + let metadata_bytes_len: usize = metadata_bytes_len + .try_into() + .map_err(|_| SegmentError::UsizeU32Inconvertible)?; + let (metadata_bytes, value) = metadata_with_value - .split_at(metadata_size) + .split_at(metadata_bytes_len) .ok_or(SegmentError::RecordMetadataNotFound)?; let metadata = @@ -229,8 +234,13 @@ where let metadata_bytes = SERP::serialize(&metadata).map_err(SegmentError::SerializationError)?; + let metadata_bytes_len: u32 = metadata_bytes + .len() + .try_into() + .map_err(|_| SegmentError::UsizeU32Inconvertible)?; + let metadata_bytes_len_bytes = - SERP::serialize(&metadata_bytes.len()).map_err(SegmentError::SerializationError)?; + SERP::serialize(&metadata_bytes_len).map_err(SegmentError::SerializationError)?; enum SBuf<XBuf, YBuf> { XBuf(XBuf), @@ -295,8 +305,13 @@ where let metadata_bytes = SERP::serialize(&metadata).map_err(SegmentError::SerializationError)?; + let metadata_bytes_len: u32 = metadata_bytes + .len() + .try_into() + .map_err(|_| SegmentError::UsizeU32Inconvertible)?; + let metadata_bytes_len_bytes = - SERP::serialize(&metadata_bytes.len()).map_err(SegmentError::SerializationError)?; + SERP::serialize(&metadata_bytes_len).map_err(SegmentError::SerializationError)?; let stream = futures_lite::stream::iter([ Ok::<&[u8], Infallible>(metadata_bytes_len_bytes.deref()), @@ -505,7 +520,7 @@ pub(crate) mod test { Size: FromPrimitive, SERP: SerializationProvider, { - let metadata_len_serialized_size = SERP::serialized_size(&0_usize).ok()?; + let metadata_len_serialized_size = SERP::serialized_size(&0_u32).ok()?; let metadata_serialized_size = SERP::serialized_size(&MetaWithIdx { metadata: M::default(), From 926af41ba30633205e967a22afea9ece2f6323b2 Mon Sep 17 00:00:00 2001 From: Arindam Das <dasarindam.mails@gmail.com> Date: Sat, 14 Oct 2023 12:16:21 +0530 Subject: [PATCH 08/10] refactor: rename Segment constructor to include all arguments --- src/storage/commit_log/segmented_log/mod.rs | 21 ++++++++++--------- .../commit_log/segmented_log/segment.rs | 8 +++---- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 4a2ae9d58..db4b7e396 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -167,7 +167,7 @@ where for segment_base_index in read_segment_base_indices { read_segments.push( - Segment::with_segment_storage_provider_config_and_base_index( + Segment::with_segment_storage_provider_config_base_index_and_cache_index_records( &mut segment_storage_provider, config.segment_config, segment_base_index, @@ -178,14 +178,15 @@ where ); } - let write_segment = Segment::with_segment_storage_provider_config_and_base_index( - &mut segment_storage_provider, - config.segment_config, - write_segment_base_index, - true, // write segment is always cached - ) - .await - .map_err(SegmentedLogError::SegmentError)?; + let write_segment = + Segment::with_segment_storage_provider_config_base_index_and_cache_index_records( + &mut segment_storage_provider, + config.segment_config, + write_segment_base_index, + true, // write segment is always cached + ) + .await + .map_err(SegmentedLogError::SegmentError)?; let cache = match config.num_index_cached_read_segments { Some(cache_capacity) => { @@ -213,7 +214,7 @@ where macro_rules! new_write_segment { ($segmented_log:ident, $base_index:ident) => { - Segment::with_segment_storage_provider_config_and_base_index( + Segment::with_segment_storage_provider_config_base_index_and_cache_index_records( &mut $segmented_log.segment_storage_provider, $segmented_log.config.segment_config, $base_index, diff --git a/src/storage/commit_log/segmented_log/segment.rs b/src/storage/commit_log/segmented_log/segment.rs index dce193428..daa9e2a75 100644 --- a/src/storage/commit_log/segmented_log/segment.rs +++ b/src/storage/commit_log/segmented_log/segment.rs @@ -428,7 +428,7 @@ where Idx: Unsigned + FromPrimitive + Copy + Eq, SERP: SerializationProvider, { - pub async fn with_segment_storage_provider_config_and_base_index<SSP>( + pub async fn with_segment_storage_provider_config_base_index_and_cache_index_records<SSP>( segment_storage_provider: &mut SSP, config: Config<S::Size>, base_index: Idx, @@ -562,7 +562,7 @@ pub(crate) mod test { let config = _segment_config::<M, Idx, S::Size, SERP>(_RECORDS[0].len(), _RECORDS.len()).unwrap(); - let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_and_base_index( + let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records( &mut _segment_storage_provider, config, segment_base_index, @@ -593,7 +593,7 @@ pub(crate) mod test { segment.close().await.unwrap(); - let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_and_base_index( + let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records( &mut _segment_storage_provider, config, segment_base_index, @@ -669,7 +669,7 @@ pub(crate) mod test { segment.remove().await.unwrap(); - let segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_and_base_index( + let segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records( &mut _segment_storage_provider, config, segment_base_index, From c049c97c6160d764437569edb6d70d8210b144b2 Mon Sep 17 00:00:00 2001 From: Arindam Das <dasarindam.mails@gmail.com> Date: Sun, 22 Oct 2023 13:23:52 +0530 Subject: [PATCH 09/10] refactor: rename cache_index_records to cache_index_records_flag to better reflect purpose --- src/storage/commit_log/segmented_log/mod.rs | 6 +++--- src/storage/commit_log/segmented_log/segment.rs | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index db4b7e396..bbe8ea287 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -167,7 +167,7 @@ where for segment_base_index in read_segment_base_indices { read_segments.push( - Segment::with_segment_storage_provider_config_base_index_and_cache_index_records( + Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut segment_storage_provider, config.segment_config, segment_base_index, @@ -179,7 +179,7 @@ where } let write_segment = - Segment::with_segment_storage_provider_config_base_index_and_cache_index_records( + Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut segment_storage_provider, config.segment_config, write_segment_base_index, @@ -214,7 +214,7 @@ where macro_rules! new_write_segment { ($segmented_log:ident, $base_index:ident) => { - Segment::with_segment_storage_provider_config_base_index_and_cache_index_records( + Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut $segmented_log.segment_storage_provider, $segmented_log.config.segment_config, $base_index, diff --git a/src/storage/commit_log/segmented_log/segment.rs b/src/storage/commit_log/segmented_log/segment.rs index daa9e2a75..d9348d442 100644 --- a/src/storage/commit_log/segmented_log/segment.rs +++ b/src/storage/commit_log/segmented_log/segment.rs @@ -428,11 +428,11 @@ where Idx: Unsigned + FromPrimitive + Copy + Eq, SERP: SerializationProvider, { - pub async fn with_segment_storage_provider_config_base_index_and_cache_index_records<SSP>( + pub async fn with_segment_storage_provider_config_base_index_and_cache_index_records_flag<SSP>( segment_storage_provider: &mut SSP, config: Config<S::Size>, base_index: Idx, - cache_index_records: bool, + cache_index_records_flag: bool, ) -> Result<Self, SegmentError<S::Error, SERP::Error>> where SSP: SegmentStorageProvider<S, Idx>, @@ -442,7 +442,7 @@ where .await .map_err(SegmentError::StorageError)?; - let index = if cache_index_records { + let index = if cache_index_records_flag { Index::with_storage_and_base_index(segment_storage.index, base_index).await } else { Index::with_storage_index_records_option_and_validated_base_index( @@ -562,7 +562,7 @@ pub(crate) mod test { let config = _segment_config::<M, Idx, S::Size, SERP>(_RECORDS[0].len(), _RECORDS.len()).unwrap(); - let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records( + let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut _segment_storage_provider, config, segment_base_index, @@ -593,7 +593,7 @@ pub(crate) mod test { segment.close().await.unwrap(); - let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records( + let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut _segment_storage_provider, config, segment_base_index, @@ -669,7 +669,7 @@ pub(crate) mod test { segment.remove().await.unwrap(); - let segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records( + let segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut _segment_storage_provider, config, segment_base_index, From 4e9190fface11b898c44e675860c5821a0b1f212 Mon Sep 17 00:00:00 2001 From: Arindam Das <dasarindam.mails@gmail.com> Date: Sun, 22 Oct 2023 14:07:44 +0530 Subject: [PATCH 10/10] refactor: inline index_cache_read_segments to clarify intent --- src/storage/commit_log/segmented_log/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index bbe8ea287..957adbedf 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -163,15 +163,13 @@ where read_segment_base_indices.len(), ); - let index_cache_read_segments = config.num_index_cached_read_segments.is_none(); - for segment_base_index in read_segment_base_indices { read_segments.push( Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut segment_storage_provider, config.segment_config, segment_base_index, - index_cache_read_segments, + config.num_index_cached_read_segments.is_none(), ) .await .map_err(SegmentedLogError::SegmentError)?,