Skip to content

Commit

Permalink
feat: finish scan part
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Jan 22, 2024
1 parent 57ad119 commit 81d28d1
Showing 1 changed file with 141 additions and 34 deletions.
175 changes: 141 additions & 34 deletions src/mito2/src/memtable/merge_tree/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! Mutable part of the merge tree.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand All @@ -29,26 +29,28 @@ use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::datatypes::Field;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::{DataType, ConcreteDataType};
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
use datatypes::types::TimestampType;
use datatypes::vectors::{
BinaryVector, BinaryVectorBuilder, BooleanVector, MutableVector, UInt16VectorBuilder,
UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, Vector, VectorOp,
VectorRef, TimestampSecondVector, TimestampMillisecondVector, TimestampMicrosecondVector, TimestampNanosecondVector, UInt32Vector,
BinaryVector, BinaryVectorBuilder, BooleanVector, MutableVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector,
UInt16VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, UInt8Vector,
UInt8VectorBuilder, Vector, VectorOp, VectorRef,
};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::{
ComputeVectorSnafu, NewRecordBatchSnafu, PrimaryKeyLengthMismatchSnafu, Result, ComputeArrowSnafu,
ComputeArrowSnafu, ComputeVectorSnafu, NewRecordBatchSnafu, PrimaryKeyLengthMismatchSnafu,
Result,
};
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::read::{Batch, BatchColumn};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{McmpRowCodec, RowCodec};

/// Initial capacity for builders.
Expand Down Expand Up @@ -148,14 +150,12 @@ impl MutablePart {
}

/// Returns an iter to scan the mutable part.
// TODO(yingwen): provide a request.
pub(crate) fn scan_part(
&self,
metadata: &RegionMetadataRef,
row_codec: Arc<McmpRowCodec>,
projection: Option<&[ColumnId]>,
predicate: Option<&Predicate>,
batch_size: usize,
dedup: bool,
) -> Result<BoxedBatchIterator> {
// TODO(yingwen): Support dictionary mode or remove dictionary mode.
Expand All @@ -182,14 +182,17 @@ impl MutablePart {
}
None => None,
};
let offsets = plain_vectors
.as_ref()
.map(|v| v.primary_key_offsets())
.unwrap_or_default();

// TODO(yingwen): prune cost.
metrics.init_cost = now.elapsed();

let iter = Iter {
plain_vectors,
projection,
batch_size,
offsets,
metrics,
};

