Skip to content

Commit

Permalink
Columnar in logging dataflows (#30883)
Browse files Browse the repository at this point in the history
Convert logging dataflows to columnar

The aim of this PR is to convert some of the logging dataflows to use
columnar data on dataflow edges, wherever it makes sense to do so. It
introduces building blocks that we need to move columnar data across
dataflow edges, and feed them into merge batchers to create arrangements
from columnar data.

The PR is rather large, and it is best viewed file-by-file. The rough
structure is:
* containers in timely-util to support columnar data on dataflow edges
and as input to arrangements.
* Each of the log dataflows separately.
* Improvement of the `consolidate_pact` function.
* Introduction of `consolidate_and_pack` to simplify adding new
introspection sources.

The goal of this PR is to show how we can use columnar data in
Materialize as a replacement for vectors. It doesn't yet use any
columnar data in rendering of LIR plans.

The PR doesn't touch the dataflow edges from the demux operator to
calling `consolidate_and_pack` because the edges use a
`ConsolidatingContainerBuilder`, which is more efficient for
vector-based containers (and in fact, lacks an implementation for any
other container).

Part of MaterializeInc/database-issues#3748.

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Jan 22, 2025
1 parent 2a8a66e commit a077232
Show file tree
Hide file tree
Showing 22 changed files with 1,094 additions and 644 deletions.
32 changes: 18 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/compute-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ publish = false
workspace = true

[dependencies]
columnar = "0.2.2"
columnation = "0.1.0"
differential-dataflow = "0.13.3"
itertools = "0.12.1"
Expand Down
25 changes: 7 additions & 18 deletions src/compute-types/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroU64;

use columnar::Columnar;
use mz_expr::{
CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
OptimizedMirRelationExpr, TableFunc,
Expand Down Expand Up @@ -161,14 +162,12 @@ impl AvailableCollections {
}

/// An identifier for an LIR node.
///
/// LirIds start at 1, not 0, which lets us get a better struct packing in `ComputeEvent::LirMapping`.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct LirId(NonZeroU64);
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, Columnar)]
pub struct LirId(u64);

impl LirId {
fn as_u64(&self) -> u64 {
self.0.into()
self.0
}
}

Expand All @@ -186,11 +185,11 @@ impl std::fmt::Display for LirId {

impl RustType<u64> for LirId {
fn into_proto(&self) -> u64 {
u64::from(self.0)
self.0
}

fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
Ok(Self(proto.try_into()?))
Ok(Self(proto))
}
}

Expand Down Expand Up @@ -513,7 +512,7 @@ impl Arbitrary for LirId {
type Parameters = ();

fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
let lir_id = NonZeroU64::arbitrary();
let lir_id = u64::arbitrary();
lir_id.prop_map(LirId).boxed()
}
}
Expand Down Expand Up @@ -1132,16 +1131,6 @@ mod tests {

use super::*;

#[mz_ore::test]
fn test_option_lirid_fits_in_usize() {
let option_lirid_size = std::mem::size_of::<Option<LirId>>();
let usize_size = std::mem::size_of::<usize>();
assert!(
option_lirid_size <= usize_size,
"Option<LirId> (size {option_lirid_size}) should fit in usize (size {usize_size})"
);
}

proptest! {
#![proptest_config(ProptestConfig::with_cases(10))]
#[mz_ore::test]
Expand Down
2 changes: 1 addition & 1 deletion src/compute-types/src/plan/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Context {
pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
Self {
arrangements: Default::default(),
next_lir_id: LirId(std::num::NonZero::<u64>::MIN),
next_lir_id: LirId(1),
debug_info: LirDebugInfo {
debug_name,
id: GlobalId::Transient(0),
Expand Down
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ workspace = true
anyhow = "1.0.66"
async-stream = "0.3.3"
bytesize = "1.1.0"
columnar = "0.2.2"
crossbeam-channel = "0.5.8"
dec = { version = "0.4.8", features = ["serde"] }
differential-dataflow = "0.13.3"
Expand Down
4 changes: 2 additions & 2 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {

// Log the receipt of the peek.
if let Some(logger) = self.compute_state.compute_logger.as_mut() {
logger.log(pending.as_log_event(true));
logger.log(&pending.as_log_event(true));
}

self.process_peek(&mut Antichain::new(), pending);
Expand Down Expand Up @@ -891,7 +891,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {

// Log responding to the peek request.
if let Some(logger) = self.compute_state.compute_logger.as_mut() {
logger.log(log_event);
logger.log(&log_event);
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/compute/src/extensions/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ where
.stream
.unary(Pipeline, "ArrangementSize", |_cap, info| {
let address = info.address;
logger.log(ComputeEvent::ArrangementHeapSizeOperator(
logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
ArrangementHeapSizeOperator {
operator_id,
address,
address: address.to_vec(),
},
));
move |input, output| {
Expand All @@ -281,15 +281,15 @@ where

let size = size.try_into().expect("must fit");
if size != old_size {
logger.log(ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
operator_id,
delta_size: size - old_size,
}));
}

let capacity = capacity.try_into().expect("must fit");
if capacity != old_capacity {
logger.log(ComputeEvent::ArrangementHeapCapacity(
logger.log(&ComputeEvent::ArrangementHeapCapacity(
ArrangementHeapCapacity {
operator_id,
delta_capacity: capacity - old_capacity,
Expand All @@ -299,7 +299,7 @@ where

let allocations = allocations.try_into().expect("must fit");
if allocations != old_allocations {
logger.log(ComputeEvent::ArrangementHeapAllocations(
logger.log(&ComputeEvent::ArrangementHeapAllocations(
ArrangementHeapAllocations {
operator_id,
delta_allocations: allocations - old_allocations,
Expand Down
Loading

0 comments on commit a077232

Please sign in to comment.