Skip to content

Commit

Permalink
feat: impl iter for the memtable
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Jan 22, 2024
1 parent 81d28d1 commit da17f29
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl RegionFlushTask {
}

let file_id = FileId::random();
let iter = mem.iter(None, None);
let iter = mem.iter(None, None)?;
let source = Source::Iter(iter);
let create_inverted_index = self.engine_config.inverted_index.create_on_flush.auto();
let mem_threshold_index_create = self
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> BoxedBatchIterator;
) -> Result<BoxedBatchIterator>;

/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;
Expand Down
8 changes: 4 additions & 4 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ impl Memtable for MergeTreeMemtable {

fn iter(
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> BoxedBatchIterator {
todo!()
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Result<BoxedBatchIterator> {
self.tree.scan(projection, predicate)
}

fn is_empty(&self) -> bool {
Expand Down
19 changes: 4 additions & 15 deletions src/mito2/src/memtable/merge_tree/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, NewRecordBatchSnafu, PrimaryKeyLengthMismatchSnafu,
Result,
ComputeVectorSnafu, NewRecordBatchSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::MergeTreeConfig;
Expand Down Expand Up @@ -196,7 +195,7 @@ impl MutablePart {
metrics,
};

unimplemented!()
Ok(Box::new(iter))
}

/// Returns true if the dictionary is enabled.
Expand Down Expand Up @@ -326,6 +325,8 @@ pub(crate) struct ReadMetrics {
struct Iter {
plain_vectors: Option<PlainBlockVectors>,
offsets: VecDeque<usize>,
// TODO(yingwen): scan metrics.
#[allow(unused)]
metrics: ReadMetrics,
}

Expand All @@ -347,18 +348,6 @@ impl Iterator for Iter {
}
}

/// Expand the optional projection to ids of columns to read.
fn expand_projection(
metadata: &RegionMetadataRef,
projection: Option<&[ColumnId]>,
) -> HashSet<ColumnId> {
if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
metadata.field_columns().map(|c| c.column_id).collect()
}
}

/// Id of the primary key.
type KeyIdType = u16;
/// Id of a [KeyDict].
Expand Down
25 changes: 22 additions & 3 deletions src/mito2/src/memtable/merge_tree/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@
use std::sync::{Arc, RwLock};

use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::Result;
use crate::memtable::merge_tree::mutable::{MutablePart, WriteMetrics};
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::memtable::KeyValues;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::row_converter::{McmpRowCodec, SortField};

/// The merge tree.
pub(crate) struct MergeTree {
/// Metadata of the region.
pub(crate) metadata: RegionMetadataRef,
/// Primary key codec.
row_codec: McmpRowCodec,
row_codec: Arc<McmpRowCodec>,
/// Mutable part of the tree.
mutable: RwLock<MutablePart>,
}
Expand All @@ -47,7 +49,7 @@ impl MergeTree {
);
MergeTree {
metadata,
row_codec,
row_codec: Arc::new(row_codec),
mutable: RwLock::new(MutablePart::new(config)),
}
}
Expand All @@ -57,4 +59,21 @@ impl MergeTree {
let mut part = self.mutable.write().unwrap();
part.write(&self.metadata, &self.row_codec, kvs, metrics)
}

/// Scans the tree.
pub(crate) fn scan(
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Result<BoxedBatchIterator> {
let mutable = self.mutable.read().unwrap();

mutable.scan_part(
&self.metadata,
self.row_codec.clone(),
projection,
predicate.as_ref(),
true,
)
}
}
8 changes: 4 additions & 4 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl Memtable for TimeSeriesMemtable {
&self,
projection: Option<&[ColumnId]>,
filters: Option<Predicate>,
) -> BoxedBatchIterator {
) -> Result<BoxedBatchIterator> {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
Expand All @@ -227,7 +227,7 @@ impl Memtable for TimeSeriesMemtable {
.collect()
};

Box::new(self.series_set.iter_series(projection, filters))
Ok(Box::new(self.series_set.iter_series(projection, filters)))
}

fn is_empty(&self) -> bool {
Expand Down Expand Up @@ -1080,7 +1080,7 @@ mod tests {
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<HashSet<_>>();

let iter = memtable.iter(None, None);
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
Expand Down Expand Up @@ -1116,7 +1116,7 @@ mod tests {
let memtable = TimeSeriesMemtable::new(schema, 42, None);
memtable.write(&kvs).unwrap();

let iter = memtable.iter(Some(&[3]), None);
let iter = memtable.iter(Some(&[3]), None).unwrap();

let mut v0_all = vec![];

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl SeqScan {
async fn build_sources(&self) -> Result<Vec<Source>> {
let mut sources = Vec::with_capacity(self.memtables.len() + self.files.len());
for mem in &self.memtables {
let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone());
let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone())?;
sources.push(Source::Iter(iter));
}
for file in &self.files {
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/test_util/memtable_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ impl Memtable for EmptyMemtable {
&self,
_projection: Option<&[ColumnId]>,
_filters: Option<Predicate>,
) -> BoxedBatchIterator {
Box::new(std::iter::empty())
) -> Result<BoxedBatchIterator> {
Ok(Box::new(std::iter::empty()))
}

fn is_empty(&self) -> bool {
Expand Down

0 comments on commit da17f29

Please sign in to comment.