Expand Down Expand Up @@ -322,8 +325,7 @@ pub(crate) struct ReadMetrics {
/// Iterator of the mutable part.
struct Iter {
plain_vectors: Option<PlainBlockVectors>,
projection: HashSet<ColumnId>,
batch_size: usize,
offsets: VecDeque<usize>,
metrics: ReadMetrics,
}

Expand All @@ -332,10 +334,16 @@ impl Iterator for Iter {

fn next(&mut self) -> Option<Self::Item> {
let plain_vectors = self.plain_vectors.as_ref()?;
let start = self.offsets.pop_front()?;
let end = self.offsets.front().copied()?;

// TODO(yingwen): filter deleted.

unimplemented!()
let batch = plain_vectors
.slice_to_batch(start, end)
.and_then(|mut batch| {
batch.filter_deleted()?;
Ok(batch)
});
Some(batch)
}
}

Expand Down Expand Up @@ -624,7 +632,12 @@ impl PlainBlockVectors {
debug_assert_eq!(pk_vector.len(), seq_values.len());
debug_assert_eq!(ts_values.len(), seq_values.len());

let mut index_and_key: Vec<_> = pk_values.zip(ts_values.iter()).zip(seq_values.iter()).enumerate().map(|(index, key)| (index, (key.0.0, *key.0.1, *key.1))).collect();
let mut index_and_key: Vec<_> = pk_values
.zip(ts_values.iter())
.zip(seq_values.iter())
.enumerate()
.map(|(index, key)| (index, (key.0 .0, *key.0 .1, *key.1)))
.collect();
index_and_key.sort_unstable_by(|(_, a), (_, b)| {
a.0.cmp(&b.0) // compare pk
.then_with(|| a.1.cmp(&b.1)) // compare timestamp
Expand All @@ -635,9 +648,7 @@ impl PlainBlockVectors {
index_and_key.dedup_by_key(|x| x.1);
}

UInt32Vector::from_iter_values(
index_and_key.iter().map(|(idx, _)| *idx as u32),
)
UInt32Vector::from_iter_values(index_and_key.iter().map(|(idx, _)| *idx as u32))
}

/// Returns indices to sort vectors by timestamp, seq desc.
Expand All @@ -646,7 +657,11 @@ impl PlainBlockVectors {
let seq_values = self.value.sequence.as_arrow().values();
debug_assert_eq!(ts_values.len(), seq_values.len());

let mut index_and_key: Vec<_> = ts_values.iter().zip(seq_values.iter()).enumerate().collect();
let mut index_and_key: Vec<_> = ts_values
.iter()
.zip(seq_values.iter())
.enumerate()
.collect();
index_and_key.sort_unstable_by(|(_, a), (_, b)| {
a.0.cmp(&b.0) // compare timestamp
.then_with(|| b.1.cmp(&a.1)) // then compare seq desc
Expand All @@ -657,9 +672,7 @@ impl PlainBlockVectors {
index_and_key.dedup_by_key(|x| x.1);
}

UInt32Vector::from_iter_values(
index_and_key.iter().map(|(idx, _)| *idx as u32),
)
UInt32Vector::from_iter_values(index_and_key.iter().map(|(idx, _)| *idx as u32))
}

/// Evaluate the expression and compute the new mask.
Expand Down Expand Up @@ -748,6 +761,41 @@ impl PlainBlockVectors {
RecordBatch::try_new(schema, arrays).context(NewRecordBatchSnafu)
}

/// Converts the row range `[start, end)` of the vectors into a [Batch].
///
/// All rows in the range are expected to share the same primary key, so the
/// key decoded at `start` (empty when there is no key column) becomes the
/// batch's primary key.
fn slice_to_batch(&self, start: usize, end: usize) -> Result<Batch> {
    let len = end - start;
    // Take the key at `start` as the primary key of the whole batch;
    // fall back to an empty key when the key column is absent or null.
    let pk = match &self.key {
        Some(pk_vector) => pk_vector
            .get_data(start)
            .map(|key| key.to_vec())
            .unwrap_or_default(),
        None => Vec::new(),
    };

    let mut builder = BatchBuilder::new(pk);
    // Internal columns: timestamp, sequence and op_type, sliced to the range.
    let timestamps = self.value.timestamp.slice(start, len).to_arrow_array();
    builder.timestamps_array(timestamps)?;
    let sequences = self.value.sequence.get_slice(start, len).to_arrow_array();
    builder.sequences_array(sequences)?;
    let op_types = self.value.op_type.get_slice(start, len).to_arrow_array();
    builder.op_types_array(op_types)?;
    // Field columns keep their column ids and are sliced to the same range.
    for field in &self.value.fields {
        builder.push_field(BatchColumn {
            column_id: field.column_id,
            data: field.data.slice(start, len),
        });
    }

    builder.build()
}

fn decode_primary_keys_to(
&self,
codec: &Arc<McmpRowCodec>,
Expand Down Expand Up @@ -791,14 +839,38 @@ impl PlainBlockVectors {
fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
// Safety: we know the vector type.
if let Some(key_vector) = &self.key {
let key_vector = key_vector.take(indices).context(ComputeVectorSnafu)?.as_any().downcast_ref::<BinaryVector>().unwrap().clone();
let key_vector = key_vector
.take(indices)
.context(ComputeVectorSnafu)?
.as_any()
.downcast_ref::<BinaryVector>()
.unwrap()
.clone();
self.key = Some(key_vector);
}
self.value.timestamp = self.value.timestamp.take(indices).context(ComputeVectorSnafu)?;
self.value.sequence = self.value.sequence.take(indices).context(ComputeVectorSnafu)?
.as_any().downcast_ref::<UInt64Vector>().unwrap().clone();
self.value.op_type = self.value.op_type.take(indices).context(ComputeVectorSnafu)?
.as_any().downcast_ref::<UInt8Vector>().unwrap().clone();
self.value.timestamp = self
.value
.timestamp
.take(indices)
.context(ComputeVectorSnafu)?;
self.value.sequence = self
.value
.sequence
.take(indices)
.context(ComputeVectorSnafu)?
.as_any()
.downcast_ref::<UInt64Vector>()
.unwrap()
.clone();
self.value.op_type = self
.value
.op_type
.take(indices)
.context(ComputeVectorSnafu)?
.as_any()
.downcast_ref::<UInt8Vector>()
.unwrap()
.clone();
for batch_column in &mut self.value.fields {
batch_column.data = batch_column
.data
Expand All @@ -808,6 +880,37 @@ impl PlainBlockVectors {

Ok(())
}

/// Computes the start offsets of each distinct primary key in the key vector.
///
/// The returned deque also contains a trailing `len()` entry, so consecutive
/// pairs of offsets form the `[start, end)` row range of one primary key.
/// Returns an empty deque when there is no key column or it has no rows.
fn primary_key_offsets(&self) -> VecDeque<usize> {
    let pk_vector = match &self.key {
        Some(vector) if !vector.is_empty() => vector,
        _ => return VecDeque::new(),
    };

    let mut offsets = VecDeque::new();
    // The first key always starts at row 0.
    offsets.push_back(0);
    let mut prev_key: Option<&[u8]> = None;
    for (i, key) in pk_vector.iter_data().enumerate() {
        // Safety: keys are not null.
        let key = key.unwrap();
        if let Some(prev) = prev_key {
            if prev != key {
                // A new key begins at row `i`, which is also the end
                // offset of the previous key.
                offsets.push_back(i);
            }
        }
        prev_key = Some(key);
    }
    // End offset of the last key.
    offsets.push_back(pk_vector.len());

    offsets
}
}

/// [ValueVectors] holds immutable vectors of field columns, `timestamp`, `sequence` and `op_type`.
Expand All @@ -831,25 +934,29 @@ impl ValueVectors {
// Safety: time index always has timestamp type.
match self.timestamp.data_type() {
ConcreteDataType::Timestamp(t) => match t {
TimestampType::Second(_) => self.timestamp
TimestampType::Second(_) => self
.timestamp
.as_any()
.downcast_ref::<TimestampSecondVector>()
.unwrap()
.as_arrow()
.values(),
TimestampType::Millisecond(_) => self.timestamp
TimestampType::Millisecond(_) => self
.timestamp
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.as_arrow()
.values(),
TimestampType::Microsecond(_) => self.timestamp
TimestampType::Microsecond(_) => self
.timestamp
.as_any()
.downcast_ref::<TimestampMicrosecondVector>()
.unwrap()
.as_arrow()
.values(),
TimestampType::Nanosecond(_) => self.timestamp
TimestampType::Nanosecond(_) => self
.timestamp
.as_any()
.downcast_ref::<TimestampNanosecondVector>()
.unwrap()
Expand Down

0 comments on commit 81d28d1

Please sign in to comment.