Skip to content

Commit

Permalink
feat: Implement freeze and fork for index (#12)
Browse files Browse the repository at this point in the history
* feat: write to index and data

* feat: freeze memtable

* feat: write pk to cache

* feat: impl fork for the merge tree memtable

* feat: impl is_empty

* feat: freeze shard

* feat: remove cache

* feat: shard reader

* chore: remove todo

* fix: key bytes stats

* feat: add bytes to tracker in forked memtable

* style: fix clippy

* chore: update comment
  • Loading branch information
evenyag authored Feb 6, 2024
1 parent 166df16 commit aab7791
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 66 deletions.
1 change: 1 addition & 0 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Memtables are write buffers for regions.
pub mod key_values;
#[allow(unused)]
pub mod merge_tree;
pub mod time_series;
pub(crate) mod version;
Expand Down
56 changes: 46 additions & 10 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
//! - Flushes mutable parts to immutable parts
//! - Merges small immutable parts into a big immutable part
#[allow(unused)]
mod data;
#[allow(unused)]
mod index;
// TODO(yingwen): Remove this mod.
mod mutable;
mod tree;

use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
Expand All @@ -45,16 +45,27 @@ pub(crate) type ShardId = u32;
/// Index of a primary key in a shard.
pub(crate) type PkIndex = u16;
/// Id of a primary key.
#[allow(unused)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct PkId {
pub(crate) shard_id: ShardId,
pub(crate) pk_index: PkIndex,
}

/// Config for the merge tree memtable.
#[derive(Debug, Default, Clone)]
pub struct MergeTreeConfig {}
#[derive(Debug, Clone)]
pub struct MergeTreeConfig {
/// Max keys in an index shard.
index_max_keys_per_shard: usize,
}

impl Default for MergeTreeConfig {
fn default() -> Self {
Self {
// TODO(yingwen): Use 4096 or find a proper value.
index_max_keys_per_shard: 8192,
}
}
}

/// Memtable based on a merge tree.
pub struct MergeTreeMemtable {
Expand Down Expand Up @@ -104,7 +115,8 @@ impl Memtable for MergeTreeMemtable {
fn freeze(&self) -> Result<()> {
self.alloc_tracker.done_allocating();

// TODO(yingwen): Freeze the tree.
self.tree.freeze()?;

Ok(())
}

Expand Down Expand Up @@ -136,8 +148,14 @@ impl Memtable for MergeTreeMemtable {
}
}

fn fork(&self, _id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
unimplemented!()
fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
let tree = self.tree.fork(metadata.clone());

Arc::new(MergeTreeMemtable::with_tree(
id,
tree,
self.alloc_tracker.write_buffer_manager(),
))
}
}

Expand All @@ -149,10 +167,28 @@ impl MergeTreeMemtable {
write_buffer_manager: Option<WriteBufferManagerRef>,
config: &MergeTreeConfig,
) -> Self {
Self::with_tree(id, MergeTree::new(metadata, config), write_buffer_manager)
}

/// Creates a mutable memtable from the tree.
///
/// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
fn with_tree(
id: MemtableId,
tree: MergeTree,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
let alloc_tracker = AllocTracker::new(write_buffer_manager);
// Track space allocated by the tree.
let allocated = tree.shared_memory_size();
// Here we still add the bytes of shared parts to the tracker as the old memtable
// will release its tracker soon.
alloc_tracker.on_allocation(allocated);

Self {
id,
tree: Arc::new(MergeTree::new(metadata, config)),
alloc_tracker: AllocTracker::new(write_buffer_manager),
tree: Arc::new(tree),
alloc_tracker,
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
}
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ impl DataBuffer {
pub fn num_rows(&self) -> usize {
self.ts_builder.len()
}

/// Returns whether the buffer is empty.
pub fn is_empty(&self) -> bool {
self.num_rows() == 0
}
}

struct DataPartEncoder<'a> {
Expand Down
Loading

0 comments on commit aab7791

Please sign in to comment.