diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 3558a0e6cd3d..97d8e453b1d3 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -15,6 +15,7 @@
 //! Memtables are write buffers for regions.
 
 pub mod key_values;
+#[allow(unused)]
 pub mod merge_tree;
 pub mod time_series;
 pub(crate) mod version;
diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs
index beda15a6e95d..43a4d7190c16 100644
--- a/src/mito2/src/memtable/merge_tree.rs
+++ b/src/mito2/src/memtable/merge_tree.rs
@@ -16,10 +16,9 @@
 //! - Flushes mutable parts to immutable parts
 //! - Merges small immutable parts into a big immutable part
 
-#[allow(unused)]
 mod data;
-#[allow(unused)]
 mod index;
+// TODO(yingwen): Remove this mod.
 mod mutable;
 mod tree;
 
@@ -27,6 +26,7 @@ use std::fmt;
 use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
 use std::sync::Arc;
 
+use common_base::readable_size::ReadableSize;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
@@ -45,7 +45,6 @@ pub(crate) type ShardId = u32;
 /// Index of a primary key in a shard.
 pub(crate) type PkIndex = u16;
 /// Id of a primary key.
-#[allow(unused)]
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub(crate) struct PkId {
     pub(crate) shard_id: ShardId,
@@ -53,8 +52,20 @@ pub(crate) struct PkId {
     pub(crate) pk_index: PkIndex,
 }
 
 /// Config for the merge tree memtable.
-#[derive(Debug, Default, Clone)]
-pub struct MergeTreeConfig {}
+#[derive(Debug, Clone)]
+pub struct MergeTreeConfig {
+    /// Max keys in an index shard.
+    index_max_keys_per_shard: usize,
+}
+
+impl Default for MergeTreeConfig {
+    fn default() -> Self {
+        Self {
+            // TODO(yingwen): Use 4096 or find a proper value.
+            index_max_keys_per_shard: 8192,
+        }
+    }
+}
 
 /// Memtable based on a merge tree.
 pub struct MergeTreeMemtable {
@@ -104,7 +115,8 @@ impl Memtable for MergeTreeMemtable {
     fn freeze(&self) -> Result<()> {
         self.alloc_tracker.done_allocating();
 
-        // TODO(yingwen): Freeze the tree.
+        self.tree.freeze()?;
+
         Ok(())
     }
 
@@ -136,8 +148,14 @@ impl Memtable for MergeTreeMemtable {
         }
     }
 
-    fn fork(&self, _id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
-        unimplemented!()
+    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
+        let tree = self.tree.fork(metadata.clone());
+
+        Arc::new(MergeTreeMemtable::with_tree(
+            id,
+            tree,
+            self.alloc_tracker.write_buffer_manager(),
+        ))
     }
 }
 
@@ -149,10 +167,28 @@ impl MergeTreeMemtable {
         write_buffer_manager: Option<WriteBufferManagerRef>,
        config: &MergeTreeConfig,
     ) -> Self {
+        Self::with_tree(id, MergeTree::new(metadata, config), write_buffer_manager)
+    }
+
+    /// Creates a mutable memtable from the tree.
+    ///
+    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
+    fn with_tree(
+        id: MemtableId,
+        tree: MergeTree,
+        write_buffer_manager: Option<WriteBufferManagerRef>,
+    ) -> Self {
+        let alloc_tracker = AllocTracker::new(write_buffer_manager);
+        // Track space allocated by the tree.
+        let allocated = tree.shared_memory_size();
+        // Here we still add the bytes of shared parts to the tracker as the old memtable
+        // will release its tracker soon.
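+        // Until then the shared bytes are counted twice, but only for a short
+        // window.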
+        alloc_tracker.on_allocation(allocated);
+
         Self {
             id,
-            tree: Arc::new(MergeTree::new(metadata, config)),
-            alloc_tracker: AllocTracker::new(write_buffer_manager),
+            tree: Arc::new(tree),
+            alloc_tracker,
             max_timestamp: AtomicI64::new(i64::MIN),
             min_timestamp: AtomicI64::new(i64::MAX),
         }
diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
index dd15e513c16c..dc2336cbbef4 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -125,6 +125,11 @@ impl DataBuffer {
     pub fn num_rows(&self) -> usize {
         self.ts_builder.len()
     }
+
+    /// Returns whether the buffer is empty.
+    pub fn is_empty(&self) -> bool {
+        self.num_rows() == 0
+    }
 }
 
 struct DataPartEncoder<'a> {
diff --git a/src/mito2/src/memtable/merge_tree/index.rs b/src/mito2/src/memtable/merge_tree/index.rs
index b53e259e4f01..6b3776926707 100644
--- a/src/mito2/src/memtable/merge_tree/index.rs
+++ b/src/mito2/src/memtable/merge_tree/index.rs
@@ -23,6 +23,7 @@ use datatypes::arrow::compute;
 use snafu::ResultExt;
 
 use crate::error::{ComputeArrowSnafu, Result};
+use crate::memtable::merge_tree::mutable::WriteMetrics;
 use crate::memtable::merge_tree::{PkId, PkIndex, ShardId};
 
 // TODO(yingwen): Consider using byte size to manage block.
@@ -30,80 +31,120 @@ use crate::memtable::merge_tree::{PkId, PkIndex, ShardId};
 const MAX_KEYS_PER_BLOCK: u16 = 256;
 
 /// Config for the index.
+#[derive(Debug, Clone)]
 pub(crate) struct IndexConfig {
     /// Max keys in an index shard.
     pub(crate) max_keys_per_shard: usize,
 }
 
-impl Default for IndexConfig {
-    fn default() -> Self {
-        Self {
-            // TODO(yingwen): Use 4096 or find a proper value.
-            max_keys_per_shard: 8192,
-        }
-    }
-}
-
 /// Primary key index.
 pub(crate) struct KeyIndex {
     config: IndexConfig,
-    // TODO(yingwen): 1. Support multiple shard.
-    shard: RwLock<MutableShard>,
+    // TODO(yingwen): Support multiple shards.
+    shard: RwLock<Shard>,
 }
 
+pub(crate) type KeyIndexRef = Arc<KeyIndex>;
+
 impl KeyIndex {
     pub(crate) fn new(config: IndexConfig) -> KeyIndex {
         KeyIndex {
             config,
-            shard: RwLock::new(MutableShard::new(0)),
+            shard: RwLock::new(Shard::new(0)),
         }
     }
 
-    pub(crate) fn write_primary_key(&self, key: &[u8]) -> Result<PkId> {
+    pub(crate) fn write_primary_key(&self, key: &[u8], metrics: &mut WriteMetrics) -> Result<PkId> {
         let mut shard = self.shard.write().unwrap();
-        let pkid = shard.try_add_primary_key(&self.config, key)?;
+        let pk_id = shard.try_add_primary_key(&self.config, key, metrics)?;
         // TODO(yingwen): Switch shard if current shard is full.
-        Ok(pkid.expect("shard is full"))
+        Ok(pk_id.expect("shard is full"))
     }
 
-    pub(crate) fn scan_index(&self) -> Result<BoxedIndexReader> {
+    pub(crate) fn scan_shard(&self, shard_id: ShardId) -> Result<ShardReader> {
         let shard = self.shard.read().unwrap();
+        assert_eq!(shard.shard_id, shard_id);
         let reader = shard.scan_shard()?;
 
-        Ok(Box::new(reader))
+        Ok(reader)
+    }
+
+    /// Freezes the index.
+    pub(crate) fn freeze(&self) -> Result<()> {
+        let mut shard = self.shard.write().unwrap();
+        shard.freeze()
+    }
+
+    /// Returns a new index for writing.
+    ///
+    /// Callers must freeze the index first.
+    pub(crate) fn fork(&self) -> KeyIndex {
+        let current_shard = self.shard.read().unwrap();
+        let shard = current_shard.fork();
+
+        KeyIndex {
+            config: self.config.clone(),
+            shard: RwLock::new(shard),
+        }
+    }
+
+    pub(crate) fn memory_size(&self) -> usize {
+        self.shard.read().unwrap().memory_size()
     }
 }
 
+type PkIndexMap = BTreeMap<Vec<u8>, PkIndex>;
+
 // TODO(yingwen): Support partition index (partition by a column, e.g. table_id) to
 // reduce null columns and eliminate lock contention. We only need to partition the
 // write buffer but modify dicts with partition lock held.
 /// Mutable shard for the index.
-struct MutableShard {
+struct Shard {
     shard_id: ShardId,
     // TODO(yingwen): Reuse keys.
-    pk_to_index: BTreeMap<Vec<u8>, PkIndex>,
+    pk_to_index: PkIndexMap,
     key_buffer: KeyBuffer,
     dict_blocks: Vec<DictBlock>,
     num_keys: usize,
+    /// Bytes allocated by keys in the [Shard::pk_to_index].
+    key_bytes_in_index: usize,
+    // TODO(yingwen): Serialize the shard instead of keeping the map.
+    shared_index: Option<Arc<PkIndexMap>>,
 }
 
-impl MutableShard {
-    fn new(shard_id: ShardId) -> MutableShard {
-        MutableShard {
+impl Shard {
+    fn new(shard_id: ShardId) -> Shard {
+        Shard {
             shard_id,
             pk_to_index: BTreeMap::new(),
             key_buffer: KeyBuffer::new(MAX_KEYS_PER_BLOCK.into()),
             dict_blocks: Vec::new(),
             num_keys: 0,
+            key_bytes_in_index: 0,
+            shared_index: None,
         }
     }
 
-    fn try_add_primary_key(&mut self, config: &IndexConfig, key: &[u8]) -> Result<Option<PkId>> {
+    fn try_add_primary_key(
+        &mut self,
+        config: &IndexConfig,
+        key: &[u8],
+        metrics: &mut WriteMetrics,
+    ) -> Result<Option<PkId>> {
         // The shard is full.
         if self.num_keys >= config.max_keys_per_shard {
             return Ok(None);
         }
 
+        // The shard is immutable, so look the key up in the shared index.
+        if let Some(shared_index) = &self.shared_index {
+            let pk_id = shared_index.get(key).map(|pk_index| PkId {
+                shard_id: self.shard_id,
+                pk_index: *pk_index,
+            });
+            return Ok(pk_id);
+        }
+
         // Already in the shard.
         if let Some(pk_index) = self.pk_to_index.get(key).copied() {
             return Ok(Some(PkId {
@@ -124,20 +165,69 @@ impl MutableShard {
         self.pk_to_index.insert(key.to_vec(), pk_index);
         self.num_keys += 1;
 
+        // The key is stored twice (in the map and in the key buffer), so the byte
+        // usage doubles.
+        metrics.key_bytes += key.len() * 2;
+        self.key_bytes_in_index += key.len();
+
         Ok(Some(PkId {
             shard_id: self.shard_id,
             pk_index,
         }))
     }
 
-    fn scan_shard(&self) -> Result<BlocksReader> {
-        let sorted_pk_indices = self.pk_to_index.values().copied().collect();
+    fn scan_shard(&self) -> Result<ShardReader> {
+        let pk_indices = self.sorted_pk_indices();
         let block = self.key_buffer.finish_cloned();
         let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
         blocks.extend_from_slice(&self.dict_blocks);
         blocks.push(block);
 
-        Ok(BlocksReader::new(blocks, sorted_pk_indices))
+        Ok(ShardReader::new(blocks, pk_indices))
+    }
+
+    fn freeze(&mut self) -> Result<()> {
+        if self.key_buffer.is_empty() {
+            return Ok(());
+        }
+
+        let dict_block = self.key_buffer.finish();
+        self.dict_blocks.push(dict_block);
+
+        // Freezes the pk to index map.
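+        // Further lookups in this shard go through the shared (read-only) map,
+        // which forked shards can reference without copying keys.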
+        let pk_to_index = std::mem::take(&mut self.pk_to_index);
+        self.shared_index = Some(Arc::new(pk_to_index));
+
+        Ok(())
+    }
+
+    fn fork(&self) -> Shard {
+        Shard {
+            shard_id: self.shard_id,
+            pk_to_index: BTreeMap::new(),
+            key_buffer: KeyBuffer::new(MAX_KEYS_PER_BLOCK.into()),
+            dict_blocks: self.dict_blocks.clone(),
+            num_keys: self.num_keys,
+            key_bytes_in_index: self.key_bytes_in_index,
+            shared_index: self.shared_index.clone(),
+        }
+    }
+
+    fn sorted_pk_indices(&self) -> Vec<PkIndex> {
+        if let Some(shared_index) = &self.shared_index {
+            return shared_index.values().copied().collect();
+        }
+
+        self.pk_to_index.values().copied().collect()
+    }
+
+    fn memory_size(&self) -> usize {
+        self.key_bytes_in_index
+            + self.key_buffer.buffer_memory_size()
+            + self
+                .dict_blocks
+                .iter()
+                .map(|block| block.buffer_memory_size())
+                .sum::<usize>()
+    }
 }
 
@@ -179,6 +269,10 @@ impl KeyBuffer {
         self.primary_key_builder.len()
     }
 
+    fn is_empty(&self) -> bool {
+        self.primary_key_builder.is_empty()
+    }
+
     /// Gets the primary key by its index.
     ///
     /// # Panics
@@ -197,6 +291,10 @@ impl KeyBuffer {
     fn finish(&mut self) -> DictBlock {
         let primary_key = self.primary_key_builder.finish();
+        // Reserve capacity for the new builder, since `finish()` leaves the builder
+        // empty with a capacity of 0.
+        // TODO(yingwen): Do we need to reserve capacity for data?
+        self.primary_key_builder = BinaryBuilder::with_capacity(primary_key.len(), 0);
 
         DictBlock::from_unsorted(primary_key)
     }
@@ -206,6 +304,16 @@ impl KeyBuffer {
 
         DictBlock::from_unsorted(primary_key)
     }
+
+    fn buffer_memory_size(&self) -> usize {
+        self.primary_key_builder.values_slice().len()
+            + std::mem::size_of_val(self.primary_key_builder.offsets_slice())
+            + self
+                .primary_key_builder
+                .validity_slice()
+                .map(|v| v.len())
+                .unwrap_or(0)
+    }
 }
 
 // The array should be cheap to clone.
@@ -229,6 +337,10 @@ impl DictBlock {
         let pos = index % MAX_KEYS_PER_BLOCK;
         self.primary_key.value(pos as usize)
     }
+
+    fn buffer_memory_size(&self) -> usize {
+        self.primary_key.get_buffer_memory_size()
+    }
 }
 
 /// Reader to scan index keys.
@@ -257,16 +369,16 @@ pub(crate) trait IndexReader: Send {
 
 pub(crate) type BoxedIndexReader = Box<dyn IndexReader>;
 
-pub(crate) struct BlocksReader {
+pub(crate) struct ShardReader {
     blocks: Vec<DictBlock>,
     sorted_pk_indices: Vec<PkIndex>,
     /// Current offset in the `sorted_pk_indices`.
     offset: usize,
 }
 
-impl BlocksReader {
-    fn new(blocks: Vec<DictBlock>, sorted_pk_indices: Vec<PkIndex>) -> BlocksReader {
-        BlocksReader {
+impl ShardReader {
+    fn new(blocks: Vec<DictBlock>, sorted_pk_indices: Vec<PkIndex>) -> ShardReader {
+        ShardReader {
             blocks,
             sorted_pk_indices,
             offset: 0,
@@ -277,9 +389,18 @@
         let block_idx = pk_index / MAX_KEYS_PER_BLOCK;
         self.blocks[block_idx as usize].key_by_pk_index(pk_index)
     }
+
+    fn compute_pk_weights(&self, pk_weights: &mut Vec<u16>) {
+        pk_weights.clear();
+        pk_weights.resize(self.sorted_pk_indices.len(), 0);
+
+        for (weight, pk_index) in self.sorted_pk_indices.iter().enumerate() {
+            pk_weights[*pk_index as usize] = weight as u16;
+        }
+    }
 }
 
-impl IndexReader for BlocksReader {
+impl IndexReader for ShardReader {
     fn is_valid(&self) -> bool {
         self.offset < self.sorted_pk_indices.len()
     }
@@ -330,8 +451,9 @@ mod tests {
             max_keys_per_shard: (MAX_KEYS_PER_BLOCK * 3).into(),
         });
         let mut last_pk_id = None;
+        let mut metrics = WriteMetrics::default();
         for key in &keys {
-            let pk_id = index.write_primary_key(key).unwrap();
+            let pk_id = index.write_primary_key(key, &mut metrics).unwrap();
             last_pk_id = Some(pk_id);
         }
         assert_eq!(
@@ -341,6 +463,8 @@ mod tests {
             },
             last_pk_id.unwrap()
         );
+        let key_bytes: usize = keys.iter().map(|key| key.len() * 2).sum();
+        assert_eq!(key_bytes, metrics.key_bytes);
 
         let mut expect: Vec<_> = keys
             .into_iter()
@@ -350,11 +474,32 @@
         expect.sort_unstable_by(|a, b| a.0.cmp(&b.0));
 
         let mut result = Vec::with_capacity(expect.len());
-        let mut reader = index.scan_index().unwrap();
+        let mut reader = index.scan_shard(0).unwrap();
         while reader.is_valid() {
             result.push((reader.current_key().to_vec(), reader.current_pk_index()));
             reader.next();
         }
         assert_eq!(expect, result);
     }
+
+    #[test]
+    fn test_index_memory_size() {
+        let index = KeyIndex::new(IndexConfig {
+            max_keys_per_shard: (MAX_KEYS_PER_BLOCK * 3).into(),
+        });
+        let mut metrics = WriteMetrics::default();
+        // 513 keys
+        let num_keys = MAX_KEYS_PER_BLOCK * 2 + 1;
+        // Writes 2 blocks
+        for i in 0..num_keys {
+            // Each key is 5 bytes.
+            let key = format!("{i:05}");
+            index
+                .write_primary_key(key.as_bytes(), &mut metrics)
+                .unwrap();
+        }
+        // num_keys * 5 * 2
+        assert_eq!(5130, metrics.key_bytes);
+        assert_eq!(8850, index.memory_size());
+    }
 }
diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs
index 38829388aafc..ec3788aef788 100644
--- a/src/mito2/src/memtable/merge_tree/tree.rs
+++ b/src/mito2/src/memtable/merge_tree/tree.rs
@@ -16,24 +16,34 @@
 
 use std::sync::{Arc, RwLock};
 
+use api::v1::OpType;
+use common_time::Timestamp;
+use snafu::ensure;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
 
-use crate::error::Result;
-use crate::memtable::merge_tree::mutable::{MutablePart, WriteMetrics};
-use crate::memtable::merge_tree::MergeTreeConfig;
+use crate::error::{PrimaryKeyLengthMismatchSnafu, Result};
+use crate::memtable::key_values::KeyValue;
+use crate::memtable::merge_tree::data::DataBuffer;
+use crate::memtable::merge_tree::index::{IndexConfig, KeyIndex, KeyIndexRef};
+use crate::memtable::merge_tree::mutable::WriteMetrics;
+use crate::memtable::merge_tree::{MergeTreeConfig, PkId};
 use crate::memtable::{BoxedBatchIterator, KeyValues};
-use crate::row_converter::{McmpRowCodec, SortField};
+use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+
+/// Initial capacity for the data buffer.
+const DATA_INIT_CAP: usize = 8;
 
 /// The merge tree.
 pub(crate) struct MergeTree {
+    /// Config of the tree.
+    config: MergeTreeConfig,
     /// Metadata of the region.
     pub(crate) metadata: RegionMetadataRef,
     /// Primary key codec.
     row_codec: Arc<McmpRowCodec>,
-    /// Mutable part of the tree.
-    mutable: RwLock<MutablePart>,
+    parts: RwLock<TreeParts>,
 }
 
 pub(crate) type MergeTreeRef = Arc<MergeTree>;
@@ -47,39 +57,184 @@
                 .map(|c| SortField::new(c.column_schema.data_type.clone()))
                 .collect(),
         );
+
+        let index = (!metadata.primary_key.is_empty()).then(|| {
+            Arc::new(KeyIndex::new(IndexConfig {
+                max_keys_per_shard: config.index_max_keys_per_shard,
+            }))
+        });
+        let data_buffer = DataBuffer::with_capacity(metadata.clone(), DATA_INIT_CAP);
+        let parts = TreeParts {
+            immutable: false,
+            index,
+            data_buffer,
+        };
+
         MergeTree {
+            config: config.clone(),
             metadata,
             row_codec: Arc::new(row_codec),
-            mutable: RwLock::new(MutablePart::new(config)),
+            parts: RwLock::new(parts),
         }
     }
 
+    // TODO(yingwen): The size computed from values is inaccurate.
     /// Write key-values into the tree.
+    ///
+    /// # Panics
+    /// Panics if the tree is immutable (frozen).
     pub(crate) fn write(&self, kvs: &KeyValues, metrics: &mut WriteMetrics) -> Result<()> {
-        let mut part = self.mutable.write().unwrap();
-        part.write(&self.metadata, &self.row_codec, kvs, metrics)
+        let mut primary_key = Vec::new();
+        let has_pk = !self.metadata.primary_key.is_empty();
+
+        for kv in kvs.iter() {
+            ensure!(
+                kv.num_primary_keys() == self.row_codec.num_fields(),
+                PrimaryKeyLengthMismatchSnafu {
+                    expect: self.row_codec.num_fields(),
+                    actual: kv.num_primary_keys(),
+                }
+            );
+            // Safety: the timestamp of kv must be present and valid.
+            let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
+            metrics.min_ts = metrics.min_ts.min(ts);
+            metrics.max_ts = metrics.max_ts.max(ts);
+            metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();
+
+            if !has_pk {
+                // No primary key. For now we always assign the first shard and
+                // the first pk index.
+                let pk_id = PkId {
+                    shard_id: 0,
+                    pk_index: 0,
+                };
+                self.write_with_id(pk_id, kv);
+                continue;
+            }
+
+            // Encode primary key.
+            primary_key.clear();
+            self.row_codec
+                .encode_to_vec(kv.primary_keys(), &mut primary_key)?;
+
+            // Write rows with primary keys.
+            self.write_with_key(&primary_key, kv, metrics)?;
+        }
+
+        metrics.value_bytes +=
+            kvs.num_rows() * (std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>());
+
+        Ok(())
     }
 
     /// Scans the tree.
     pub(crate) fn scan(
         &self,
-        projection: Option<&[ColumnId]>,
-        predicate: Option<Predicate>,
+        _projection: Option<&[ColumnId]>,
+        _predicate: Option<Predicate>,
     ) -> Result<BoxedBatchIterator> {
-        let mutable = self.mutable.read().unwrap();
-
-        mutable.scan_part(
-            &self.metadata,
-            self.row_codec.clone(),
-            projection,
-            predicate.as_ref(),
-            true,
-        )
+        todo!()
     }
 
     /// Returns true if the tree is empty.
     pub(crate) fn is_empty(&self) -> bool {
-        let mutable = self.mutable.write().unwrap();
-        mutable.is_empty()
+        let parts = self.parts.read().unwrap();
+        // Gets whether the memtable is empty from the data part.
+        parts.data_buffer.is_empty()
+        // TODO(yingwen): Also consider other parts if we freeze the data buffer.
+    }
+
+    /// Marks the tree as immutable.
+    ///
+    /// Once the tree becomes immutable, callers should not write to it again.
+    pub(crate) fn freeze(&self) -> Result<()> {
+        let mut parts = self.parts.write().unwrap();
+        parts.immutable = true;
+        // Freezes the index.
+        if let Some(index) = &parts.index {
+            index.freeze()?;
+        }
+
+        Ok(())
+    }
+
+    /// Forks an immutable tree. Returns a mutable tree that inherits the index
+    /// of this tree.
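+    ///
+    /// The forked tree shares the frozen dict blocks and key index with this
+    /// tree; its data buffer starts empty.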
+    pub(crate) fn fork(&self, metadata: RegionMetadataRef) -> MergeTree {
+        if metadata.primary_key != self.metadata.primary_key {
+            // The primary key has changed. We can't reuse the existing parts.
+            return MergeTree::new(metadata, &self.config);
+        }
+
+        let current_parts = self.parts.read().unwrap();
+        let index = current_parts
+            .index
+            .as_ref()
+            .map(|index| Arc::new(index.fork()));
+        // New parts.
+        let parts = TreeParts {
+            immutable: false,
+            index,
+            data_buffer: DataBuffer::with_capacity(metadata.clone(), DATA_INIT_CAP),
+        };
+
+        MergeTree {
+            config: self.config.clone(),
+            metadata,
+            // We can reuse the row codec.
+            row_codec: self.row_codec.clone(),
+            parts: RwLock::new(parts),
+        }
+    }
+
+    /// Returns the memory size of shared parts.
+    pub(crate) fn shared_memory_size(&self) -> usize {
+        let parts = self.parts.read().unwrap();
+        parts
+            .index
+            .as_ref()
+            .map(|index| index.memory_size())
+            .unwrap_or(0)
+    }
+
+    fn write_with_key(
+        &self,
+        primary_key: &[u8],
+        kv: KeyValue,
+        metrics: &mut WriteMetrics,
+    ) -> Result<()> {
+        // Write the pk to the index.
+        let pk_id = self.write_primary_key(primary_key, metrics)?;
+        // Writes data.
+        self.write_with_id(pk_id, kv);
+
+        Ok(())
+    }
+
+    fn write_with_id(&self, pk_id: PkId, kv: KeyValue) {
+        let mut parts = self.parts.write().unwrap();
+        assert!(!parts.immutable);
+        parts.data_buffer.write_row(pk_id, kv)
+    }
+
+    fn write_primary_key(&self, key: &[u8], metrics: &mut WriteMetrics) -> Result<PkId> {
+        let index = {
+            let parts = self.parts.read().unwrap();
+            assert!(!parts.immutable);
+            // Safety: The region has primary keys.
+            parts.index.clone().unwrap()
+        };
+
+        index.write_primary_key(key, metrics)
+    }
+}
+
+struct TreeParts {
+    /// Whether the tree is immutable.
+    immutable: bool,
+    /// Index part of the tree. If the region doesn't have a primary key, this field
+    /// is `None`.
+    index: Option<KeyIndexRef>,
+    /// Data buffer of the tree.
+    data_buffer: DataBuffer,
+}
diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs
index 0f755f19b0fa..5c1ca381067b 100644
--- a/src/mito2/src/test_util/version_util.rs
+++ b/src/mito2/src/test_util/version_util.rs
@@ -25,7 +25,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
 use store_api::storage::RegionId;
 
 use crate::manifest::action::RegionEdit;
-use crate::memtable::{MemtableBuilder, MemtableBuilderRef};
+use crate::memtable::MemtableBuilder;
 use crate::region::version::{Version, VersionBuilder, VersionControl};
 use crate::sst::file::{FileId, FileMeta};
 use crate::sst::file_purger::FilePurgerRef;
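
Note on `ShardReader::compute_pk_weights`: the weight of a `PkIndex` is its rank in the key-sorted order, so comparing weights orders rows by primary key without touching the keys themselves. A minimal standalone sketch of the same logic (the free function and `main` below are hypothetical, not part of the crate):

type PkIndex = u16;

fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec<u16>) {
    pk_weights.clear();
    pk_weights.resize(sorted_pk_indices.len(), 0);
    // The position of a pk_index in the sorted order becomes its weight.
    for (weight, pk_index) in sorted_pk_indices.iter().enumerate() {
        pk_weights[*pk_index as usize] = weight as u16;
    }
}

fn main() {
    // Keys inserted in the order "b", "c", "a" get pk_index 0, 1, 2; sorted by
    // key, the indices are [2, 0, 1].
    let mut weights = Vec::new();
    compute_pk_weights(&[2, 0, 1], &mut weights);
    assert_eq!(vec![1, 2, 0], weights); // "a" (pk_index 2) sorts first => weight 0
}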
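Note on the byte accounting in `test_index_memory_size`: each key is counted twice in `WriteMetrics::key_bytes` because the shard stores it in both the map and the key buffer. A quick check of the expected `5130` (the `8850` figure also depends on arrow buffer sizes, so it is not re-derived here):

const MAX_KEYS_PER_BLOCK: u16 = 256;

fn main() {
    let num_keys = (MAX_KEYS_PER_BLOCK as usize) * 2 + 1; // 513 keys
    let key_len = "00000".len(); // format!("{i:05}") always yields 5 bytes
    assert_eq!(5130, num_keys * key_len * 2); // two copies of every key
}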
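Note on the freeze semantics: once `Shard::freeze` moves `pk_to_index` into `shared_index`, `try_add_primary_key` only performs lookups and never allocates, which is what lets forked shards share the map behind an `Arc`. A toy model of that read-only path (simplified, hypothetical types; the real shard also tracks key buffers and dict blocks):

use std::collections::BTreeMap;
use std::sync::Arc;

type PkIndex = u16;

struct FrozenShard {
    shared_index: Arc<BTreeMap<Vec<u8>, PkIndex>>,
}

impl FrozenShard {
    // A frozen shard either finds the key or reports a miss; it never mutates.
    fn get(&self, key: &[u8]) -> Option<PkIndex> {
        self.shared_index.get(key).copied()
    }
}

fn main() {
    let map = BTreeMap::from([(b"k1".to_vec(), 0), (b"k2".to_vec(), 1)]);
    let shard = FrozenShard { shared_index: Arc::new(map) };
    assert_eq!(Some(1), shard.get(b"k2"));
    assert_eq!(None, shard.get(b"k3"));
}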