diff --git a/src/mito2/src/memtable/merge_tree/index.rs b/src/mito2/src/memtable/merge_tree/index.rs
index b4dae22dcda1..4dd7ad9101ba 100644
--- a/src/mito2/src/memtable/merge_tree/index.rs
+++ b/src/mito2/src/memtable/merge_tree/index.rs
@@ -14,6 +14,10 @@
 //! Primary key index of the merge tree.

+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::sync::{Arc, RwLock};
+
 use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};
 use datatypes::arrow::compute;
 use snafu::ResultExt;
@@ -31,12 +35,42 @@ pub(crate) struct IndexConfig {
     pub(crate) max_keys_per_shard: usize,
 }

+impl Default for IndexConfig {
+    fn default() -> Self {
+        Self {
+            // TODO(yingwen): Use 4096 or find a proper value.
+            max_keys_per_shard: 8192,
+        }
+    }
+}
+
 /// Primary key index.
-pub(crate) struct KeyIndex {}
+pub(crate) struct KeyIndex {
+    config: IndexConfig,
+    // TODO(yingwen): 1. Support multiple shards.
+    shard: RwLock<MutableShard>,
+}

 impl KeyIndex {
-    pub(crate) fn add_primary_key(&mut self, key: &[u8]) -> Result<PkId> {
-        unimplemented!()
+    pub(crate) fn new(config: IndexConfig) -> KeyIndex {
+        KeyIndex {
+            config,
+            shard: RwLock::new(MutableShard::new(0)),
+        }
+    }
+
+    pub(crate) fn add_primary_key(&self, key: &[u8]) -> Result<PkId> {
+        let mut shard = self.shard.write().unwrap();
+        let pkid = shard.try_add_primary_key(&self.config, key)?;
+        // TODO(yingwen): Switch shard if current shard is full.
+        Ok(pkid.expect("shard is full"))
+    }
+
+    pub(crate) fn scan_index(&self) -> Result<BoxedIndexReader> {
+        let shard = self.shard.read().unwrap();
+        let reader = shard.scan_shard()?;
+
+        Ok(Box::new(reader))
     }
 }

@@ -47,11 +81,20 @@ impl KeyIndex {
 struct MutableShard {
     shard_id: ShardId,
     key_buffer: KeyBuffer,
-    dict_blocks: Vec<DictBlock>,
+    dict_blocks: Vec<DictBlockRef>,
     num_keys: usize,
 }

 impl MutableShard {
+    fn new(shard_id: ShardId) -> MutableShard {
+        MutableShard {
+            shard_id,
+            key_buffer: KeyBuffer::new(MAX_KEYS_PER_BLOCK.into()),
+            dict_blocks: Vec::new(),
+            num_keys: 0,
+        }
+    }
+
     fn try_add_primary_key(&mut self, config: &IndexConfig, key: &[u8]) -> Result<Option<PkId>> {
         // The shard is full.
         if self.num_keys >= config.max_keys_per_shard {
@@ -61,17 +104,29 @@ impl MutableShard {
         if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
             // The write buffer is full.
             let dict_block = self.key_buffer.finish()?;
-            self.dict_blocks.push(dict_block);
+            self.dict_blocks.push(Arc::new(dict_block));
         }

         // Safety: we check the buffer length.
         let pk_index = self.key_buffer.push_key(key);
+        self.num_keys += 1;

         Ok(Some(PkId {
             shard_id: self.shard_id,
             pk_index,
         }))
     }
+
+    fn scan_shard(&self) -> Result<ReaderMerger> {
+        let block = self.key_buffer.finish_cloned()?;
+        let mut readers = Vec::with_capacity(self.dict_blocks.len() + 1);
+        readers.push(DictBlockReader::new(Arc::new(block)));
+        for block in &self.dict_blocks {
+            readers.push(DictBlockReader::new(block.clone()));
+        }
+
+        Ok(ReaderMerger::from_readers(readers))
+    }
 }

 // TODO(yingwen): Bench using custom container for binary and ids so we can
@@ -81,6 +136,7 @@ impl MutableShard {
 /// Now it doesn't support searching index by key. The memtable should use another
 /// cache to map primary key to its index.
 struct KeyBuffer {
+    // TODO(yingwen): Maybe use BTreeMap as key builder.
     // We use arrow's binary builder as our default binary builder
     // is LargeBinaryBuilder
     primary_key_builder: BinaryBuilder,
@@ -88,6 +144,13 @@ struct KeyBuffer {
 }

 impl KeyBuffer {
+    fn new(item_capacity: usize) -> KeyBuffer {
+        KeyBuffer {
+            primary_key_builder: BinaryBuilder::with_capacity(item_capacity, 0),
+            next_pk_index: 0,
+        }
+    }
+
     /// Pushes a new key and returns its pk index.
     ///
     /// # Panics
@@ -147,6 +210,8 @@ struct DictBlock {
     index_weight: Vec<u16>,
 }

+type DictBlockRef = Arc<DictBlock>;
+
 impl DictBlock {
     fn try_from_unsorted(primary_key: BinaryArray) -> Result<Self> {
         assert!(primary_key.len() <= PkIndex::MAX.into());
@@ -178,9 +243,153 @@ impl DictBlock {
         Ok(dict)
     }

-    fn get_key(&self, index: PkIndex) -> &[u8] {
+    fn len(&self) -> usize {
+        self.primary_key.len()
+    }
+
+    /// Get key by [PkIndex].
+    fn key_by_pk_index(&self, index: PkIndex) -> &[u8] {
         // Casting index to usize is safe.
         let pos = self.index_weight[index as usize];
-        self.primary_key.value(pos as usize)
+        self.key_at(pos as usize)
+    }
+
+    /// Get key at position.
+    fn key_at(&self, pos: usize) -> &[u8] {
+        self.primary_key.value(pos)
+    }
+
+    /// Get [PkIndex] at position.
+    fn pk_index_at(&self, pos: usize) -> PkIndex {
+        self.ordered_pk_index[pos]
     }
+}
+
+/// Reader to scan index keys.
+pub(crate) trait IndexReader: Send {
+    /// Returns whether the reader is valid.
+    fn is_valid(&self) -> bool;
+
+    /// Returns current key.
+    ///
+    /// # Panics
+    /// Panics if the reader is invalid.
+    fn current_key(&self) -> &[u8];
+
+    /// Returns current pk index.
+    ///
+    /// # Panics
+    /// Panics if the reader is invalid.
+    fn current_pk_index(&self) -> PkIndex;
+
+    /// Advance the reader.
+    ///
+    /// # Panics
+    /// Panics if the reader is invalid.
+    fn next(&mut self);
+}
+
+pub(crate) type BoxedIndexReader = Box<dyn IndexReader>;
+
+struct DictBlockReader {
+    block: DictBlockRef,
+    current: usize,
+}
+
+impl DictBlockReader {
+    fn new(block: DictBlockRef) -> Self {
+        Self { block, current: 0 }
+    }
+}
+
+impl IndexReader for DictBlockReader {
+    fn is_valid(&self) -> bool {
+        self.current < self.block.len()
+    }
+
+    fn current_key(&self) -> &[u8] {
+        self.block.key_at(self.current)
+    }
+
+    fn current_pk_index(&self) -> PkIndex {
+        self.block.pk_index_at(self.current)
+    }
+
+    fn next(&mut self) {
+        assert!(self.is_valid());
+        self.current += 1;
+    }
+}
+
+/// Wrapper for heap merge.
+///
+/// Reader inside the wrapper must be valid.
+struct HeapWrapper(DictBlockReader);
+
+impl PartialEq for HeapWrapper {
+    fn eq(&self, other: &HeapWrapper) -> bool {
+        self.0.current_key() == other.0.current_key()
+    }
+}
+
+impl Eq for HeapWrapper {}
+
+impl PartialOrd for HeapWrapper {
+    fn partial_cmp(&self, other: &HeapWrapper) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for HeapWrapper {
+    fn cmp(&self, other: &HeapWrapper) -> Ordering {
+        // The std binary heap is a max heap, but we want the nodes ordered in
+        // ascending order, so we compare the nodes in reverse order.
+        other.0.current_key().cmp(self.0.current_key())
+    }
+}
+
+struct ReaderMerger {
+    heap: BinaryHeap<HeapWrapper>,
+}
+
+impl ReaderMerger {
+    fn from_readers(readers: Vec<DictBlockReader>) -> ReaderMerger {
+        let heap = readers
+            .into_iter()
+            .filter_map(|reader| {
+                if reader.is_valid() {
+                    Some(HeapWrapper(reader))
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        ReaderMerger { heap }
+    }
+}
+
+impl IndexReader for ReaderMerger {
+    fn is_valid(&self) -> bool {
+        !self.heap.is_empty()
+    }
+
+    fn current_key(&self) -> &[u8] {
+        self.heap.peek().unwrap().0.current_key()
+    }
+
+    fn current_pk_index(&self) -> PkIndex {
+        self.heap.peek().unwrap().0.current_pk_index()
+    }
+
+    fn next(&mut self) {
+        if let Some(mut top) = self.heap.pop() {
+            top.0.next();
+            if top.0.is_valid() {
+                self.heap.push(top);
+            }
+            // If top is exhausted we simply drop it; every reader still in
+            // the heap is valid, so the new top already holds the next key.
+        }
     }
 }
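
For reference, below is a minimal, self-contained sketch of the reversed-`Ord` min-heap merge that `HeapWrapper` and `ReaderMerger` build on. It is not part of this patch: `RunReader` and `HeapItem` are made-up stand-ins for `DictBlockReader` and `HeapWrapper`. The point it illustrates is that `std::collections::BinaryHeap` is a max-heap, so comparing current keys in reverse order makes `pop` return the reader with the smallest key.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// A sorted run of keys plus a cursor; stands in for `DictBlockReader`.
struct RunReader {
    keys: Vec<Vec<u8>>,
    pos: usize,
}

impl RunReader {
    fn is_valid(&self) -> bool {
        self.pos < self.keys.len()
    }
    fn current_key(&self) -> &[u8] {
        &self.keys[self.pos]
    }
    fn next(&mut self) {
        self.pos += 1;
    }
}

/// Stand-in for `HeapWrapper`: reverses the key comparison so the
/// std max-heap pops the smallest current key first.
struct HeapItem(RunReader);

impl PartialEq for HeapItem {
    fn eq(&self, other: &Self) -> bool {
        self.0.current_key() == other.0.current_key()
    }
}
impl Eq for HeapItem {}
impl PartialOrd for HeapItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for HeapItem {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse the natural order to turn the max-heap into a min-heap.
        other.0.current_key().cmp(self.0.current_key())
    }
}

fn main() {
    // Two sorted runs, like a freshly finished block plus an older one.
    let runs = vec![
        RunReader { keys: vec![b"a".to_vec(), b"d".to_vec()], pos: 0 },
        RunReader { keys: vec![b"b".to_vec(), b"c".to_vec()], pos: 0 },
    ];

    // Seed the heap with valid readers only, as `from_readers` does.
    let mut heap: BinaryHeap<HeapItem> = runs
        .into_iter()
        .filter(|r| r.is_valid())
        .map(HeapItem)
        .collect();

    // Drain-style merge: pop the smallest key, advance only that reader,
    // and push it back while it still has keys. Prints a, b, c, d.
    while let Some(mut top) = heap.pop() {
        println!("{}", String::from_utf8_lossy(top.0.current_key()));
        top.0.next();
        if top.0.is_valid() {
            heap.push(top);
        }
    }
}
```

The sketch drains the heap, while `ReaderMerger` instead exposes a peek-based cursor (`current_key` reads the heap top and `next` re-heapifies), but the invariant is the same: only the reader that was just consumed may be advanced, and an exhausted reader is dropped rather than pushed back. A hand-written `Ord` is used instead of `std::cmp::Reverse` because the ordering key is the reader's *current* key, which changes every time the reader advances.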