Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a new method fork to create a mutable memtable #10

Merged
merged 5 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use crate::config::MitoConfig;
use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::memtable::MemtableBuilderRef;
use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL};
use crate::read::Source;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
Expand Down Expand Up @@ -197,7 +196,6 @@ pub(crate) struct RegionFlushTask {
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,

pub(crate) access_layer: AccessLayerRef,
pub(crate) memtable_builder: MemtableBuilderRef,
pub(crate) file_purger: FilePurgerRef,
pub(crate) listener: WorkerListener,
pub(crate) engine_config: Arc<MitoConfig>,
Expand Down Expand Up @@ -461,7 +459,9 @@ impl FlushScheduler {
}

// Now we can flush the region directly.
version_control.freeze_mutable(&task.memtable_builder);
version_control.freeze_mutable().inspect(|e| {
error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
})?;
// Submit a flush job.
let job = task.into_flush_job(version_control);
if let Err(e) = self.scheduler.schedule(job) {
Expand Down Expand Up @@ -760,7 +760,6 @@ mod tests {
senders: Vec::new(),
request_sender: tx,
access_layer: env.access_layer.clone(),
memtable_builder: builder.memtable_builder(),
file_purger: builder.file_purger(),
listener: WorkerListener::default(),
engine_config: Arc::new(MitoConfig::default()),
Expand Down
12 changes: 10 additions & 2 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,14 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;

/// Mark the memtable as immutable.
fn mark_immutable(&self);
/// Marks the memtable as immutable.
fn freeze(&self) -> Result<()>;

/// Returns the [MemtableStats] info of Memtable.
fn stats(&self) -> MemtableStats;

/// Forks this memtable and returns a new mutable memtable with specific memtable `id`.
fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

pub type MemtableRef = Arc<dyn Memtable>;
Expand Down Expand Up @@ -157,6 +160,11 @@ impl AllocTracker {
pub(crate) fn bytes_allocated(&self) -> usize {
self.bytes_allocated.load(Ordering::Relaxed)
}

/// Returns the write buffer manager.
pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
self.write_buffer_manager.clone()
}
}

impl Drop for AllocTracker {
Expand Down
9 changes: 8 additions & 1 deletion src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ impl Memtable for MergeTreeMemtable {
self.tree.is_empty()
}

fn mark_immutable(&self) {
fn freeze(&self) -> Result<()> {
self.alloc_tracker.done_allocating();

// TODO(yingwen): Freeze the tree.
Ok(())
}

fn stats(&self) -> MemtableStats {
Expand Down Expand Up @@ -131,6 +134,10 @@ impl Memtable for MergeTreeMemtable {
time_range: Some((min_timestamp, max_timestamp)),
}
}

fn fork(&self, _id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
unimplemented!()
}
}

impl MergeTreeMemtable {
Expand Down
12 changes: 11 additions & 1 deletion src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,10 @@ impl Memtable for TimeSeriesMemtable {
self.series_set.series.read().unwrap().is_empty()
}

fn mark_immutable(&self) {
fn freeze(&self) -> Result<()> {
self.alloc_tracker.done_allocating();

Ok(())
}

fn stats(&self) -> MemtableStats {
Expand Down Expand Up @@ -263,6 +265,14 @@ impl Memtable for TimeSeriesMemtable {
time_range: Some((min_timestamp, max_timestamp)),
}
}

fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(TimeSeriesMemtable::new(
metadata.clone(),
id,
self.alloc_tracker.write_buffer_manager(),
))
}
}

type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
Expand Down
20 changes: 13 additions & 7 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
use std::sync::Arc;

use smallvec::SmallVec;
use store_api::metadata::RegionMetadataRef;

use crate::error::Result;
use crate::memtable::{MemtableId, MemtableRef};

/// A version of current memtables in a region.
Expand Down Expand Up @@ -61,28 +63,32 @@ impl MemtableVersion {
/// memtable.
///
/// Returns `None` if the mutable memtable is empty.
#[must_use]
pub(crate) fn freeze_mutable(&self, mutable: MemtableRef) -> Option<MemtableVersion> {
debug_assert!(mutable.is_empty());
pub(crate) fn freeze_mutable(
&self,
metadata: &RegionMetadataRef,
) -> Result<Option<MemtableVersion>> {
if self.mutable.is_empty() {
// No need to freeze the mutable memtable.
return None;
return Ok(None);
}

// Marks the mutable memtable as immutable so it can free the memory usage from our
// soft limit.
self.mutable.mark_immutable();
self.mutable.freeze()?;
// Fork the memtable.
let mutable = self.mutable.fork(self.mutable.id() + 1, metadata);

// Pushes the mutable memtable to immutable list.
let immutables = self
.immutables
.iter()
.cloned()
.chain([self.mutable.clone()])
.collect();
Some(MemtableVersion {
Ok(Some(MemtableVersion {
mutable,
immutables,
})
}))
}

/// Removes memtables by ids from immutable memtables.
Expand Down
13 changes: 9 additions & 4 deletions src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::time::Duration;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;

use crate::error::Result;
use crate::manifest::action::RegionEdit;
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef};
Expand Down Expand Up @@ -76,14 +77,16 @@ impl VersionControl {
}

/// Freezes the mutable memtable if it is not empty.
pub(crate) fn freeze_mutable(&self, builder: &MemtableBuilderRef) {
pub(crate) fn freeze_mutable(&self) -> Result<()> {
let version = self.current().version;
if version.memtables.mutable.is_empty() {
return;
return Ok(());
}
let new_mutable = builder.build(&version.metadata);
// Safety: Immutable memtable is None.
let new_memtables = version.memtables.freeze_mutable(new_mutable).unwrap();
let new_memtables = version
.memtables
.freeze_mutable(&version.metadata)?
.unwrap();
// Create a new version with memtable switched.
let new_version = Arc::new(
VersionBuilder::from_version(version)
Expand All @@ -93,6 +96,8 @@ impl VersionControl {

let mut version_data = self.data.write().unwrap();
version_data.version = new_version;

Ok(())
}

/// Apply edit to current version.
Expand Down
8 changes: 7 additions & 1 deletion src/mito2/src/test_util/memtable_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,17 @@ impl Memtable for EmptyMemtable {
true
}

fn mark_immutable(&self) {}
fn freeze(&self) -> Result<()> {
Ok(())
}

fn stats(&self) -> MemtableStats {
MemtableStats::default()
}

fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(EmptyMemtable::new(id))
}
}

/// Empty memtable builder.
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ impl<S> RegionWorkerLoop<S> {
senders: Vec::new(),
request_sender: self.sender.clone(),
access_layer: region.access_layer.clone(),
memtable_builder: self.memtable_builder.clone(),
file_purger: region.file_purger.clone(),
listener: self.listener.clone(),
engine_config,
Expand Down
Loading