diff --git a/src/adapter/src/catalog/migrate.rs b/src/adapter/src/catalog/migrate.rs index 27862675a0009..119f3397b67dc 100644 --- a/src/adapter/src/catalog/migrate.rs +++ b/src/adapter/src/catalog/migrate.rs @@ -139,7 +139,7 @@ async fn ast_rewrite_postgres_source_timeline_id_0_80_0( use mz_sql_parser::ast::{PgConfigOption, PgConfigOptionName, Value, WithOptionValue}; use mz_storage_types::connections::inline::IntoInlineConnection; use mz_storage_types::connections::PostgresConnection; - use mz_storage_types::sources::{ + use mz_storage_types::sources::postgres::{ PostgresSourcePublicationDetails, ProtoPostgresSourcePublicationDetails, }; use prost::Message; diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs index a95395a1a411b..f96727189e1ac 100644 --- a/src/catalog/src/memory/objects.rs +++ b/src/catalog/src/memory/objects.rs @@ -580,8 +580,8 @@ impl Source { Some("debezium") } SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style { - mz_storage_types::sources::UpsertStyle::Default(_) => Some("upsert"), - mz_storage_types::sources::UpsertStyle::Debezium { .. } => { + mz_storage_types::sources::envelope::UpsertStyle::Default(_) => Some("upsert"), + mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => { // NOTE(aljoscha): Should we somehow mark that this is // using upsert internally? See note above about // DEBEZIUM. diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index aa770758afdbb..9d253451af297 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -52,12 +52,17 @@ use mz_storage_types::sources::encoding::{ included_column_desc, AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, DataEncodingInner, ProtobufEncoding, RegexEncoding, SourceDataEncoding, SourceDataEncodingInner, }; -use mz_storage_types::sources::{ - GenericSourceConnection, KafkaMetadataKind, KafkaSourceConnection, KeyEnvelope, LoadGenerator, - LoadGeneratorSourceConnection, PostgresSourceConnection, PostgresSourcePublicationDetails, - ProtoPostgresSourcePublicationDetails, SourceConnection, SourceDesc, SourceEnvelope, - TestScriptSourceConnection, Timeline, UnplannedSourceEnvelope, UpsertStyle, +use mz_storage_types::sources::envelope::{ + KeyEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle, }; +use mz_storage_types::sources::kafka::{KafkaMetadataKind, KafkaSourceConnection}; +use mz_storage_types::sources::load_generator::{LoadGenerator, LoadGeneratorSourceConnection}; +use mz_storage_types::sources::postgres::{ + PostgresSourceConnection, PostgresSourcePublicationDetails, + ProtoPostgresSourcePublicationDetails, +}; +use mz_storage_types::sources::testscript::TestScriptSourceConnection; +use mz_storage_types::sources::{GenericSourceConnection, SourceConnection, SourceDesc, Timeline}; use prost::Message; use crate::ast::display::AstDisplay; diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index fa9a86c1c1520..536ab33791926 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -39,9 +39,8 @@ use mz_storage_types::configuration::StorageConfiguration; use mz_storage_types::connections::inline::IntoInlineConnection; use mz_storage_types::connections::Connection; use mz_storage_types::errors::ContextCreationError; -use mz_storage_types::sources::{ - GenericSourceConnection, PostgresSourcePublicationDetails, SourceConnection, -}; +use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails; +use mz_storage_types::sources::{GenericSourceConnection, 
SourceConnection}; use prost::Message; use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree}; use protobuf_native::MessageLite; @@ -433,13 +432,17 @@ async fn purify_create_source( let mut subsources = vec![]; let progress_desc = match &connection { - CreateSourceConnection::Kafka { .. } => &mz_storage_types::sources::KAFKA_PROGRESS_DESC, - CreateSourceConnection::Postgres { .. } => &mz_storage_types::sources::PG_PROGRESS_DESC, + CreateSourceConnection::Kafka { .. } => { + &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC + } + CreateSourceConnection::Postgres { .. } => { + &mz_storage_types::sources::postgres::PG_PROGRESS_DESC + } CreateSourceConnection::LoadGenerator { .. } => { - &mz_storage_types::sources::LOAD_GEN_PROGRESS_DESC + &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC } CreateSourceConnection::TestScript { .. } => { - &mz_storage_types::sources::TEST_SCRIPT_PROGRESS_DESC + &mz_storage_types::sources::testscript::TEST_SCRIPT_PROGRESS_DESC } }; diff --git a/src/storage-types/build.rs b/src/storage-types/build.rs index dda0c8faa389e..6ee2a034c6cec 100644 --- a/src/storage-types/build.rs +++ b/src/storage-types/build.rs @@ -127,6 +127,11 @@ fn main() { "storage-types/src/sinks.proto", "storage-types/src/sources.proto", "storage-types/src/sources/encoding.proto", + "storage-types/src/sources/envelope.proto", + "storage-types/src/sources/kafka.proto", + "storage-types/src/sources/postgres.proto", + "storage-types/src/sources/load_generator.proto", + "storage-types/src/sources/testscript.proto", ], &[".."], ) diff --git a/src/storage-types/src/connections/aws.proto b/src/storage-types/src/connections/aws.proto index 0c58a03131ee7..0a50e785aa63c 100644 --- a/src/storage-types/src/connections/aws.proto +++ b/src/storage-types/src/connections/aws.proto @@ -11,8 +11,6 @@ syntax = "proto3"; -import "google/protobuf/empty.proto"; - import "repr/src/global_id.proto"; import "storage-types/src/connections.proto"; diff --git a/src/storage-types/src/errors.proto b/src/storage-types/src/errors.proto index f378f069af973..2fffdc30a662b 100644 --- a/src/storage-types/src/errors.proto +++ b/src/storage-types/src/errors.proto @@ -10,7 +10,6 @@ syntax = "proto3"; import "expr/src/scalar.proto"; -import "expr/src/id.proto"; import "repr/src/global_id.proto"; import "repr/src/row.proto"; import "storage-types/src/shim.proto"; diff --git a/src/storage-types/src/sources.proto b/src/storage-types/src/sources.proto index 6132dee04fd13..686db79d6215a 100644 --- a/src/storage-types/src/sources.proto +++ b/src/storage-types/src/sources.proto @@ -11,19 +11,18 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; -import "postgres-util/src/desc.proto"; -import "proto/src/chrono.proto"; import "proto/src/proto.proto"; import "repr/src/global_id.proto"; -import "repr/src/relation_and_scalar.proto"; import "repr/src/row.proto"; import "storage-types/src/controller.proto"; -import "storage-types/src/connections.proto"; -import "storage-types/src/connections/aws.proto"; import "storage-types/src/errors.proto"; import "storage-types/src/instances.proto"; import "storage-types/src/sources/encoding.proto"; -import "expr/src/scalar.proto"; +import "storage-types/src/sources/envelope.proto"; +import "storage-types/src/sources/postgres.proto"; +import "storage-types/src/sources/kafka.proto"; +import "storage-types/src/sources/load_generator.proto"; +import "storage-types/src/sources/testscript.proto"; package mz_storage_types.sources; @@ -31,29 +30,6 @@ message 
ProtoMzOffset { uint64 offset = 1; } -message ProtoKafkaMetadataKind { - oneof kind { - google.protobuf.Empty partition = 1; - google.protobuf.Empty offset = 2; - google.protobuf.Empty timestamp = 3; - google.protobuf.Empty headers = 4; - ProtoKafkaHeader header = 5; - } -} - -message ProtoKafkaHeader { - string key = 1; - bool use_bytes = 2; -} - -message ProtoKeyEnvelope { - oneof kind { - google.protobuf.Empty none = 1; - google.protobuf.Empty flattened = 2; - string named = 3; - } -} - message ProtoTimeline { oneof kind { google.protobuf.Empty epoch_milliseconds = 1; @@ -62,119 +38,21 @@ message ProtoTimeline { } } -message ProtoSourceEnvelope { - oneof kind { - ProtoNoneEnvelope none = 1; - ProtoDebeziumEnvelope debezium = 2; - ProtoUpsertEnvelope upsert = 3; - google.protobuf.Empty cdc_v2 = 4; - } -} - -message ProtoNoneEnvelope { - ProtoKeyEnvelope key_envelope = 1; - uint64 key_arity = 2; -} - -message ProtoUpsertEnvelope { - ProtoUpsertStyle style = 1; - repeated uint64 key_indices = 2; - uint64 source_arity = 3; - reserved 4; -} - -message ProtoUpsertStyle { - message ProtoDebezium { - uint64 after_idx = 1; - } - - oneof kind { - ProtoKeyEnvelope default = 1; - ProtoDebezium debezium = 2; - } -} - -message ProtoDebeziumEnvelope { - uint64 before_idx = 1; - uint64 after_idx = 2; - ProtoDebeziumDedupProjection dedup = 3; -} - -message ProtoDebeziumTransactionMetadata { - mz_repr.global_id.ProtoGlobalId tx_metadata_global_id = 1; - uint64 tx_status_idx = 2; - uint64 tx_transaction_id_idx = 3; - uint64 tx_data_collections_idx = 4; - uint64 tx_data_collections_data_collection_idx = 5; - uint64 tx_data_collections_event_count_idx = 6; - string tx_data_collection_name = 7; - uint64 data_transaction_idx = 8; - uint64 data_transaction_id_idx = 9; -} - -message ProtoDebeziumDedupProjection { - uint64 op_idx = 1; - uint64 source_idx = 2; - uint64 snapshot_idx = 3; - ProtoDebeziumSourceProjection source_projection = 4; - ProtoDebeziumTransactionMetadata tx_metadata = 6; -} - -message ProtoDebeziumSourceProjection { - message ProtoMySql { - uint64 file = 1; - uint64 pos = 2; - uint64 row = 3; - } - - message ProtoPostgres { - uint64 sequence = 1; - uint64 lsn = 2; - } - - message ProtoSqlServer { - uint64 change_lsn = 1; - uint64 event_serial_no = 2; - } - - oneof kind { - ProtoMySql my_sql = 1; - ProtoPostgres postgres = 2; - ProtoSqlServer sql_server = 3; - } -} - -message ProtoKafkaMetadataColumn { - string name = 1; - ProtoKafkaMetadataKind kind = 2; -} - -message ProtoKafkaSourceConnection { - reserved 5, 6, 7, 8, 9, 10, 12, 14; - mz_storage_types.connections.ProtoKafkaConnection connection = 1; - mz_repr.global_id.ProtoGlobalId connection_id = 13; - string topic = 2; - map start_offsets = 3; - optional string group_id_prefix = 4; - repeated ProtoKafkaMetadataColumn metadata_columns = 11; - mz_proto.ProtoDuration topic_metadata_refresh_interval = 15; -} - message ProtoSourceDesc { reserved 4; ProtoSourceConnection connection = 1; mz_storage_types.sources.encoding.ProtoSourceDataEncoding encoding = 2; - ProtoSourceEnvelope envelope = 3; + mz_storage_types.sources.envelope.ProtoSourceEnvelope envelope = 3; mz_proto.ProtoDuration timestamp_interval = 5; } message ProtoSourceConnection { reserved 2, 3, 5; oneof kind { - ProtoKafkaSourceConnection kafka = 1; - ProtoPostgresSourceConnection postgres = 4; - ProtoLoadGeneratorSourceConnection loadgen = 6; - ProtoTestScriptSourceConnection testscript = 7; + mz_storage_types.sources.kafka.ProtoKafkaSourceConnection kafka = 1; + 
mz_storage_types.sources.postgres.ProtoPostgresSourceConnection postgres = 4; + mz_storage_types.sources.load_generator.ProtoLoadGeneratorSourceConnection loadgen = 6; + mz_storage_types.sources.testscript.ProtoTestScriptSourceConnection testscript = 7; } } @@ -185,55 +63,6 @@ message ProtoSourceData { } } -message ProtoPostgresSourceConnection { - message ProtoPostgresTableCast { - repeated mz_expr.scalar.ProtoMirScalarExpr column_casts = 1; - } - - mz_repr.global_id.ProtoGlobalId connection_id = 6; - mz_storage_types.connections.ProtoPostgresConnection connection = 1; - string publication = 2; - ProtoPostgresSourcePublicationDetails details = 4; - repeated ProtoPostgresTableCast table_casts = 5; - // Describes the position in the source's publication that the table cast - // correlates to; meant to be iterated over in tandem with table_casts - repeated uint64 table_cast_pos = 7; -} - -message ProtoPostgresSourcePublicationDetails { - repeated mz_postgres_util.desc.ProtoPostgresTableDesc tables = 1; - string slot = 2; - optional uint64 timeline_id = 3; -} - -message ProtoLoadGeneratorSourceConnection { - reserved 1; - oneof generator { - ProtoCounterLoadGenerator counter = 6; - google.protobuf.Empty auction = 3; - ProtoTpchLoadGenerator tpch = 4; - google.protobuf.Empty datums = 5; - google.protobuf.Empty marketing = 7; - } - optional uint64 tick_micros = 2; -} - -message ProtoTestScriptSourceConnection { - string desc_json = 1; -} - -message ProtoCounterLoadGenerator { - optional uint64 max_cardinality = 1; -} - -message ProtoTpchLoadGenerator { - int64 count_supplier = 1; - int64 count_part = 2; - int64 count_customer = 3; - int64 count_orders = 4; - int64 count_clerk = 5; -} - message ProtoCompression { oneof kind { google.protobuf.Empty gzip = 1; diff --git a/src/storage-types/src/sources.rs b/src/storage-types/src/sources.rs index 275e20faa5caa..439d05a868ba8 100644 --- a/src/storage-types/src/sources.rs +++ b/src/storage-types/src/sources.rs @@ -9,20 +9,19 @@ //! Types and traits related to the introduction of changing collections into `dataflow`. 
-use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::fmt::Debug; use std::hash::Hash; use std::ops::{Add, AddAssign, Deref, DerefMut}; use std::str::FromStr; use std::time::Duration; -use anyhow::{anyhow, bail}; use bytes::BufMut; -use dec::OrderedDecimal; + use itertools::EitherOrBoth::Both; use itertools::Itertools; -use mz_expr::{MirScalarExpr, PartitionId}; -use mz_ore::now::NowFn; +use mz_expr::PartitionId; + use mz_persist_types::columnar::{ ColumnFormat, ColumnGet, ColumnPush, Data, DataType, PartDecoder, PartEncoder, Schema, }; @@ -30,18 +29,17 @@ use mz_persist_types::dyn_struct::{DynStruct, DynStructCfg, ValidityMut, Validit use mz_persist_types::stats::StatsFn; use mz_persist_types::Codec; use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError}; -use mz_repr::adt::numeric::{Numeric, NumericMaxScale}; + use mz_repr::{ - ColumnType, Datum, DatumDecoderT, DatumEncoderT, GlobalId, RelationDesc, RelationType, Row, - RowDecoder, RowEncoder, ScalarType, + ColumnType, Datum, DatumDecoderT, DatumEncoderT, GlobalId, RelationDesc, Row, RowDecoder, + RowEncoder, }; -use mz_timely_util::order::{Interval, Partitioned, RangeBound}; -use once_cell::sync::Lazy; + use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy}; use proptest_derive::Arbitrary; use prost::Message; use serde::{Deserialize, Serialize}; -use timely::dataflow::operators::to_stream::Event; + use timely::order::{PartialOrder, TotalOrder}; use timely::progress::timestamp::Refines; use timely::progress::{PathSummary, Timestamp}; @@ -50,16 +48,26 @@ use crate::connections::inline::{ ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection, ReferencedConnection, }; -use crate::connections::ConnectionContext; + use crate::controller::{CollectionMetadata, StorageError}; use crate::errors::{DataflowError, ProtoDataflowError}; use crate::instances::StorageInstanceId; -use crate::sources::encoding::{DataEncoding, DataEncodingInner, SourceDataEncoding}; + use crate::sources::proto_ingestion_description::{ProtoSourceExport, ProtoSourceImport}; -use crate::sources::proto_load_generator_source_connection::Generator as ProtoGenerator; use crate::AlterCompatible; pub mod encoding; +pub mod envelope; +pub mod kafka; +pub mod load_generator; +pub mod postgres; +pub mod testscript; + +pub use crate::sources::envelope::SourceEnvelope; +pub use crate::sources::kafka::KafkaSourceConnection; +pub use crate::sources::load_generator::LoadGeneratorSourceConnection; +pub use crate::sources::postgres::PostgresSourceConnection; +pub use crate::sources::testscript::TestScriptSourceConnection; include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs")); @@ -379,90 +387,6 @@ impl SourceTimestamp for MzOffset { } } -impl SourceTimestamp for Partitioned { - fn from_compat_ts(pid: PartitionId, offset: MzOffset) -> Self { - match pid { - PartitionId::Kafka(pid) => Partitioned::with_partition(pid, offset), - PartitionId::None => panic!("invalid partitioned partition {pid}"), - } - } - - fn try_into_compat_ts(&self) -> Option<(PartitionId, MzOffset)> { - let pid = self.partition()?; - Some((PartitionId::Kafka(*pid), *self.timestamp())) - } - - fn encode_row(&self) -> Row { - use mz_repr::adt::range; - let mut row = Row::with_capacity(2); - let mut packer = row.packer(); - - let to_numeric = |p: i32| Datum::from(OrderedDecimal(Numeric::from(p))); - - let (lower, upper) = match self.interval() { - Interval::Range(l, u) => match (l, u) { - (RangeBound::Bottom, 
RangeBound::Top) => { - ((Datum::Null, false), (Datum::Null, false)) - } - (RangeBound::Bottom, RangeBound::Elem(pid)) => { - ((Datum::Null, false), (to_numeric(*pid), false)) - } - (RangeBound::Elem(pid), RangeBound::Top) => { - ((to_numeric(*pid), false), (Datum::Null, false)) - } - (RangeBound::Elem(l_pid), RangeBound::Elem(u_pid)) => { - ((to_numeric(*l_pid), false), (to_numeric(*u_pid), false)) - } - o => unreachable!("don't know how to handle this partition {o:?}"), - }, - Interval::Point(pid) => ((to_numeric(*pid), true), (to_numeric(*pid), true)), - }; - - let offset = self.timestamp().offset; - - packer - .push_range(range::Range::new(Some(( - range::RangeBound::new(lower.0, lower.1), - range::RangeBound::new(upper.0, upper.1), - )))) - .expect("pushing range must not generate errors"); - - packer.push(Datum::UInt64(offset)); - row - } - fn decode_row(row: &Row) -> Self { - let mut datums = row.iter(); - - match (datums.next(), datums.next(), datums.next()) { - (Some(Datum::Range(range)), Some(Datum::UInt64(offset)), None) => { - let mut range = range.into_bounds(|b| b.datum()); - //XXX: why do we have to canonicalize on read? - range.canonicalize().expect("ranges must be valid"); - let range = range.inner.expect("empty range"); - - let lower = range.lower.bound.map(|row| { - i32::try_from(row.unwrap_numeric().0) - .expect("only i32 values converted to ranges") - }); - let upper = range.upper.bound.map(|row| { - i32::try_from(row.unwrap_numeric().0) - .expect("only i32 values converted to ranges") - }); - - match (range.lower.inclusive, range.upper.inclusive) { - (true, true) => { - assert_eq!(lower, upper); - Partitioned::with_partition(lower.unwrap(), MzOffset::from(offset)) - } - (false, false) => Partitioned::with_range(lower, upper, MzOffset::from(offset)), - _ => panic!("invalid timestamp"), - } - } - invalid_binding => unreachable!("invalid binding {:?}", invalid_binding), - } - } -} - /// Universal language for describing message positions in Materialize, in a source independent /// way. Individual sources like Kafka or File sources should explicitly implement their own offset /// type that converts to/From MzOffsets. A 0-MzOffset denotes an empty stream. 
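Note on the call-site churn above: the `pub use` re-exports added to `sources.rs` keep `SourceEnvelope`, `KafkaSourceConnection`, `PostgresSourceConnection`, `LoadGeneratorSourceConnection`, and `TestScriptSourceConnection` reachable at the old `mz_storage_types::sources::*` paths, while types without a re-export (`KeyEnvelope`, `UpsertStyle`, `KafkaMetadataKind`, the `*_PROGRESS_DESC` constants, the publication details, and so on) now need the per-source submodule path — which is exactly what the edits in `migrate.rs`, `objects.rs`, `ddl.rs`, and `pure.rs` earlier in this diff reflect. A minimal before/after sketch of the import shape (illustrative only; any path not shown in this diff is an assumption):

```rust
// Before: the flat `sources` module exposed everything directly.
use mz_storage_types::sources::{KafkaMetadataKind, KafkaSourceConnection, UpsertStyle};

// After: re-exported connection/envelope types still resolve at the top level,
// but the remaining types live in per-source submodules.
use mz_storage_types::sources::KafkaSourceConnection; // via `pub use crate::sources::kafka::...`
use mz_storage_types::sources::envelope::UpsertStyle;
use mz_storage_types::sources::kafka::KafkaMetadataKind;
```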
@@ -623,90 +547,6 @@ impl PartialOrder for MzOffset { impl TotalOrder for MzOffset {} -/// Which piece of metadata a column corresponds to -#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub enum KafkaMetadataKind { - Partition, - Offset, - Timestamp, - Headers, - Header { key: String, use_bytes: bool }, -} - -impl RustType for KafkaMetadataKind { - fn into_proto(&self) -> ProtoKafkaMetadataKind { - use proto_kafka_metadata_kind::Kind; - ProtoKafkaMetadataKind { - kind: Some(match self { - KafkaMetadataKind::Partition => Kind::Partition(()), - KafkaMetadataKind::Offset => Kind::Offset(()), - KafkaMetadataKind::Timestamp => Kind::Timestamp(()), - KafkaMetadataKind::Headers => Kind::Headers(()), - KafkaMetadataKind::Header { key, use_bytes } => Kind::Header(ProtoKafkaHeader { - key: key.clone(), - use_bytes: *use_bytes, - }), - }), - } - } - - fn from_proto(proto: ProtoKafkaMetadataKind) -> Result { - use proto_kafka_metadata_kind::Kind; - let kind = proto - .kind - .ok_or_else(|| TryFromProtoError::missing_field("ProtoKafkaMetadataKind::kind"))?; - Ok(match kind { - Kind::Partition(()) => KafkaMetadataKind::Partition, - Kind::Offset(()) => KafkaMetadataKind::Offset, - Kind::Timestamp(()) => KafkaMetadataKind::Timestamp, - Kind::Headers(()) => KafkaMetadataKind::Headers, - Kind::Header(ProtoKafkaHeader { key, use_bytes }) => { - KafkaMetadataKind::Header { key, use_bytes } - } - }) - } -} - -/// Whether and how to include the decoded key of a stream in dataflows -#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub enum KeyEnvelope { - /// Never include the key in the output row - None, - /// For composite key encodings, pull the fields from the encoding into columns. - Flattened, - /// Always use the given name for the key. - /// - /// * For a single-field key, this means that the column will get the given name. - /// * For a multi-column key, the columns will get packed into a [`ScalarType::Record`], and - /// that Record will get the given name. - Named(String), -} - -impl RustType for KeyEnvelope { - fn into_proto(&self) -> ProtoKeyEnvelope { - use proto_key_envelope::Kind; - ProtoKeyEnvelope { - kind: Some(match self { - KeyEnvelope::None => Kind::None(()), - KeyEnvelope::Flattened => Kind::Flattened(()), - KeyEnvelope::Named(name) => Kind::Named(name.clone()), - }), - } - } - - fn from_proto(proto: ProtoKeyEnvelope) -> Result { - use proto_key_envelope::Kind; - let kind = proto - .kind - .ok_or_else(|| TryFromProtoError::missing_field("ProtoKeyEnvelope::kind"))?; - Ok(match kind { - Kind::None(()) => KeyEnvelope::None, - Kind::Flattened(()) => KeyEnvelope::Flattened, - Kind::Named(name) => KeyEnvelope::Named(name), - }) - } -} - /// The meaning of the timestamp number produced by data sources. This type /// is not concerned with the source of the timestamp (like if the data came /// from a Debezium consistency topic or a CDCv2 stream), instead only what the @@ -805,568 +645,6 @@ impl FromStr for Timeline { } } -/// `SourceEnvelope`s describe how to turn a stream of messages from `SourceDesc`s -/// into a _differential stream_, that is, a stream of (data, time, diff) -/// triples. -/// -/// PostgreSQL sources skip any explicit envelope handling, effectively -/// asserting that `SourceEnvelope` is `None` with `KeyEnvelope::None`. -#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub enum SourceEnvelope { - /// The most trivial version is `None`, which typically produces triples where the diff - /// is `1`. 
However, some sources are able to produce values with more exotic diff's, - /// such as the posgres source. Currently, this is the only variant usable with - /// those sources. - /// - /// If the `KeyEnvelope` is present, - /// include the key columns as an output column of the source with the given properties. - None(NoneEnvelope), - /// `Debezium` avoids holding onto previously seen values by trusting the required - /// `before` and `after` value fields coming from the upstream source. - Debezium(DebeziumEnvelope), - /// `Upsert` holds onto previously seen values and produces `1` or `-1` diffs depending on - /// whether or not the required _key_ outputed by the source has been seen before. This also - /// supports a `Debezium` mode. - Upsert(UpsertEnvelope), - /// `CdcV2` requires sources output messages in a strict form that requires a upstream-provided - /// timeline. - CdcV2, -} - -impl RustType for SourceEnvelope { - fn into_proto(&self) -> ProtoSourceEnvelope { - use proto_source_envelope::Kind; - ProtoSourceEnvelope { - kind: Some(match self { - SourceEnvelope::None(e) => Kind::None(e.into_proto()), - SourceEnvelope::Debezium(e) => Kind::Debezium(e.into_proto()), - SourceEnvelope::Upsert(e) => Kind::Upsert(e.into_proto()), - SourceEnvelope::CdcV2 => Kind::CdcV2(()), - }), - } - } - - fn from_proto(proto: ProtoSourceEnvelope) -> Result { - use proto_source_envelope::Kind; - let kind = proto - .kind - .ok_or_else(|| TryFromProtoError::missing_field("ProtoSourceEnvelope::kind"))?; - Ok(match kind { - Kind::None(e) => SourceEnvelope::None(e.into_rust()?), - Kind::Debezium(e) => SourceEnvelope::Debezium(e.into_rust()?), - Kind::Upsert(e) => SourceEnvelope::Upsert(e.into_rust()?), - Kind::CdcV2(()) => SourceEnvelope::CdcV2, - }) - } -} - -/// `UnplannedSourceEnvelope` is a `SourceEnvelope` missing some information. 
This information -/// is obtained in `UnplannedSourceEnvelope::desc`, where -/// `UnplannedSourceEnvelope::into_source_envelope` -/// creates a full `SourceEnvelope` -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub enum UnplannedSourceEnvelope { - None(KeyEnvelope), - Debezium(DebeziumEnvelope), - Upsert { style: UpsertStyle }, - CdcV2, -} - -#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct NoneEnvelope { - pub key_envelope: KeyEnvelope, - pub key_arity: usize, -} - -impl RustType for NoneEnvelope { - fn into_proto(&self) -> ProtoNoneEnvelope { - ProtoNoneEnvelope { - key_envelope: Some(self.key_envelope.into_proto()), - key_arity: self.key_arity.into_proto(), - } - } - - fn from_proto(proto: ProtoNoneEnvelope) -> Result { - Ok(NoneEnvelope { - key_envelope: proto - .key_envelope - .into_rust_if_some("ProtoNoneEnvelope::key_envelope")?, - key_arity: proto.key_arity.into_rust()?, - }) - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct UpsertEnvelope { - /// Full arity, including the key columns - pub source_arity: usize, - /// What style of Upsert we are using - pub style: UpsertStyle, - /// The indices of the keys in the full value row, used - /// to deduplicate data in `upsert_core` - pub key_indices: Vec, -} - -impl Arbitrary for UpsertEnvelope { - type Strategy = BoxedStrategy; - type Parameters = (); - - fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { - ( - any::(), - any::(), - proptest::collection::vec(any::(), 1..4), - ) - .prop_map(|(source_arity, style, key_indices)| Self { - source_arity, - style, - key_indices, - }) - .boxed() - } -} - -impl RustType for UpsertEnvelope { - fn into_proto(&self) -> ProtoUpsertEnvelope { - ProtoUpsertEnvelope { - source_arity: self.source_arity.into_proto(), - style: Some(self.style.into_proto()), - key_indices: self.key_indices.into_proto(), - } - } - - fn from_proto(proto: ProtoUpsertEnvelope) -> Result { - Ok(UpsertEnvelope { - source_arity: proto.source_arity.into_rust()?, - style: proto - .style - .into_rust_if_some("ProtoUpsertEnvelope::style")?, - key_indices: proto.key_indices.into_rust()?, - }) - } -} - -#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub enum UpsertStyle { - /// `ENVELOPE UPSERT`, where the key shape depends on the independent - /// `KeyEnvelope` - Default(KeyEnvelope), - /// `ENVELOPE DEBEZIUM UPSERT` - Debezium { after_idx: usize }, -} - -impl RustType for UpsertStyle { - fn into_proto(&self) -> ProtoUpsertStyle { - use proto_upsert_style::{Kind, ProtoDebezium}; - ProtoUpsertStyle { - kind: Some(match self { - UpsertStyle::Default(e) => Kind::Default(e.into_proto()), - UpsertStyle::Debezium { after_idx } => Kind::Debezium(ProtoDebezium { - after_idx: after_idx.into_proto(), - }), - }), - } - } - - fn from_proto(proto: ProtoUpsertStyle) -> Result { - use proto_upsert_style::Kind; - let kind = proto - .kind - .ok_or_else(|| TryFromProtoError::missing_field("ProtoUpsertStyle::kind"))?; - Ok(match kind { - Kind::Default(e) => UpsertStyle::Default(e.into_rust()?), - Kind::Debezium(d) => UpsertStyle::Debezium { - after_idx: d.after_idx.into_rust()?, - }, - }) - } -} - -#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct DebeziumEnvelope { - /// The column index containing the `before` row - pub before_idx: usize, - /// The column index containing the `after` row - pub after_idx: usize, - /// Details about how to deduplicate the data in the topic. 
- pub dedup: DebeziumDedupProjection, -} - -impl RustType for DebeziumEnvelope { - fn into_proto(&self) -> ProtoDebeziumEnvelope { - ProtoDebeziumEnvelope { - before_idx: self.before_idx.into_proto(), - after_idx: self.after_idx.into_proto(), - dedup: Some(self.dedup.into_proto()), - } - } - - fn from_proto(proto: ProtoDebeziumEnvelope) -> Result { - Ok(DebeziumEnvelope { - before_idx: proto.before_idx.into_rust()?, - after_idx: proto.after_idx.into_rust()?, - dedup: proto - .dedup - .into_rust_if_some("ProtoDebeziumEnvelope::dedup")?, - }) - } -} - -#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct DebeziumTransactionMetadata { - pub tx_metadata_global_id: GlobalId, - pub tx_status_idx: usize, - pub tx_transaction_id_idx: usize, - pub tx_data_collections_idx: usize, - pub tx_data_collections_data_collection_idx: usize, - pub tx_data_collections_event_count_idx: usize, - pub tx_data_collection_name: String, - /// The column index containing the debezium transaction metadata. - pub data_transaction_idx: usize, - pub data_transaction_id_idx: usize, -} - -impl RustType for DebeziumTransactionMetadata { - fn into_proto(&self) -> ProtoDebeziumTransactionMetadata { - ProtoDebeziumTransactionMetadata { - tx_metadata_global_id: Some(self.tx_metadata_global_id.into_proto()), - tx_status_idx: self.tx_status_idx.into_proto(), - tx_transaction_id_idx: self.tx_transaction_id_idx.into_proto(), - tx_data_collections_idx: self.tx_data_collections_idx.into_proto(), - tx_data_collections_data_collection_idx: self - .tx_data_collections_data_collection_idx - .into_proto(), - tx_data_collections_event_count_idx: self - .tx_data_collections_event_count_idx - .into_proto(), - tx_data_collection_name: self.tx_data_collection_name.clone(), - data_transaction_idx: self.data_transaction_idx.into_proto(), - data_transaction_id_idx: self.data_transaction_id_idx.into_proto(), - } - } - - fn from_proto(proto: ProtoDebeziumTransactionMetadata) -> Result { - Ok(DebeziumTransactionMetadata { - tx_metadata_global_id: proto - .tx_metadata_global_id - .into_rust_if_some("ProtoDebeziumTransactionMetadata::tx_metadata_global_id")?, - tx_status_idx: proto.tx_status_idx.into_rust()?, - tx_transaction_id_idx: proto.tx_transaction_id_idx.into_rust()?, - tx_data_collections_idx: proto.tx_data_collections_idx.into_rust()?, - tx_data_collections_data_collection_idx: proto - .tx_data_collections_data_collection_idx - .into_rust()?, - tx_data_collections_event_count_idx: proto - .tx_data_collections_event_count_idx - .into_rust()?, - tx_data_collection_name: proto.tx_data_collection_name, - data_transaction_idx: proto.data_transaction_idx.into_rust()?, - data_transaction_id_idx: proto.data_transaction_id_idx.into_rust()?, - }) - } -} - -#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct DebeziumDedupProjection { - /// The column index for the `op` field. - pub op_idx: usize, - /// The column index containing the debezium source metadata - pub source_idx: usize, - /// The record index of the `source.snapshot` field - pub snapshot_idx: usize, - /// The upstream database specific fields - pub source_projection: DebeziumSourceProjection, - /// Details about the transaction metadata. 
- pub tx_metadata: Option, -} - -impl RustType for DebeziumDedupProjection { - fn into_proto(&self) -> ProtoDebeziumDedupProjection { - ProtoDebeziumDedupProjection { - op_idx: self.op_idx.into_proto(), - source_idx: self.source_idx.into_proto(), - snapshot_idx: self.snapshot_idx.into_proto(), - source_projection: Some(self.source_projection.into_proto()), - tx_metadata: self.tx_metadata.into_proto(), - } - } - - fn from_proto(proto: ProtoDebeziumDedupProjection) -> Result { - Ok(DebeziumDedupProjection { - op_idx: proto.op_idx.into_rust()?, - source_idx: proto.source_idx.into_rust()?, - snapshot_idx: proto.snapshot_idx.into_rust()?, - source_projection: proto - .source_projection - .into_rust_if_some("ProtoDebeziumDedupProjection::source_projection")?, - tx_metadata: proto.tx_metadata.into_rust()?, - }) - } -} - -/// Debezium generates records that contain metadata about the upstream database. The structure of -/// this metadata depends on the type of connection used. This struct records the relevant indices -/// in the record, calculated during planning, so that the dataflow operator can unpack the -/// structure and extract the relevant information. -#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub enum DebeziumSourceProjection { - MySql { - file: usize, - pos: usize, - row: usize, - }, - Postgres { - sequence: usize, - lsn: usize, - }, - SqlServer { - change_lsn: usize, - event_serial_no: usize, - }, -} - -impl RustType for DebeziumSourceProjection { - fn into_proto(&self) -> ProtoDebeziumSourceProjection { - use proto_debezium_source_projection::{Kind, ProtoMySql, ProtoPostgres, ProtoSqlServer}; - ProtoDebeziumSourceProjection { - kind: Some(match self { - DebeziumSourceProjection::MySql { file, pos, row } => Kind::MySql(ProtoMySql { - file: file.into_proto(), - pos: pos.into_proto(), - row: row.into_proto(), - }), - DebeziumSourceProjection::Postgres { sequence, lsn } => { - Kind::Postgres(ProtoPostgres { - sequence: sequence.into_proto(), - lsn: lsn.into_proto(), - }) - } - DebeziumSourceProjection::SqlServer { - change_lsn, - event_serial_no, - } => Kind::SqlServer(ProtoSqlServer { - change_lsn: change_lsn.into_proto(), - event_serial_no: event_serial_no.into_proto(), - }), - }), - } - } - - fn from_proto(proto: ProtoDebeziumSourceProjection) -> Result { - use proto_debezium_source_projection::{Kind, ProtoMySql, ProtoPostgres, ProtoSqlServer}; - let kind = proto.kind.ok_or_else(|| { - TryFromProtoError::missing_field("ProtoDebeziumSourceProjection::kind") - })?; - Ok(match kind { - Kind::MySql(ProtoMySql { file, pos, row }) => DebeziumSourceProjection::MySql { - file: file.into_rust()?, - pos: pos.into_rust()?, - row: row.into_rust()?, - }, - Kind::Postgres(ProtoPostgres { sequence, lsn }) => DebeziumSourceProjection::Postgres { - sequence: sequence.into_rust()?, - lsn: lsn.into_rust()?, - }, - Kind::SqlServer(ProtoSqlServer { - change_lsn, - event_serial_no, - }) => DebeziumSourceProjection::SqlServer { - change_lsn: change_lsn.into_rust()?, - event_serial_no: event_serial_no.into_rust()?, - }, - }) - } -} - -/// Computes the indices of the value's relation description that appear in the key. -/// -/// Returns an error if it detects a common columns between the two relations that has the same -/// name but a different type, if a key column is missing from the value, and if the key relation -/// has a column with no name. 
-fn match_key_indices( - key_desc: &RelationDesc, - value_desc: &RelationDesc, -) -> anyhow::Result> { - let mut indices = Vec::new(); - for (name, key_type) in key_desc.iter() { - let (index, value_type) = value_desc - .get_by_name(name) - .ok_or_else(|| anyhow!("Value schema missing primary key column: {}", name))?; - - if key_type == value_type { - indices.push(index); - } else { - bail!( - "key and value column types do not match: key {:?} vs. value {:?}", - key_type, - value_type - ); - } - } - Ok(indices) -} - -impl UnplannedSourceEnvelope { - /// Transforms an `UnplannedSourceEnvelope` into a `SourceEnvelope` - /// - /// Panics if the input envelope is `UnplannedSourceEnvelope::Upsert` and - /// key is not passed as `Some` - // TODO(petrosagg): This API looks very error prone. Can we statically enforce it somehow? - fn into_source_envelope( - self, - key: Option>, - key_arity: Option, - source_arity: Option, - ) -> SourceEnvelope { - match self { - UnplannedSourceEnvelope::Upsert { - style: upsert_style, - } => SourceEnvelope::Upsert(UpsertEnvelope { - style: upsert_style, - key_indices: key.expect( - "into_source_envelope to be passed \ - correct parameters for UnplannedSourceEnvelope::Upsert", - ), - source_arity: source_arity.expect( - "into_source_envelope to be passed \ - correct parameters for UnplannedSourceEnvelope::Upsert", - ), - }), - UnplannedSourceEnvelope::Debezium(inner) => SourceEnvelope::Debezium(inner), - UnplannedSourceEnvelope::None(key_envelope) => SourceEnvelope::None(NoneEnvelope { - key_envelope, - key_arity: key_arity.unwrap_or(0), - }), - UnplannedSourceEnvelope::CdcV2 => SourceEnvelope::CdcV2, - } - } - - /// Computes the output relation of this envelope when applied on top of the decoded key and - /// value relation desc - pub fn desc( - self, - key_desc: Option, - value_desc: RelationDesc, - metadata_desc: RelationDesc, - ) -> anyhow::Result<(SourceEnvelope, RelationDesc)> { - Ok(match &self { - UnplannedSourceEnvelope::None(key_envelope) - | UnplannedSourceEnvelope::Upsert { - style: UpsertStyle::Default(key_envelope), - .. - } => { - let key_desc = match key_desc { - Some(desc) => desc, - None => { - return Ok(( - self.into_source_envelope(None, None, None), - value_desc.concat(metadata_desc), - )) - } - }; - let key_arity = key_desc.arity(); - - let (keyed, key) = match key_envelope { - KeyEnvelope::None => (value_desc, None), - KeyEnvelope::Flattened => { - // Add the key columns as a key. - let key_indices: Vec = (0..key_desc.arity()).collect(); - let key_desc = key_desc.with_key(key_indices.clone()); - (key_desc.concat(value_desc), Some(key_indices)) - } - KeyEnvelope::Named(key_name) => { - let key_desc = { - // if the key has multiple objects, nest them as a record inside of a single name - if key_desc.arity() > 1 { - let key_type = key_desc.typ(); - let key_as_record = RelationType::new(vec![ColumnType { - nullable: false, - scalar_type: ScalarType::Record { - fields: key_desc - .iter_names() - .zip(key_type.column_types.iter()) - .map(|(name, ty)| (name.clone(), ty.clone())) - .collect(), - custom_id: None, - }, - }]); - - RelationDesc::new(key_as_record, [key_name.to_string()]) - } else { - key_desc.with_names([key_name.to_string()]) - } - }; - let (key_desc, key) = match self { - UnplannedSourceEnvelope::None(_) => (key_desc, None), - // If we're applying the upsert logic the key column will be unique - UnplannedSourceEnvelope::Upsert { .. 
} => { - (key_desc.with_key(vec![0]), Some(vec![0])) - } - _ => unreachable!(), - }; - (key_desc.concat(value_desc), key) - } - }; - let desc = keyed.concat(metadata_desc); - ( - self.into_source_envelope(key, Some(key_arity), Some(desc.arity())), - desc, - ) - } - UnplannedSourceEnvelope::Debezium(DebeziumEnvelope { after_idx, .. }) - | UnplannedSourceEnvelope::Upsert { - style: UpsertStyle::Debezium { after_idx }, - .. - } => match &value_desc.typ().column_types[*after_idx].scalar_type { - ScalarType::Record { fields, .. } => { - let mut desc = RelationDesc::from_names_and_types(fields.clone()); - let key = key_desc.map(|k| match_key_indices(&k, &desc)).transpose()?; - if let Some(key) = key.clone() { - desc = desc.with_key(key); - } - - let desc = match self { - UnplannedSourceEnvelope::Upsert { .. } => desc.concat(metadata_desc), - _ => desc, - }; - - ( - self.into_source_envelope(key, None, Some(desc.arity())), - desc, - ) - } - ty => bail!( - "Incorrect type for Debezium value, expected Record, got {:?}", - ty - ), - }, - UnplannedSourceEnvelope::CdcV2 => { - // the correct types - - // CdcV2 row data are in a record in a record in a list - match &value_desc.typ().column_types[0].scalar_type { - ScalarType::List { element_type, .. } => match &**element_type { - ScalarType::Record { fields, .. } => { - // TODO maybe check this by name - match &fields[0].1.scalar_type { - ScalarType::Record { fields, .. } => ( - self.into_source_envelope(None, None, None), - RelationDesc::from_names_and_types(fields.clone()), - ), - ty => { - bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty) - } - } - } - ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty), - }, - ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty), - } - } - }) - } -} - /// A connection to an external system pub trait SourceConnection: Debug + Clone + PartialEq + crate::AlterCompatible { /// The name of the external system (e.g kafka, postgres, etc). @@ -1388,278 +666,6 @@ pub trait SourceConnection: Debug + Clone + PartialEq + crate::AlterCompatible { fn metadata_columns(&self) -> Vec<(&str, ColumnType)>; } -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub struct KafkaSourceConnection { - pub connection: C::Kafka, - pub connection_id: GlobalId, - pub topic: String, - // Map from partition -> starting offset - pub start_offsets: BTreeMap, - pub group_id_prefix: Option, - pub metadata_columns: Vec<(String, KafkaMetadataKind)>, - pub topic_metadata_refresh_interval: Duration, -} - -impl IntoInlineConnection - for KafkaSourceConnection -{ - fn into_inline_connection(self, r: R) -> KafkaSourceConnection { - let KafkaSourceConnection { - connection, - connection_id, - topic, - start_offsets, - group_id_prefix, - metadata_columns, - topic_metadata_refresh_interval, - } = self; - KafkaSourceConnection { - connection: r.resolve_connection(connection).unwrap_kafka(), - connection_id, - topic, - start_offsets, - group_id_prefix, - metadata_columns, - topic_metadata_refresh_interval, - } - } -} - -pub static KAFKA_PROGRESS_DESC: Lazy = Lazy::new(|| { - RelationDesc::empty() - .with_column( - "partition", - ScalarType::Range { - element_type: Box::new(ScalarType::Numeric { max_scale: None }), - } - .nullable(false), - ) - .with_column("offset", ScalarType::UInt64.nullable(true)) -}); - -impl KafkaSourceConnection { - /// Returns the client ID to register with librdkafka with. 
- /// - /// The caller is responsible for providing the source ID as it is not known - /// to `KafkaSourceConnection`. - pub fn client_id(&self, connection_context: &ConnectionContext, source_id: GlobalId) -> String { - format!( - "materialize-{}-{}-{}", - connection_context.environment_id, self.connection_id, source_id, - ) - } - - /// Returns the ID for the consumer group the configured source will use. - /// - /// The caller is responsible for providing the source ID as it is not known - /// to `KafkaSourceConnection`. - pub fn group_id(&self, connection_context: &ConnectionContext, source_id: GlobalId) -> String { - format!( - "{}{}", - self.group_id_prefix.as_deref().unwrap_or(""), - self.client_id(connection_context, source_id) - ) - } -} - -impl SourceConnection for KafkaSourceConnection { - fn name(&self) -> &'static str { - "kafka" - } - - fn upstream_name(&self) -> Option<&str> { - Some(self.topic.as_str()) - } - - fn timestamp_desc(&self) -> RelationDesc { - KAFKA_PROGRESS_DESC.clone() - } - - fn connection_id(&self) -> Option { - Some(self.connection_id) - } - - fn metadata_columns(&self) -> Vec<(&str, ColumnType)> { - self.metadata_columns - .iter() - .map(|(name, kind)| { - let typ = match kind { - KafkaMetadataKind::Partition => ScalarType::Int32.nullable(false), - KafkaMetadataKind::Offset => ScalarType::UInt64.nullable(false), - KafkaMetadataKind::Timestamp => { - ScalarType::Timestamp { precision: None }.nullable(false) - } - KafkaMetadataKind::Header { - use_bytes: true, .. - } => ScalarType::Bytes.nullable(true), - KafkaMetadataKind::Header { - use_bytes: false, .. - } => ScalarType::String.nullable(true), - KafkaMetadataKind::Headers => ScalarType::List { - element_type: Box::new(ScalarType::Record { - fields: vec![ - ( - "key".into(), - ColumnType { - nullable: false, - scalar_type: ScalarType::String, - }, - ), - ( - "value".into(), - ColumnType { - nullable: false, - scalar_type: ScalarType::Bytes, - }, - ), - ], - custom_id: None, - }), - custom_id: None, - } - .nullable(false), - }; - (&**name, typ) - }) - .collect() - } -} - -impl crate::AlterCompatible for KafkaSourceConnection { - fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), StorageError> { - if self == other { - return Ok(()); - } - - let KafkaSourceConnection { - // Connection details may change - connection: _, - connection_id, - topic, - start_offsets, - group_id_prefix, - metadata_columns, - topic_metadata_refresh_interval, - } = self; - - let compatibility_checks = [ - (connection_id == &other.connection_id, "connection_id"), - (topic == &other.topic, "topic"), - (start_offsets == &other.start_offsets, "start_offsets"), - (group_id_prefix == &other.group_id_prefix, "group_id_prefix"), - ( - metadata_columns == &other.metadata_columns, - "metadata_columns", - ), - ( - topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval, - "topic_metadata_refresh_interval", - ), - ]; - - for (compatible, field) in compatibility_checks { - if !compatible { - tracing::warn!( - "KafkaSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}", - self, - other - ); - - return Err(StorageError::InvalidAlter { id }); - } - } - - Ok(()) - } -} - -impl Arbitrary for KafkaSourceConnection -where - <::Kafka as Arbitrary>::Strategy: 'static, -{ - type Strategy = BoxedStrategy; - type Parameters = (); - - fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { - ( - any::(), - any::(), - any::(), - proptest::collection::btree_map(any::(), any::(), 1..4), - any::>(), - 
proptest::collection::vec(any::<(String, KafkaMetadataKind)>(), 0..4), - any::(), - ) - .prop_map( - |( - connection, - connection_id, - topic, - start_offsets, - group_id_prefix, - metadata_columns, - topic_metadata_refresh_interval, - )| KafkaSourceConnection { - connection, - connection_id, - topic, - start_offsets, - group_id_prefix, - metadata_columns, - topic_metadata_refresh_interval, - }, - ) - .boxed() - } -} - -impl RustType for KafkaSourceConnection { - fn into_proto(&self) -> ProtoKafkaSourceConnection { - ProtoKafkaSourceConnection { - connection: Some(self.connection.into_proto()), - connection_id: Some(self.connection_id.into_proto()), - topic: self.topic.clone(), - start_offsets: self.start_offsets.clone(), - group_id_prefix: self.group_id_prefix.clone(), - metadata_columns: self - .metadata_columns - .iter() - .map(|(name, kind)| ProtoKafkaMetadataColumn { - name: name.into_proto(), - kind: Some(kind.into_proto()), - }) - .collect(), - topic_metadata_refresh_interval: Some( - self.topic_metadata_refresh_interval.into_proto(), - ), - } - } - - fn from_proto(proto: ProtoKafkaSourceConnection) -> Result { - let mut metadata_columns = Vec::with_capacity(proto.metadata_columns.len()); - for c in proto.metadata_columns { - let kind = c.kind.into_rust_if_some("ProtoKafkaMetadataColumn::kind")?; - metadata_columns.push((c.name, kind)); - } - - Ok(KafkaSourceConnection { - connection: proto - .connection - .into_rust_if_some("ProtoKafkaSourceConnection::connection")?, - connection_id: proto - .connection_id - .into_rust_if_some("ProtoKafkaSourceConnection::connection_id")?, - topic: proto.topic, - start_offsets: proto.start_offsets, - group_id_prefix: proto.group_id_prefix, - metadata_columns, - topic_metadata_refresh_interval: proto - .topic_metadata_refresh_interval - .into_rust_if_some("ProtoKafkaSourceConnection::topic_metadata_refresh_interval")?, - }) - } -} - #[derive(Arbitrary, Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] pub enum Compression { Gzip, @@ -2015,709 +1021,6 @@ impl RustType for GenericSourceConnection { - pub connection_id: GlobalId, - pub connection: C::Pg, - /// The cast expressions to convert the incoming string encoded rows to - /// their target types, keyed by their position in the source. 
- pub table_casts: BTreeMap>, - pub publication: String, - pub publication_details: PostgresSourcePublicationDetails, -} - -impl IntoInlineConnection - for PostgresSourceConnection -{ - fn into_inline_connection(self, r: R) -> PostgresSourceConnection { - let PostgresSourceConnection { - connection_id, - connection, - table_casts, - publication, - publication_details, - } = self; - - PostgresSourceConnection { - connection_id, - connection: r.resolve_connection(connection).unwrap_pg(), - table_casts, - publication, - publication_details, - } - } -} - -impl Arbitrary for PostgresSourceConnection { - type Strategy = BoxedStrategy; - type Parameters = (); - - fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { - ( - any::(), - any::(), - proptest::collection::btree_map( - any::(), - proptest::collection::vec(any::(), 1..4), - 1..4, - ), - any::(), - any::(), - ) - .prop_map( - |(connection, connection_id, table_casts, publication, details)| Self { - connection, - connection_id, - table_casts, - publication, - publication_details: details, - }, - ) - .boxed() - } -} - -pub static PG_PROGRESS_DESC: Lazy = - Lazy::new(|| RelationDesc::empty().with_column("lsn", ScalarType::UInt64.nullable(true))); - -impl SourceConnection for PostgresSourceConnection { - fn name(&self) -> &'static str { - "postgres" - } - - fn upstream_name(&self) -> Option<&str> { - None - } - - fn timestamp_desc(&self) -> RelationDesc { - PG_PROGRESS_DESC.clone() - } - - fn connection_id(&self) -> Option { - Some(self.connection_id) - } - - fn metadata_columns(&self) -> Vec<(&str, ColumnType)> { - vec![] - } -} - -impl crate::AlterCompatible for PostgresSourceConnection { - fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), StorageError> { - if self == other { - return Ok(()); - } - - let PostgresSourceConnection { - connection_id, - // Connection details may change - connection: _, - table_casts, - publication, - publication_details, - } = self; - - let compatibility_checks = [ - (connection_id == &other.connection_id, "connection_id"), - ( - table_casts - .iter() - .merge_join_by(&other.table_casts, |(l_key, _), (r_key, _)| { - l_key.cmp(r_key) - }) - .all(|r| match r { - Both((_, l_val), (_, r_val)) => l_val == r_val, - _ => true, - }), - "table_casts", - ), - (publication == &other.publication, "publication"), - ( - publication_details == &other.publication_details, - "publication_details", - ), - ]; - - for (compatible, field) in compatibility_checks { - if !compatible { - tracing::warn!( - "PostgresSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}", - self, - other - ); - - return Err(StorageError::InvalidAlter { id }); - } - } - - Ok(()) - } -} - -impl RustType for PostgresSourceConnection { - fn into_proto(&self) -> ProtoPostgresSourceConnection { - use proto_postgres_source_connection::ProtoPostgresTableCast; - let mut table_casts = Vec::with_capacity(self.table_casts.len()); - let mut table_cast_pos = Vec::with_capacity(self.table_casts.len()); - for (pos, table_cast_cols) in self.table_casts.iter() { - table_casts.push(ProtoPostgresTableCast { - column_casts: table_cast_cols - .iter() - .cloned() - .map(|cast| cast.into_proto()) - .collect(), - }); - table_cast_pos.push(mz_ore::cast::usize_to_u64(*pos)); - } - - ProtoPostgresSourceConnection { - connection: Some(self.connection.into_proto()), - connection_id: Some(self.connection_id.into_proto()), - publication: self.publication.clone(), - details: Some(self.publication_details.into_proto()), - table_casts, - 
table_cast_pos, - } - } - - fn from_proto(proto: ProtoPostgresSourceConnection) -> Result { - // If we get the wrong number of table cast positions, we have to just - // accept all of the table casts. This is somewhat harmless, as the - // worst thing that happens is that we generate unused snapshots from - // the upstream PG publication, and this will (hopefully) correct - // itself on the next version upgrade. - let table_cast_pos = if proto.table_casts.len() == proto.table_cast_pos.len() { - proto.table_cast_pos - } else { - (1..proto.table_casts.len() + 1) - .map(mz_ore::cast::usize_to_u64) - .collect() - }; - - let mut table_casts = BTreeMap::new(); - for (pos, cast) in table_cast_pos - .into_iter() - .zip_eq(proto.table_casts.into_iter()) - { - let mut column_casts = vec![]; - for cast in cast.column_casts { - column_casts.push(cast.into_rust()?); - } - table_casts.insert(mz_ore::cast::u64_to_usize(pos), column_casts); - } - - Ok(PostgresSourceConnection { - connection: proto - .connection - .into_rust_if_some("ProtoPostgresSourceConnection::connection")?, - connection_id: proto - .connection_id - .into_rust_if_some("ProtoPostgresSourceConnection::connection_id")?, - publication: proto.publication, - publication_details: proto - .details - .into_rust_if_some("ProtoPostgresSourceConnection::details")?, - table_casts, - }) - } -} - -#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub struct PostgresSourcePublicationDetails { - pub tables: Vec, - pub slot: String, - /// The active timeline_id when this source was created - /// The None value indicates an unknown timeline, to account for sources that existed - /// prior to this field being introduced - pub timeline_id: Option, -} - -impl RustType for PostgresSourcePublicationDetails { - fn into_proto(&self) -> ProtoPostgresSourcePublicationDetails { - ProtoPostgresSourcePublicationDetails { - tables: self.tables.iter().map(|t| t.into_proto()).collect(), - slot: self.slot.clone(), - timeline_id: self.timeline_id.clone(), - } - } - - fn from_proto(proto: ProtoPostgresSourcePublicationDetails) -> Result { - Ok(PostgresSourcePublicationDetails { - tables: proto - .tables - .into_iter() - .map(mz_postgres_util::desc::PostgresTableDesc::from_proto) - .collect::>()?, - slot: proto.slot, - timeline_id: proto.timeline_id, - }) - } -} - -#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub struct LoadGeneratorSourceConnection { - pub load_generator: LoadGenerator, - pub tick_micros: Option, -} - -pub static LOAD_GEN_PROGRESS_DESC: Lazy = - Lazy::new(|| RelationDesc::empty().with_column("offset", ScalarType::UInt64.nullable(true))); - -impl SourceConnection for LoadGeneratorSourceConnection { - fn name(&self) -> &'static str { - "load-generator" - } - - fn upstream_name(&self) -> Option<&str> { - None - } - - fn timestamp_desc(&self) -> RelationDesc { - LOAD_GEN_PROGRESS_DESC.clone() - } - - fn connection_id(&self) -> Option { - None - } - - fn metadata_columns(&self) -> Vec<(&str, ColumnType)> { - vec![] - } -} - -impl crate::AlterCompatible for LoadGeneratorSourceConnection {} - -#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub enum LoadGenerator { - Auction, - Counter { - /// How many values will be emitted - /// before old ones are retracted, or `None` for - /// an append-only collection. 
- max_cardinality: Option, - }, - Datums, - Marketing, - Tpch { - count_supplier: i64, - count_part: i64, - count_customer: i64, - count_orders: i64, - count_clerk: i64, - }, -} - -impl LoadGenerator { - fn data_encoding_inner(&self) -> DataEncodingInner { - match self { - LoadGenerator::Auction => DataEncodingInner::RowCodec(RelationDesc::empty()), - LoadGenerator::Datums => { - let mut desc = - RelationDesc::empty().with_column("rowid", ScalarType::Int64.nullable(false)); - let typs = ScalarType::enumerate(); - let mut names = BTreeSet::new(); - for typ in typs { - // Cut out variant information from the debug print. - let mut name = format!("_{:?}", typ) - .split(' ') - .next() - .unwrap() - .to_lowercase(); - // Incase we ever have multiple variants of the same type, create - // unique names for them. - while names.contains(&name) { - name.push('_'); - } - names.insert(name.clone()); - desc = desc.with_column(name, typ.clone().nullable(true)); - } - DataEncodingInner::RowCodec(desc) - } - LoadGenerator::Counter { .. } => DataEncodingInner::RowCodec( - RelationDesc::empty().with_column("counter", ScalarType::Int64.nullable(false)), - ), - LoadGenerator::Marketing => DataEncodingInner::RowCodec(RelationDesc::empty()), - LoadGenerator::Tpch { .. } => DataEncodingInner::RowCodec(RelationDesc::empty()), - } - } - - pub fn data_encoding(&self) -> SourceDataEncoding { - SourceDataEncoding::Single(DataEncoding::new(self.data_encoding_inner())) - } - - /// Returns the list of table names and their column types that this generator generates - pub fn views(&self) -> Vec<(&str, RelationDesc)> { - match self { - LoadGenerator::Auction => vec![ - ( - "organizations", - RelationDesc::empty() - .with_column("id", ScalarType::Int64.nullable(false)) - .with_column("name", ScalarType::String.nullable(false)) - .with_key(vec![0]), - ), - ( - "users", - RelationDesc::empty() - .with_column("id", ScalarType::Int64.nullable(false)) - .with_column("org_id", ScalarType::Int64.nullable(false)) - .with_column("name", ScalarType::String.nullable(false)) - .with_key(vec![0]), - ), - ( - "accounts", - RelationDesc::empty() - .with_column("id", ScalarType::Int64.nullable(false)) - .with_column("org_id", ScalarType::Int64.nullable(false)) - .with_column("balance", ScalarType::Int64.nullable(false)) - .with_key(vec![0]), - ), - ( - "auctions", - RelationDesc::empty() - .with_column("id", ScalarType::Int64.nullable(false)) - .with_column("seller", ScalarType::Int64.nullable(false)) - .with_column("item", ScalarType::String.nullable(false)) - .with_column( - "end_time", - ScalarType::TimestampTz { precision: None }.nullable(false), - ) - .with_key(vec![0]), - ), - ( - "bids", - RelationDesc::empty() - .with_column("id", ScalarType::Int64.nullable(false)) - .with_column("buyer", ScalarType::Int64.nullable(false)) - .with_column("auction_id", ScalarType::Int64.nullable(false)) - .with_column("amount", ScalarType::Int32.nullable(false)) - .with_column( - "bid_time", - ScalarType::TimestampTz { precision: None }.nullable(false), - ) - .with_key(vec![0]), - ), - ], - LoadGenerator::Counter { max_cardinality: _ } => vec![], - LoadGenerator::Marketing => { - vec![ - ( - "customers", - RelationDesc::empty() - .with_column("id", ScalarType::Int64.nullable(false)) - .with_column("email", ScalarType::String.nullable(false)) - .with_column("income", ScalarType::Int64.nullable(false)) - .with_key(vec![0]), - ), - ( - "impressions", - RelationDesc::empty() - .with_column("id", ScalarType::Int64.nullable(false)) - 
.with_column("customer_id", ScalarType::Int64.nullable(false)) - .with_column("campaign_id", ScalarType::Int64.nullable(false)) - .with_column( - "impression_time", - ScalarType::TimestampTz { precision: None }.nullable(false), - ) - .with_key(vec![0]), - ), - ( - "clicks", - RelationDesc::empty() - .with_column("impression_id", ScalarType::Int64.nullable(false)) - .with_column( - "click_time", - ScalarType::TimestampTz { precision: None }.nullable(false), - ) - .without_keys(), - ), - ( - "leads", - RelationDesc::empty() - .with_column("id", ScalarType::Int64.nullable(false)) - .with_column("customer_id", ScalarType::Int64.nullable(false)) - .with_column( - "created_at", - ScalarType::TimestampTz { precision: None }.nullable(false), - ) - .with_column( - "converted_at", - ScalarType::TimestampTz { precision: None }.nullable(true), - ) - .with_column("conversion_amount", ScalarType::Int64.nullable(true)) - .with_key(vec![0]), - ), - ( - "coupons", - RelationDesc::empty() - .with_column("id", ScalarType::Int64.nullable(false)) - .with_column("lead_id", ScalarType::Int64.nullable(false)) - .with_column( - "created_at", - ScalarType::TimestampTz { precision: None }.nullable(false), - ) - .with_column("amount", ScalarType::Int64.nullable(false)) - .with_key(vec![0]), - ), - ( - "conversion_predictions", - RelationDesc::empty() - .with_column("lead_id", ScalarType::Int64.nullable(false)) - .with_column("experiment_bucket", ScalarType::String.nullable(false)) - .with_column( - "predicted_at", - ScalarType::TimestampTz { precision: None }.nullable(false), - ) - .with_column("score", ScalarType::Float64.nullable(false)) - .without_keys(), - ), - ] - } - LoadGenerator::Datums => vec![], - LoadGenerator::Tpch { .. } => { - let identifier = ScalarType::Int64.nullable(false); - let decimal = ScalarType::Numeric { - max_scale: Some(NumericMaxScale::try_from(2i64).unwrap()), - } - .nullable(false); - vec![ - ( - "supplier", - RelationDesc::empty() - .with_column("s_suppkey", identifier.clone()) - .with_column("s_name", ScalarType::String.nullable(false)) - .with_column("s_address", ScalarType::String.nullable(false)) - .with_column("s_nationkey", identifier.clone()) - .with_column("s_phone", ScalarType::String.nullable(false)) - .with_column("s_acctbal", decimal.clone()) - .with_column("s_comment", ScalarType::String.nullable(false)) - .with_key(vec![0]), - ), - ( - "part", - RelationDesc::empty() - .with_column("p_partkey", identifier.clone()) - .with_column("p_name", ScalarType::String.nullable(false)) - .with_column("p_mfgr", ScalarType::String.nullable(false)) - .with_column("p_brand", ScalarType::String.nullable(false)) - .with_column("p_type", ScalarType::String.nullable(false)) - .with_column("p_size", ScalarType::Int32.nullable(false)) - .with_column("p_container", ScalarType::String.nullable(false)) - .with_column("p_retailprice", decimal.clone()) - .with_column("p_comment", ScalarType::String.nullable(false)) - .with_key(vec![0]), - ), - ( - "partsupp", - RelationDesc::empty() - .with_column("ps_partkey", identifier.clone()) - .with_column("ps_suppkey", identifier.clone()) - .with_column("ps_availqty", ScalarType::Int32.nullable(false)) - .with_column("ps_supplycost", decimal.clone()) - .with_column("ps_comment", ScalarType::String.nullable(false)) - .with_key(vec![0, 1]), - ), - ( - "customer", - RelationDesc::empty() - .with_column("c_custkey", identifier.clone()) - .with_column("c_name", ScalarType::String.nullable(false)) - .with_column("c_address", ScalarType::String.nullable(false)) - 
.with_column("c_nationkey", identifier.clone()) - .with_column("c_phone", ScalarType::String.nullable(false)) - .with_column("c_acctbal", decimal.clone()) - .with_column("c_mktsegment", ScalarType::String.nullable(false)) - .with_column("c_comment", ScalarType::String.nullable(false)) - .with_key(vec![0]), - ), - ( - "orders", - RelationDesc::empty() - .with_column("o_orderkey", identifier.clone()) - .with_column("o_custkey", identifier.clone()) - .with_column("o_orderstatus", ScalarType::String.nullable(false)) - .with_column("o_totalprice", decimal.clone()) - .with_column("o_orderdate", ScalarType::Date.nullable(false)) - .with_column("o_orderpriority", ScalarType::String.nullable(false)) - .with_column("o_clerk", ScalarType::String.nullable(false)) - .with_column("o_shippriority", ScalarType::Int32.nullable(false)) - .with_column("o_comment", ScalarType::String.nullable(false)) - .with_key(vec![0]), - ), - ( - "lineitem", - RelationDesc::empty() - .with_column("l_orderkey", identifier.clone()) - .with_column("l_partkey", identifier.clone()) - .with_column("l_suppkey", identifier.clone()) - .with_column("l_linenumber", ScalarType::Int32.nullable(false)) - .with_column("l_quantity", decimal.clone()) - .with_column("l_extendedprice", decimal.clone()) - .with_column("l_discount", decimal.clone()) - .with_column("l_tax", decimal) - .with_column("l_returnflag", ScalarType::String.nullable(false)) - .with_column("l_linestatus", ScalarType::String.nullable(false)) - .with_column("l_shipdate", ScalarType::Date.nullable(false)) - .with_column("l_commitdate", ScalarType::Date.nullable(false)) - .with_column("l_receiptdate", ScalarType::Date.nullable(false)) - .with_column("l_shipinstruct", ScalarType::String.nullable(false)) - .with_column("l_shipmode", ScalarType::String.nullable(false)) - .with_column("l_comment", ScalarType::String.nullable(false)) - .with_key(vec![0, 3]), - ), - ( - "nation", - RelationDesc::empty() - .with_column("n_nationkey", identifier.clone()) - .with_column("n_name", ScalarType::String.nullable(false)) - .with_column("n_regionkey", identifier.clone()) - .with_column("n_comment", ScalarType::String.nullable(false)) - .with_key(vec![0]), - ), - ( - "region", - RelationDesc::empty() - .with_column("r_regionkey", identifier) - .with_column("r_name", ScalarType::String.nullable(false)) - .with_column("r_comment", ScalarType::String.nullable(false)) - .with_key(vec![0]), - ), - ] - } - } - } - - pub fn is_monotonic(&self) -> bool { - match self { - LoadGenerator::Auction => true, - LoadGenerator::Counter { - max_cardinality: None, - } => true, - LoadGenerator::Counter { .. } => false, - LoadGenerator::Marketing => false, - LoadGenerator::Datums => true, - LoadGenerator::Tpch { .. } => false, - } - } -} - -pub trait Generator { - /// Returns a function that produces rows and batch information. 
- fn by_seed( - &self, - now: NowFn, - seed: Option, - resume_offset: MzOffset, - ) -> Box, (Row, i64)>)>>; -} - -impl RustType for LoadGeneratorSourceConnection { - fn into_proto(&self) -> ProtoLoadGeneratorSourceConnection { - ProtoLoadGeneratorSourceConnection { - generator: Some(match &self.load_generator { - LoadGenerator::Auction => ProtoGenerator::Auction(()), - LoadGenerator::Counter { max_cardinality } => { - ProtoGenerator::Counter(ProtoCounterLoadGenerator { - max_cardinality: *max_cardinality, - }) - } - LoadGenerator::Marketing => ProtoGenerator::Marketing(()), - LoadGenerator::Tpch { - count_supplier, - count_part, - count_customer, - count_orders, - count_clerk, - } => ProtoGenerator::Tpch(ProtoTpchLoadGenerator { - count_supplier: *count_supplier, - count_part: *count_part, - count_customer: *count_customer, - count_orders: *count_orders, - count_clerk: *count_clerk, - }), - LoadGenerator::Datums => ProtoGenerator::Datums(()), - }), - tick_micros: self.tick_micros, - } - } - - fn from_proto(proto: ProtoLoadGeneratorSourceConnection) -> Result { - let generator = proto.generator.ok_or_else(|| { - TryFromProtoError::missing_field("ProtoLoadGeneratorSourceConnection::generator") - })?; - Ok(LoadGeneratorSourceConnection { - load_generator: match generator { - ProtoGenerator::Auction(()) => LoadGenerator::Auction, - ProtoGenerator::Counter(ProtoCounterLoadGenerator { max_cardinality }) => { - LoadGenerator::Counter { max_cardinality } - } - ProtoGenerator::Marketing(()) => LoadGenerator::Marketing, - ProtoGenerator::Tpch(ProtoTpchLoadGenerator { - count_supplier, - count_part, - count_customer, - count_orders, - count_clerk, - }) => LoadGenerator::Tpch { - count_supplier, - count_part, - count_customer, - count_orders, - count_clerk, - }, - ProtoGenerator::Datums(()) => LoadGenerator::Datums, - }, - tick_micros: proto.tick_micros, - }) - } -} - -#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub struct TestScriptSourceConnection { - pub desc_json: String, -} - -pub static TEST_SCRIPT_PROGRESS_DESC: Lazy = - Lazy::new(|| RelationDesc::empty().with_column("offset", ScalarType::UInt64.nullable(true))); - -impl SourceConnection for TestScriptSourceConnection { - fn name(&self) -> &'static str { - "testscript" - } - - fn upstream_name(&self) -> Option<&str> { - None - } - - fn timestamp_desc(&self) -> RelationDesc { - TEST_SCRIPT_PROGRESS_DESC.clone() - } - - fn connection_id(&self) -> Option { - None - } - - fn metadata_columns(&self) -> Vec<(&str, ColumnType)> { - vec![] - } -} - -impl crate::AlterCompatible for TestScriptSourceConnection {} - -impl RustType for TestScriptSourceConnection { - fn into_proto(&self) -> ProtoTestScriptSourceConnection { - ProtoTestScriptSourceConnection { - desc_json: self.desc_json.clone(), - } - } - - fn from_proto(proto: ProtoTestScriptSourceConnection) -> Result { - Ok(TestScriptSourceConnection { - desc_json: proto.desc_json, - }) - } -} - #[derive(Arbitrary, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[repr(transparent)] pub struct SourceData(pub Result); @@ -2925,7 +1228,7 @@ impl Schema for RelationDesc { #[cfg(test)] mod tests { - use mz_repr::is_no_stats_type; + use mz_repr::{is_no_stats_type, ScalarType}; use proptest::prelude::*; use proptest::strategy::ValueTree; diff --git a/src/storage-types/src/sources/envelope.proto b/src/storage-types/src/sources/envelope.proto new file mode 100644 index 0000000000000..de2b304af1707 --- /dev/null +++ 
b/src/storage-types/src/sources/envelope.proto @@ -0,0 +1,106 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +import "repr/src/global_id.proto"; + +package mz_storage_types.sources.envelope; + +message ProtoSourceEnvelope { + oneof kind { + ProtoNoneEnvelope none = 1; + ProtoDebeziumEnvelope debezium = 2; + ProtoUpsertEnvelope upsert = 3; + google.protobuf.Empty cdc_v2 = 4; + } +} + +message ProtoNoneEnvelope { + ProtoKeyEnvelope key_envelope = 1; + uint64 key_arity = 2; +} + +message ProtoKeyEnvelope { + oneof kind { + google.protobuf.Empty none = 1; + google.protobuf.Empty flattened = 2; + string named = 3; + } +} + +message ProtoUpsertEnvelope { + ProtoUpsertStyle style = 1; + repeated uint64 key_indices = 2; + uint64 source_arity = 3; + reserved 4; +} + +message ProtoUpsertStyle { + message ProtoDebezium { + uint64 after_idx = 1; + } + + oneof kind { + ProtoKeyEnvelope default = 1; + ProtoDebezium debezium = 2; + } +} + +message ProtoDebeziumEnvelope { + uint64 before_idx = 1; + uint64 after_idx = 2; + ProtoDebeziumDedupProjection dedup = 3; +} + +message ProtoDebeziumTransactionMetadata { + mz_repr.global_id.ProtoGlobalId tx_metadata_global_id = 1; + uint64 tx_status_idx = 2; + uint64 tx_transaction_id_idx = 3; + uint64 tx_data_collections_idx = 4; + uint64 tx_data_collections_data_collection_idx = 5; + uint64 tx_data_collections_event_count_idx = 6; + string tx_data_collection_name = 7; + uint64 data_transaction_idx = 8; + uint64 data_transaction_id_idx = 9; +} + +message ProtoDebeziumDedupProjection { + uint64 op_idx = 1; + uint64 source_idx = 2; + uint64 snapshot_idx = 3; + ProtoDebeziumSourceProjection source_projection = 4; + ProtoDebeziumTransactionMetadata tx_metadata = 6; +} + +message ProtoDebeziumSourceProjection { + message ProtoMySql { + uint64 file = 1; + uint64 pos = 2; + uint64 row = 3; + } + + message ProtoPostgres { + uint64 sequence = 1; + uint64 lsn = 2; + } + + message ProtoSqlServer { + uint64 change_lsn = 1; + uint64 event_serial_no = 2; + } + + oneof kind { + ProtoMySql my_sql = 1; + ProtoPostgres postgres = 2; + ProtoSqlServer sql_server = 3; + } +} diff --git a/src/storage-types/src/sources/envelope.rs b/src/storage-types/src/sources/envelope.rs new file mode 100644 index 0000000000000..87785e2e6b99f --- /dev/null +++ b/src/storage-types/src/sources/envelope.rs @@ -0,0 +1,624 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! 
Types related to source envelopes + +use anyhow::{anyhow, bail}; +use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError}; +use mz_repr::{ColumnType, GlobalId, RelationDesc, RelationType, ScalarType}; +use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy}; +use proptest_derive::Arbitrary; +use serde::{Deserialize, Serialize}; + +include!(concat!( + env!("OUT_DIR"), + "/mz_storage_types.sources.envelope.rs" +)); + +/// `SourceEnvelope`s describe how to turn a stream of messages from `SourceDesc`s +/// into a _differential stream_, that is, a stream of (data, time, diff) +/// triples. +/// +/// PostgreSQL sources skip any explicit envelope handling, effectively +/// asserting that `SourceEnvelope` is `None` with `KeyEnvelope::None`. +#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub enum SourceEnvelope { + /// The most trivial version is `None`, which typically produces triples where the diff + /// is `1`. However, some sources are able to produce values with more exotic diff's, + /// such as the posgres source. Currently, this is the only variant usable with + /// those sources. + /// + /// If the `KeyEnvelope` is present, + /// include the key columns as an output column of the source with the given properties. + None(NoneEnvelope), + /// `Debezium` avoids holding onto previously seen values by trusting the required + /// `before` and `after` value fields coming from the upstream source. + Debezium(DebeziumEnvelope), + /// `Upsert` holds onto previously seen values and produces `1` or `-1` diffs depending on + /// whether or not the required _key_ outputed by the source has been seen before. This also + /// supports a `Debezium` mode. + Upsert(UpsertEnvelope), + /// `CdcV2` requires sources output messages in a strict form that requires a upstream-provided + /// timeline. + CdcV2, +} + +impl RustType for SourceEnvelope { + fn into_proto(&self) -> ProtoSourceEnvelope { + use proto_source_envelope::Kind; + ProtoSourceEnvelope { + kind: Some(match self { + SourceEnvelope::None(e) => Kind::None(e.into_proto()), + SourceEnvelope::Debezium(e) => Kind::Debezium(e.into_proto()), + SourceEnvelope::Upsert(e) => Kind::Upsert(e.into_proto()), + SourceEnvelope::CdcV2 => Kind::CdcV2(()), + }), + } + } + + fn from_proto(proto: ProtoSourceEnvelope) -> Result { + use proto_source_envelope::Kind; + let kind = proto + .kind + .ok_or_else(|| TryFromProtoError::missing_field("ProtoSourceEnvelope::kind"))?; + Ok(match kind { + Kind::None(e) => SourceEnvelope::None(e.into_rust()?), + Kind::Debezium(e) => SourceEnvelope::Debezium(e.into_rust()?), + Kind::Upsert(e) => SourceEnvelope::Upsert(e.into_rust()?), + Kind::CdcV2(()) => SourceEnvelope::CdcV2, + }) + } +} + +/// `UnplannedSourceEnvelope` is a `SourceEnvelope` missing some information. 
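The `RustType` impl for `SourceEnvelope` above is a direct oneof mapping, so every envelope should survive a proto round trip. A hedged sketch of that property, assuming only the traits already imported in this file:

```rust
// Hypothetical round-trip helper for the oneof mapping above.
use mz_proto::RustType;

fn roundtrip(envelope: &SourceEnvelope) -> SourceEnvelope {
    let proto: ProtoSourceEnvelope = envelope.into_proto();
    SourceEnvelope::from_proto(proto).expect("proto produced by into_proto is valid")
}
```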
This information +/// is obtained in `UnplannedSourceEnvelope::desc`, where +/// `UnplannedSourceEnvelope::into_source_envelope` +/// creates a full `SourceEnvelope` +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub enum UnplannedSourceEnvelope { + None(KeyEnvelope), + Debezium(DebeziumEnvelope), + Upsert { style: UpsertStyle }, + CdcV2, +} + +#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct NoneEnvelope { + pub key_envelope: KeyEnvelope, + pub key_arity: usize, +} + +impl RustType for NoneEnvelope { + fn into_proto(&self) -> ProtoNoneEnvelope { + ProtoNoneEnvelope { + key_envelope: Some(self.key_envelope.into_proto()), + key_arity: self.key_arity.into_proto(), + } + } + + fn from_proto(proto: ProtoNoneEnvelope) -> Result { + Ok(NoneEnvelope { + key_envelope: proto + .key_envelope + .into_rust_if_some("ProtoNoneEnvelope::key_envelope")?, + key_arity: proto.key_arity.into_rust()?, + }) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct UpsertEnvelope { + /// Full arity, including the key columns + pub source_arity: usize, + /// What style of Upsert we are using + pub style: UpsertStyle, + /// The indices of the keys in the full value row, used + /// to deduplicate data in `upsert_core` + pub key_indices: Vec, +} + +impl Arbitrary for UpsertEnvelope { + type Strategy = BoxedStrategy; + type Parameters = (); + + fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { + ( + any::(), + any::(), + proptest::collection::vec(any::(), 1..4), + ) + .prop_map(|(source_arity, style, key_indices)| Self { + source_arity, + style, + key_indices, + }) + .boxed() + } +} + +impl RustType for UpsertEnvelope { + fn into_proto(&self) -> ProtoUpsertEnvelope { + ProtoUpsertEnvelope { + source_arity: self.source_arity.into_proto(), + style: Some(self.style.into_proto()), + key_indices: self.key_indices.into_proto(), + } + } + + fn from_proto(proto: ProtoUpsertEnvelope) -> Result { + Ok(UpsertEnvelope { + source_arity: proto.source_arity.into_rust()?, + style: proto + .style + .into_rust_if_some("ProtoUpsertEnvelope::style")?, + key_indices: proto.key_indices.into_rust()?, + }) + } +} + +#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub enum UpsertStyle { + /// `ENVELOPE UPSERT`, where the key shape depends on the independent + /// `KeyEnvelope` + Default(KeyEnvelope), + /// `ENVELOPE DEBEZIUM UPSERT` + Debezium { after_idx: usize }, +} + +impl RustType for UpsertStyle { + fn into_proto(&self) -> ProtoUpsertStyle { + use proto_upsert_style::{Kind, ProtoDebezium}; + ProtoUpsertStyle { + kind: Some(match self { + UpsertStyle::Default(e) => Kind::Default(e.into_proto()), + UpsertStyle::Debezium { after_idx } => Kind::Debezium(ProtoDebezium { + after_idx: after_idx.into_proto(), + }), + }), + } + } + + fn from_proto(proto: ProtoUpsertStyle) -> Result { + use proto_upsert_style::Kind; + let kind = proto + .kind + .ok_or_else(|| TryFromProtoError::missing_field("ProtoUpsertStyle::kind"))?; + Ok(match kind { + Kind::Default(e) => UpsertStyle::Default(e.into_rust()?), + Kind::Debezium(d) => UpsertStyle::Debezium { + after_idx: d.after_idx.into_rust()?, + }, + }) + } +} + +#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct DebeziumEnvelope { + /// The column index containing the `before` row + pub before_idx: usize, + /// The column index containing the `after` row + pub after_idx: usize, + /// Details about how to deduplicate the data in the topic. 
+ pub dedup: DebeziumDedupProjection, +} + +impl RustType for DebeziumEnvelope { + fn into_proto(&self) -> ProtoDebeziumEnvelope { + ProtoDebeziumEnvelope { + before_idx: self.before_idx.into_proto(), + after_idx: self.after_idx.into_proto(), + dedup: Some(self.dedup.into_proto()), + } + } + + fn from_proto(proto: ProtoDebeziumEnvelope) -> Result { + Ok(DebeziumEnvelope { + before_idx: proto.before_idx.into_rust()?, + after_idx: proto.after_idx.into_rust()?, + dedup: proto + .dedup + .into_rust_if_some("ProtoDebeziumEnvelope::dedup")?, + }) + } +} + +#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct DebeziumTransactionMetadata { + pub tx_metadata_global_id: GlobalId, + pub tx_status_idx: usize, + pub tx_transaction_id_idx: usize, + pub tx_data_collections_idx: usize, + pub tx_data_collections_data_collection_idx: usize, + pub tx_data_collections_event_count_idx: usize, + pub tx_data_collection_name: String, + /// The column index containing the debezium transaction metadata. + pub data_transaction_idx: usize, + pub data_transaction_id_idx: usize, +} + +impl RustType for DebeziumTransactionMetadata { + fn into_proto(&self) -> ProtoDebeziumTransactionMetadata { + ProtoDebeziumTransactionMetadata { + tx_metadata_global_id: Some(self.tx_metadata_global_id.into_proto()), + tx_status_idx: self.tx_status_idx.into_proto(), + tx_transaction_id_idx: self.tx_transaction_id_idx.into_proto(), + tx_data_collections_idx: self.tx_data_collections_idx.into_proto(), + tx_data_collections_data_collection_idx: self + .tx_data_collections_data_collection_idx + .into_proto(), + tx_data_collections_event_count_idx: self + .tx_data_collections_event_count_idx + .into_proto(), + tx_data_collection_name: self.tx_data_collection_name.clone(), + data_transaction_idx: self.data_transaction_idx.into_proto(), + data_transaction_id_idx: self.data_transaction_id_idx.into_proto(), + } + } + + fn from_proto(proto: ProtoDebeziumTransactionMetadata) -> Result { + Ok(DebeziumTransactionMetadata { + tx_metadata_global_id: proto + .tx_metadata_global_id + .into_rust_if_some("ProtoDebeziumTransactionMetadata::tx_metadata_global_id")?, + tx_status_idx: proto.tx_status_idx.into_rust()?, + tx_transaction_id_idx: proto.tx_transaction_id_idx.into_rust()?, + tx_data_collections_idx: proto.tx_data_collections_idx.into_rust()?, + tx_data_collections_data_collection_idx: proto + .tx_data_collections_data_collection_idx + .into_rust()?, + tx_data_collections_event_count_idx: proto + .tx_data_collections_event_count_idx + .into_rust()?, + tx_data_collection_name: proto.tx_data_collection_name, + data_transaction_idx: proto.data_transaction_idx.into_rust()?, + data_transaction_id_idx: proto.data_transaction_id_idx.into_rust()?, + }) + } +} + +#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct DebeziumDedupProjection { + /// The column index for the `op` field. + pub op_idx: usize, + /// The column index containing the debezium source metadata + pub source_idx: usize, + /// The record index of the `source.snapshot` field + pub snapshot_idx: usize, + /// The upstream database specific fields + pub source_projection: DebeziumSourceProjection, + /// Details about the transaction metadata. 
+ pub tx_metadata: Option, +} + +impl RustType for DebeziumDedupProjection { + fn into_proto(&self) -> ProtoDebeziumDedupProjection { + ProtoDebeziumDedupProjection { + op_idx: self.op_idx.into_proto(), + source_idx: self.source_idx.into_proto(), + snapshot_idx: self.snapshot_idx.into_proto(), + source_projection: Some(self.source_projection.into_proto()), + tx_metadata: self.tx_metadata.into_proto(), + } + } + + fn from_proto(proto: ProtoDebeziumDedupProjection) -> Result { + Ok(DebeziumDedupProjection { + op_idx: proto.op_idx.into_rust()?, + source_idx: proto.source_idx.into_rust()?, + snapshot_idx: proto.snapshot_idx.into_rust()?, + source_projection: proto + .source_projection + .into_rust_if_some("ProtoDebeziumDedupProjection::source_projection")?, + tx_metadata: proto.tx_metadata.into_rust()?, + }) + } +} + +/// Debezium generates records that contain metadata about the upstream database. The structure of +/// this metadata depends on the type of connection used. This struct records the relevant indices +/// in the record, calculated during planning, so that the dataflow operator can unpack the +/// structure and extract the relevant information. +#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub enum DebeziumSourceProjection { + MySql { + file: usize, + pos: usize, + row: usize, + }, + Postgres { + sequence: usize, + lsn: usize, + }, + SqlServer { + change_lsn: usize, + event_serial_no: usize, + }, +} + +impl RustType for DebeziumSourceProjection { + fn into_proto(&self) -> ProtoDebeziumSourceProjection { + use proto_debezium_source_projection::{Kind, ProtoMySql, ProtoPostgres, ProtoSqlServer}; + ProtoDebeziumSourceProjection { + kind: Some(match self { + DebeziumSourceProjection::MySql { file, pos, row } => Kind::MySql(ProtoMySql { + file: file.into_proto(), + pos: pos.into_proto(), + row: row.into_proto(), + }), + DebeziumSourceProjection::Postgres { sequence, lsn } => { + Kind::Postgres(ProtoPostgres { + sequence: sequence.into_proto(), + lsn: lsn.into_proto(), + }) + } + DebeziumSourceProjection::SqlServer { + change_lsn, + event_serial_no, + } => Kind::SqlServer(ProtoSqlServer { + change_lsn: change_lsn.into_proto(), + event_serial_no: event_serial_no.into_proto(), + }), + }), + } + } + + fn from_proto(proto: ProtoDebeziumSourceProjection) -> Result { + use proto_debezium_source_projection::{Kind, ProtoMySql, ProtoPostgres, ProtoSqlServer}; + let kind = proto.kind.ok_or_else(|| { + TryFromProtoError::missing_field("ProtoDebeziumSourceProjection::kind") + })?; + Ok(match kind { + Kind::MySql(ProtoMySql { file, pos, row }) => DebeziumSourceProjection::MySql { + file: file.into_rust()?, + pos: pos.into_rust()?, + row: row.into_rust()?, + }, + Kind::Postgres(ProtoPostgres { sequence, lsn }) => DebeziumSourceProjection::Postgres { + sequence: sequence.into_rust()?, + lsn: lsn.into_rust()?, + }, + Kind::SqlServer(ProtoSqlServer { + change_lsn, + event_serial_no, + }) => DebeziumSourceProjection::SqlServer { + change_lsn: change_lsn.into_rust()?, + event_serial_no: event_serial_no.into_rust()?, + }, + }) + } +} + +/// Computes the indices of the value's relation description that appear in the key. +/// +/// Returns an error if it detects a common columns between the two relations that has the same +/// name but a different type, if a key column is missing from the value, and if the key relation +/// has a column with no name. 
+fn match_key_indices( + key_desc: &RelationDesc, + value_desc: &RelationDesc, +) -> anyhow::Result> { + let mut indices = Vec::new(); + for (name, key_type) in key_desc.iter() { + let (index, value_type) = value_desc + .get_by_name(name) + .ok_or_else(|| anyhow!("Value schema missing primary key column: {}", name))?; + + if key_type == value_type { + indices.push(index); + } else { + bail!( + "key and value column types do not match: key {:?} vs. value {:?}", + key_type, + value_type + ); + } + } + Ok(indices) +} + +impl UnplannedSourceEnvelope { + /// Transforms an `UnplannedSourceEnvelope` into a `SourceEnvelope` + /// + /// Panics if the input envelope is `UnplannedSourceEnvelope::Upsert` and + /// key is not passed as `Some` + // TODO(petrosagg): This API looks very error prone. Can we statically enforce it somehow? + fn into_source_envelope( + self, + key: Option>, + key_arity: Option, + source_arity: Option, + ) -> SourceEnvelope { + match self { + UnplannedSourceEnvelope::Upsert { + style: upsert_style, + } => SourceEnvelope::Upsert(UpsertEnvelope { + style: upsert_style, + key_indices: key.expect( + "into_source_envelope to be passed \ + correct parameters for UnplannedSourceEnvelope::Upsert", + ), + source_arity: source_arity.expect( + "into_source_envelope to be passed \ + correct parameters for UnplannedSourceEnvelope::Upsert", + ), + }), + UnplannedSourceEnvelope::Debezium(inner) => SourceEnvelope::Debezium(inner), + UnplannedSourceEnvelope::None(key_envelope) => SourceEnvelope::None(NoneEnvelope { + key_envelope, + key_arity: key_arity.unwrap_or(0), + }), + UnplannedSourceEnvelope::CdcV2 => SourceEnvelope::CdcV2, + } + } + + /// Computes the output relation of this envelope when applied on top of the decoded key and + /// value relation desc + pub fn desc( + self, + key_desc: Option, + value_desc: RelationDesc, + metadata_desc: RelationDesc, + ) -> anyhow::Result<(SourceEnvelope, RelationDesc)> { + Ok(match &self { + UnplannedSourceEnvelope::None(key_envelope) + | UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::Default(key_envelope), + .. + } => { + let key_desc = match key_desc { + Some(desc) => desc, + None => { + return Ok(( + self.into_source_envelope(None, None, None), + value_desc.concat(metadata_desc), + )) + } + }; + let key_arity = key_desc.arity(); + + let (keyed, key) = match key_envelope { + KeyEnvelope::None => (value_desc, None), + KeyEnvelope::Flattened => { + // Add the key columns as a key. + let key_indices: Vec = (0..key_desc.arity()).collect(); + let key_desc = key_desc.with_key(key_indices.clone()); + (key_desc.concat(value_desc), Some(key_indices)) + } + KeyEnvelope::Named(key_name) => { + let key_desc = { + // if the key has multiple objects, nest them as a record inside of a single name + if key_desc.arity() > 1 { + let key_type = key_desc.typ(); + let key_as_record = RelationType::new(vec![ColumnType { + nullable: false, + scalar_type: ScalarType::Record { + fields: key_desc + .iter_names() + .zip(key_type.column_types.iter()) + .map(|(name, ty)| (name.clone(), ty.clone())) + .collect(), + custom_id: None, + }, + }]); + + RelationDesc::new(key_as_record, [key_name.to_string()]) + } else { + key_desc.with_names([key_name.to_string()]) + } + }; + let (key_desc, key) = match self { + UnplannedSourceEnvelope::None(_) => (key_desc, None), + // If we're applying the upsert logic the key column will be unique + UnplannedSourceEnvelope::Upsert { .. 
} => { + (key_desc.with_key(vec![0]), Some(vec![0])) + } + _ => unreachable!(), + }; + (key_desc.concat(value_desc), key) + } + }; + let desc = keyed.concat(metadata_desc); + ( + self.into_source_envelope(key, Some(key_arity), Some(desc.arity())), + desc, + ) + } + UnplannedSourceEnvelope::Debezium(DebeziumEnvelope { after_idx, .. }) + | UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::Debezium { after_idx }, + .. + } => match &value_desc.typ().column_types[*after_idx].scalar_type { + ScalarType::Record { fields, .. } => { + let mut desc = RelationDesc::from_names_and_types(fields.clone()); + let key = key_desc.map(|k| match_key_indices(&k, &desc)).transpose()?; + if let Some(key) = key.clone() { + desc = desc.with_key(key); + } + + let desc = match self { + UnplannedSourceEnvelope::Upsert { .. } => desc.concat(metadata_desc), + _ => desc, + }; + + ( + self.into_source_envelope(key, None, Some(desc.arity())), + desc, + ) + } + ty => bail!( + "Incorrect type for Debezium value, expected Record, got {:?}", + ty + ), + }, + UnplannedSourceEnvelope::CdcV2 => { + // the correct types + + // CdcV2 row data are in a record in a record in a list + match &value_desc.typ().column_types[0].scalar_type { + ScalarType::List { element_type, .. } => match &**element_type { + ScalarType::Record { fields, .. } => { + // TODO maybe check this by name + match &fields[0].1.scalar_type { + ScalarType::Record { fields, .. } => ( + self.into_source_envelope(None, None, None), + RelationDesc::from_names_and_types(fields.clone()), + ), + ty => { + bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty) + } + } + } + ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty), + }, + ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty), + } + } + }) + } +} + +/// Whether and how to include the decoded key of a stream in dataflows +#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub enum KeyEnvelope { + /// Never include the key in the output row + None, + /// For composite key encodings, pull the fields from the encoding into columns. + Flattened, + /// Always use the given name for the key. + /// + /// * For a single-field key, this means that the column will get the given name. + /// * For a multi-column key, the columns will get packed into a [`ScalarType::Record`], and + /// that Record will get the given name. + Named(String), +} + +impl RustType for KeyEnvelope { + fn into_proto(&self) -> ProtoKeyEnvelope { + use proto_key_envelope::Kind; + ProtoKeyEnvelope { + kind: Some(match self { + KeyEnvelope::None => Kind::None(()), + KeyEnvelope::Flattened => Kind::Flattened(()), + KeyEnvelope::Named(name) => Kind::Named(name.clone()), + }), + } + } + + fn from_proto(proto: ProtoKeyEnvelope) -> Result { + use proto_key_envelope::Kind; + let kind = proto + .kind + .ok_or_else(|| TryFromProtoError::missing_field("ProtoKeyEnvelope::kind"))?; + Ok(match kind { + Kind::None(()) => KeyEnvelope::None, + Kind::Flattened(()) => KeyEnvelope::Flattened, + Kind::Named(name) => KeyEnvelope::Named(name), + }) + } +} diff --git a/src/storage-types/src/sources/kafka.proto b/src/storage-types/src/sources/kafka.proto new file mode 100644 index 0000000000000..b08dbf73c2789 --- /dev/null +++ b/src/storage-types/src/sources/kafka.proto @@ -0,0 +1,49 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
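Stepping back from the `desc` implementation that closes out envelope.rs above: the output relation is always some concatenation of key, value, and metadata descriptions, with the key columns promoted to a relational key in the upsert case. A hedged sketch of the `KeyEnvelope::Flattened` upsert path, using only the `RelationDesc` builders that already appear in this file:

```rust
// Hypothetical illustration of the Flattened-key upsert path in desc() above.
use mz_repr::RelationDesc;

fn flattened_upsert_desc(
    key_desc: RelationDesc,
    value_desc: RelationDesc,
    metadata_desc: RelationDesc,
) -> RelationDesc {
    // Every key column is kept in the output and together they form the relational key.
    let key_indices: Vec<usize> = (0..key_desc.arity()).collect();
    key_desc
        .with_key(key_indices)
        .concat(value_desc)
        .concat(metadata_desc)
}
```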
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +import "proto/src/proto.proto"; +import "repr/src/global_id.proto"; +import "storage-types/src/connections.proto"; + +package mz_storage_types.sources.kafka; + +message ProtoKafkaSourceConnection { + reserved 5, 6, 7, 8, 9, 10, 12, 14; + mz_storage_types.connections.ProtoKafkaConnection connection = 1; + mz_repr.global_id.ProtoGlobalId connection_id = 13; + string topic = 2; + map start_offsets = 3; + optional string group_id_prefix = 4; + repeated ProtoKafkaMetadataColumn metadata_columns = 11; + mz_proto.ProtoDuration topic_metadata_refresh_interval = 15; +} + +message ProtoKafkaMetadataColumn { + string name = 1; + ProtoKafkaMetadataKind kind = 2; +} + +message ProtoKafkaMetadataKind { + oneof kind { + google.protobuf.Empty partition = 1; + google.protobuf.Empty offset = 2; + google.protobuf.Empty timestamp = 3; + google.protobuf.Empty headers = 4; + ProtoKafkaHeader header = 5; + } +} + +message ProtoKafkaHeader { + string key = 1; + bool use_bytes = 2; +} diff --git a/src/storage-types/src/sources/kafka.rs b/src/storage-types/src/sources/kafka.rs new file mode 100644 index 0000000000000..fa49b7fddcfaa --- /dev/null +++ b/src/storage-types/src/sources/kafka.rs @@ -0,0 +1,436 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! 
Types related kafka sources + +use dec::OrderedDecimal; +use mz_expr::PartitionId; +use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError}; +use mz_repr::adt::numeric::Numeric; +use mz_repr::{ColumnType, Datum, GlobalId, RelationDesc, Row, ScalarType}; +use mz_timely_util::order::{Interval, Partitioned, RangeBound}; +use once_cell::sync::Lazy; +use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy}; +use proptest_derive::Arbitrary; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::time::Duration; + +use crate::connections::inline::{ + ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection, + ReferencedConnection, +}; +use crate::connections::ConnectionContext; +use crate::controller::StorageError; +use crate::sources::{MzOffset, SourceConnection, SourceTimestamp}; + +include!(concat!( + env!("OUT_DIR"), + "/mz_storage_types.sources.kafka.rs" +)); + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct KafkaSourceConnection { + pub connection: C::Kafka, + pub connection_id: GlobalId, + pub topic: String, + // Map from partition -> starting offset + pub start_offsets: BTreeMap, + pub group_id_prefix: Option, + pub metadata_columns: Vec<(String, KafkaMetadataKind)>, + pub topic_metadata_refresh_interval: Duration, +} + +impl IntoInlineConnection + for KafkaSourceConnection +{ + fn into_inline_connection(self, r: R) -> KafkaSourceConnection { + let KafkaSourceConnection { + connection, + connection_id, + topic, + start_offsets, + group_id_prefix, + metadata_columns, + topic_metadata_refresh_interval, + } = self; + KafkaSourceConnection { + connection: r.resolve_connection(connection).unwrap_kafka(), + connection_id, + topic, + start_offsets, + group_id_prefix, + metadata_columns, + topic_metadata_refresh_interval, + } + } +} + +pub static KAFKA_PROGRESS_DESC: Lazy = Lazy::new(|| { + RelationDesc::empty() + .with_column( + "partition", + ScalarType::Range { + element_type: Box::new(ScalarType::Numeric { max_scale: None }), + } + .nullable(false), + ) + .with_column("offset", ScalarType::UInt64.nullable(true)) +}); + +impl KafkaSourceConnection { + /// Returns the client ID to register with librdkafka with. + /// + /// The caller is responsible for providing the source ID as it is not known + /// to `KafkaSourceConnection`. + pub fn client_id(&self, connection_context: &ConnectionContext, source_id: GlobalId) -> String { + format!( + "materialize-{}-{}-{}", + connection_context.environment_id, self.connection_id, source_id, + ) + } + + /// Returns the ID for the consumer group the configured source will use. + /// + /// The caller is responsible for providing the source ID as it is not known + /// to `KafkaSourceConnection`. 
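For orientation, `client_id` above and `group_id` defined just below are plain string compositions: the client ID embeds the environment, connection, and source IDs, and the group ID is the optional prefix glued onto the client ID. A hedged example with made-up IDs:

```rust
// Illustrative only; the real values are an environment ID and two GlobalIds.
fn main() {
    let client_id = format!("materialize-{}-{}-{}", "env-local", "u7", "u13");
    let group_id = format!("{}{}", "analytics-", client_id);
    assert_eq!(client_id, "materialize-env-local-u7-u13");
    assert_eq!(group_id, "analytics-materialize-env-local-u7-u13");
}
```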
+ pub fn group_id(&self, connection_context: &ConnectionContext, source_id: GlobalId) -> String { + format!( + "{}{}", + self.group_id_prefix.as_deref().unwrap_or(""), + self.client_id(connection_context, source_id) + ) + } +} + +impl SourceConnection for KafkaSourceConnection { + fn name(&self) -> &'static str { + "kafka" + } + + fn upstream_name(&self) -> Option<&str> { + Some(self.topic.as_str()) + } + + fn timestamp_desc(&self) -> RelationDesc { + KAFKA_PROGRESS_DESC.clone() + } + + fn connection_id(&self) -> Option { + Some(self.connection_id) + } + + fn metadata_columns(&self) -> Vec<(&str, ColumnType)> { + self.metadata_columns + .iter() + .map(|(name, kind)| { + let typ = match kind { + KafkaMetadataKind::Partition => ScalarType::Int32.nullable(false), + KafkaMetadataKind::Offset => ScalarType::UInt64.nullable(false), + KafkaMetadataKind::Timestamp => { + ScalarType::Timestamp { precision: None }.nullable(false) + } + KafkaMetadataKind::Header { + use_bytes: true, .. + } => ScalarType::Bytes.nullable(true), + KafkaMetadataKind::Header { + use_bytes: false, .. + } => ScalarType::String.nullable(true), + KafkaMetadataKind::Headers => ScalarType::List { + element_type: Box::new(ScalarType::Record { + fields: vec![ + ( + "key".into(), + ColumnType { + nullable: false, + scalar_type: ScalarType::String, + }, + ), + ( + "value".into(), + ColumnType { + nullable: false, + scalar_type: ScalarType::Bytes, + }, + ), + ], + custom_id: None, + }), + custom_id: None, + } + .nullable(false), + }; + (&**name, typ) + }) + .collect() + } +} + +impl crate::AlterCompatible for KafkaSourceConnection { + fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), StorageError> { + if self == other { + return Ok(()); + } + + let KafkaSourceConnection { + // Connection details may change + connection: _, + connection_id, + topic, + start_offsets, + group_id_prefix, + metadata_columns, + topic_metadata_refresh_interval, + } = self; + + let compatibility_checks = [ + (connection_id == &other.connection_id, "connection_id"), + (topic == &other.topic, "topic"), + (start_offsets == &other.start_offsets, "start_offsets"), + (group_id_prefix == &other.group_id_prefix, "group_id_prefix"), + ( + metadata_columns == &other.metadata_columns, + "metadata_columns", + ), + ( + topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval, + "topic_metadata_refresh_interval", + ), + ]; + + for (compatible, field) in compatibility_checks { + if !compatible { + tracing::warn!( + "KafkaSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}", + self, + other + ); + + return Err(StorageError::InvalidAlter { id }); + } + } + + Ok(()) + } +} + +impl Arbitrary for KafkaSourceConnection +where + <::Kafka as Arbitrary>::Strategy: 'static, +{ + type Strategy = BoxedStrategy; + type Parameters = (); + + fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { + ( + any::(), + any::(), + any::(), + proptest::collection::btree_map(any::(), any::(), 1..4), + any::>(), + proptest::collection::vec(any::<(String, KafkaMetadataKind)>(), 0..4), + any::(), + ) + .prop_map( + |( + connection, + connection_id, + topic, + start_offsets, + group_id_prefix, + metadata_columns, + topic_metadata_refresh_interval, + )| KafkaSourceConnection { + connection, + connection_id, + topic, + start_offsets, + group_id_prefix, + metadata_columns, + topic_metadata_refresh_interval, + }, + ) + .boxed() + } +} + +impl RustType for KafkaSourceConnection { + fn into_proto(&self) -> ProtoKafkaSourceConnection { + 
ProtoKafkaSourceConnection { + connection: Some(self.connection.into_proto()), + connection_id: Some(self.connection_id.into_proto()), + topic: self.topic.clone(), + start_offsets: self.start_offsets.clone(), + group_id_prefix: self.group_id_prefix.clone(), + metadata_columns: self + .metadata_columns + .iter() + .map(|(name, kind)| ProtoKafkaMetadataColumn { + name: name.into_proto(), + kind: Some(kind.into_proto()), + }) + .collect(), + topic_metadata_refresh_interval: Some( + self.topic_metadata_refresh_interval.into_proto(), + ), + } + } + + fn from_proto(proto: ProtoKafkaSourceConnection) -> Result { + let mut metadata_columns = Vec::with_capacity(proto.metadata_columns.len()); + for c in proto.metadata_columns { + let kind = c.kind.into_rust_if_some("ProtoKafkaMetadataColumn::kind")?; + metadata_columns.push((c.name, kind)); + } + + Ok(KafkaSourceConnection { + connection: proto + .connection + .into_rust_if_some("ProtoKafkaSourceConnection::connection")?, + connection_id: proto + .connection_id + .into_rust_if_some("ProtoKafkaSourceConnection::connection_id")?, + topic: proto.topic, + start_offsets: proto.start_offsets, + group_id_prefix: proto.group_id_prefix, + metadata_columns, + topic_metadata_refresh_interval: proto + .topic_metadata_refresh_interval + .into_rust_if_some("ProtoKafkaSourceConnection::topic_metadata_refresh_interval")?, + }) + } +} + +impl SourceTimestamp for Partitioned { + fn from_compat_ts(pid: PartitionId, offset: MzOffset) -> Self { + match pid { + PartitionId::Kafka(pid) => Partitioned::with_partition(pid, offset), + PartitionId::None => panic!("invalid partitioned partition {pid}"), + } + } + + fn try_into_compat_ts(&self) -> Option<(PartitionId, MzOffset)> { + let pid = self.partition()?; + Some((PartitionId::Kafka(*pid), *self.timestamp())) + } + + fn encode_row(&self) -> Row { + use mz_repr::adt::range; + let mut row = Row::with_capacity(2); + let mut packer = row.packer(); + + let to_numeric = |p: i32| Datum::from(OrderedDecimal(Numeric::from(p))); + + let (lower, upper) = match self.interval() { + Interval::Range(l, u) => match (l, u) { + (RangeBound::Bottom, RangeBound::Top) => { + ((Datum::Null, false), (Datum::Null, false)) + } + (RangeBound::Bottom, RangeBound::Elem(pid)) => { + ((Datum::Null, false), (to_numeric(*pid), false)) + } + (RangeBound::Elem(pid), RangeBound::Top) => { + ((to_numeric(*pid), false), (Datum::Null, false)) + } + (RangeBound::Elem(l_pid), RangeBound::Elem(u_pid)) => { + ((to_numeric(*l_pid), false), (to_numeric(*u_pid), false)) + } + o => unreachable!("don't know how to handle this partition {o:?}"), + }, + Interval::Point(pid) => ((to_numeric(*pid), true), (to_numeric(*pid), true)), + }; + + let offset = self.timestamp().offset; + + packer + .push_range(range::Range::new(Some(( + range::RangeBound::new(lower.0, lower.1), + range::RangeBound::new(upper.0, upper.1), + )))) + .expect("pushing range must not generate errors"); + + packer.push(Datum::UInt64(offset)); + row + } + fn decode_row(row: &Row) -> Self { + let mut datums = row.iter(); + + match (datums.next(), datums.next(), datums.next()) { + (Some(Datum::Range(range)), Some(Datum::UInt64(offset)), None) => { + let mut range = range.into_bounds(|b| b.datum()); + //XXX: why do we have to canonicalize on read? 
+ range.canonicalize().expect("ranges must be valid"); + let range = range.inner.expect("empty range"); + + let lower = range.lower.bound.map(|row| { + i32::try_from(row.unwrap_numeric().0) + .expect("only i32 values converted to ranges") + }); + let upper = range.upper.bound.map(|row| { + i32::try_from(row.unwrap_numeric().0) + .expect("only i32 values converted to ranges") + }); + + match (range.lower.inclusive, range.upper.inclusive) { + (true, true) => { + assert_eq!(lower, upper); + Partitioned::with_partition(lower.unwrap(), MzOffset::from(offset)) + } + (false, false) => Partitioned::with_range(lower, upper, MzOffset::from(offset)), + _ => panic!("invalid timestamp"), + } + } + invalid_binding => unreachable!("invalid binding {:?}", invalid_binding), + } + } +} + +/// Which piece of metadata a column corresponds to +#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub enum KafkaMetadataKind { + Partition, + Offset, + Timestamp, + Headers, + Header { key: String, use_bytes: bool }, +} + +impl RustType for KafkaMetadataKind { + fn into_proto(&self) -> ProtoKafkaMetadataKind { + use proto_kafka_metadata_kind::Kind; + ProtoKafkaMetadataKind { + kind: Some(match self { + KafkaMetadataKind::Partition => Kind::Partition(()), + KafkaMetadataKind::Offset => Kind::Offset(()), + KafkaMetadataKind::Timestamp => Kind::Timestamp(()), + KafkaMetadataKind::Headers => Kind::Headers(()), + KafkaMetadataKind::Header { key, use_bytes } => Kind::Header(ProtoKafkaHeader { + key: key.clone(), + use_bytes: *use_bytes, + }), + }), + } + } + + fn from_proto(proto: ProtoKafkaMetadataKind) -> Result { + use proto_kafka_metadata_kind::Kind; + let kind = proto + .kind + .ok_or_else(|| TryFromProtoError::missing_field("ProtoKafkaMetadataKind::kind"))?; + Ok(match kind { + Kind::Partition(()) => KafkaMetadataKind::Partition, + Kind::Offset(()) => KafkaMetadataKind::Offset, + Kind::Timestamp(()) => KafkaMetadataKind::Timestamp, + Kind::Headers(()) => KafkaMetadataKind::Headers, + Kind::Header(ProtoKafkaHeader { key, use_bytes }) => { + KafkaMetadataKind::Header { key, use_bytes } + } + }) + } +} diff --git a/src/storage-types/src/sources/load_generator.proto b/src/storage-types/src/sources/load_generator.proto new file mode 100644 index 0000000000000..064ab3d50de7c --- /dev/null +++ b/src/storage-types/src/sources/load_generator.proto @@ -0,0 +1,38 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
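The `SourceTimestamp` impl that closes out kafka.rs above encodes a frontier element as a two-datum row: a numeric range for the partition(s) plus a `uint64` offset, and `decode_row` inverts that encoding. A hedged sketch of the intended round-trip property (assuming `Partitioned` implements `PartialEq` and `Debug`, as its use in frontiers suggests):

```rust
// Hypothetical property check for the encode_row/decode_row pair above.
use mz_timely_util::order::Partitioned;

use crate::sources::{MzOffset, SourceTimestamp};

fn assert_roundtrip(ts: Partitioned<i32, MzOffset>) {
    let row = ts.encode_row();
    let decoded = Partitioned::<i32, MzOffset>::decode_row(&row);
    assert_eq!(decoded, ts);
}
```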
+ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package mz_storage_types.sources.load_generator; + +message ProtoLoadGeneratorSourceConnection { + reserved 1; + oneof kind { + ProtoCounterLoadGenerator counter = 6; + google.protobuf.Empty auction = 3; + ProtoTpchLoadGenerator tpch = 4; + google.protobuf.Empty datums = 5; + google.protobuf.Empty marketing = 7; + } + optional uint64 tick_micros = 2; +} + +message ProtoCounterLoadGenerator { + optional uint64 max_cardinality = 1; +} + +message ProtoTpchLoadGenerator { + int64 count_supplier = 1; + int64 count_part = 2; + int64 count_customer = 3; + int64 count_orders = 4; + int64 count_clerk = 5; +} diff --git a/src/storage-types/src/sources/load_generator.rs b/src/storage-types/src/sources/load_generator.rs new file mode 100644 index 0000000000000..31af9cd3e7205 --- /dev/null +++ b/src/storage-types/src/sources/load_generator.rs @@ -0,0 +1,449 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Types related to load generator sources + +use mz_ore::now::NowFn; +use mz_proto::{RustType, TryFromProtoError}; +use mz_repr::adt::numeric::NumericMaxScale; +use mz_repr::{ColumnType, GlobalId, RelationDesc, Row, ScalarType}; +use once_cell::sync::Lazy; +use proptest_derive::Arbitrary; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeSet; +use timely::dataflow::operators::to_stream::Event; + +use crate::connections::inline::ConnectionAccess; +use crate::sources::encoding::{DataEncoding, DataEncodingInner, SourceDataEncoding}; +use crate::sources::{MzOffset, SourceConnection}; + +include!(concat!( + env!("OUT_DIR"), + "/mz_storage_types.sources.load_generator.rs" +)); + +#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct LoadGeneratorSourceConnection { + pub load_generator: LoadGenerator, + pub tick_micros: Option, +} + +pub static LOAD_GEN_PROGRESS_DESC: Lazy = + Lazy::new(|| RelationDesc::empty().with_column("offset", ScalarType::UInt64.nullable(true))); + +impl SourceConnection for LoadGeneratorSourceConnection { + fn name(&self) -> &'static str { + "load-generator" + } + + fn upstream_name(&self) -> Option<&str> { + None + } + + fn timestamp_desc(&self) -> RelationDesc { + LOAD_GEN_PROGRESS_DESC.clone() + } + + fn connection_id(&self) -> Option { + None + } + + fn metadata_columns(&self) -> Vec<(&str, ColumnType)> { + vec![] + } +} + +impl crate::AlterCompatible for LoadGeneratorSourceConnection {} + +#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub enum LoadGenerator { + Auction, + Counter { + /// How many values will be emitted + /// before old ones are retracted, or `None` for + /// an append-only collection. 
+ max_cardinality: Option, + }, + Datums, + Marketing, + Tpch { + count_supplier: i64, + count_part: i64, + count_customer: i64, + count_orders: i64, + count_clerk: i64, + }, +} + +impl LoadGenerator { + fn data_encoding_inner(&self) -> DataEncodingInner { + match self { + LoadGenerator::Auction => DataEncodingInner::RowCodec(RelationDesc::empty()), + LoadGenerator::Datums => { + let mut desc = + RelationDesc::empty().with_column("rowid", ScalarType::Int64.nullable(false)); + let typs = ScalarType::enumerate(); + let mut names = BTreeSet::new(); + for typ in typs { + // Cut out variant information from the debug print. + let mut name = format!("_{:?}", typ) + .split(' ') + .next() + .unwrap() + .to_lowercase(); + // Incase we ever have multiple variants of the same type, create + // unique names for them. + while names.contains(&name) { + name.push('_'); + } + names.insert(name.clone()); + desc = desc.with_column(name, typ.clone().nullable(true)); + } + DataEncodingInner::RowCodec(desc) + } + LoadGenerator::Counter { .. } => DataEncodingInner::RowCodec( + RelationDesc::empty().with_column("counter", ScalarType::Int64.nullable(false)), + ), + LoadGenerator::Marketing => DataEncodingInner::RowCodec(RelationDesc::empty()), + LoadGenerator::Tpch { .. } => DataEncodingInner::RowCodec(RelationDesc::empty()), + } + } + + pub fn data_encoding(&self) -> SourceDataEncoding { + SourceDataEncoding::Single(DataEncoding::new(self.data_encoding_inner())) + } + + /// Returns the list of table names and their column types that this generator generates + pub fn views(&self) -> Vec<(&str, RelationDesc)> { + match self { + LoadGenerator::Auction => vec![ + ( + "organizations", + RelationDesc::empty() + .with_column("id", ScalarType::Int64.nullable(false)) + .with_column("name", ScalarType::String.nullable(false)) + .with_key(vec![0]), + ), + ( + "users", + RelationDesc::empty() + .with_column("id", ScalarType::Int64.nullable(false)) + .with_column("org_id", ScalarType::Int64.nullable(false)) + .with_column("name", ScalarType::String.nullable(false)) + .with_key(vec![0]), + ), + ( + "accounts", + RelationDesc::empty() + .with_column("id", ScalarType::Int64.nullable(false)) + .with_column("org_id", ScalarType::Int64.nullable(false)) + .with_column("balance", ScalarType::Int64.nullable(false)) + .with_key(vec![0]), + ), + ( + "auctions", + RelationDesc::empty() + .with_column("id", ScalarType::Int64.nullable(false)) + .with_column("seller", ScalarType::Int64.nullable(false)) + .with_column("item", ScalarType::String.nullable(false)) + .with_column( + "end_time", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .with_key(vec![0]), + ), + ( + "bids", + RelationDesc::empty() + .with_column("id", ScalarType::Int64.nullable(false)) + .with_column("buyer", ScalarType::Int64.nullable(false)) + .with_column("auction_id", ScalarType::Int64.nullable(false)) + .with_column("amount", ScalarType::Int32.nullable(false)) + .with_column( + "bid_time", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .with_key(vec![0]), + ), + ], + LoadGenerator::Counter { max_cardinality: _ } => vec![], + LoadGenerator::Marketing => { + vec![ + ( + "customers", + RelationDesc::empty() + .with_column("id", ScalarType::Int64.nullable(false)) + .with_column("email", ScalarType::String.nullable(false)) + .with_column("income", ScalarType::Int64.nullable(false)) + .with_key(vec![0]), + ), + ( + "impressions", + RelationDesc::empty() + .with_column("id", ScalarType::Int64.nullable(false)) + 
.with_column("customer_id", ScalarType::Int64.nullable(false)) + .with_column("campaign_id", ScalarType::Int64.nullable(false)) + .with_column( + "impression_time", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .with_key(vec![0]), + ), + ( + "clicks", + RelationDesc::empty() + .with_column("impression_id", ScalarType::Int64.nullable(false)) + .with_column( + "click_time", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .without_keys(), + ), + ( + "leads", + RelationDesc::empty() + .with_column("id", ScalarType::Int64.nullable(false)) + .with_column("customer_id", ScalarType::Int64.nullable(false)) + .with_column( + "created_at", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .with_column( + "converted_at", + ScalarType::TimestampTz { precision: None }.nullable(true), + ) + .with_column("conversion_amount", ScalarType::Int64.nullable(true)) + .with_key(vec![0]), + ), + ( + "coupons", + RelationDesc::empty() + .with_column("id", ScalarType::Int64.nullable(false)) + .with_column("lead_id", ScalarType::Int64.nullable(false)) + .with_column( + "created_at", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .with_column("amount", ScalarType::Int64.nullable(false)) + .with_key(vec![0]), + ), + ( + "conversion_predictions", + RelationDesc::empty() + .with_column("lead_id", ScalarType::Int64.nullable(false)) + .with_column("experiment_bucket", ScalarType::String.nullable(false)) + .with_column( + "predicted_at", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .with_column("score", ScalarType::Float64.nullable(false)) + .without_keys(), + ), + ] + } + LoadGenerator::Datums => vec![], + LoadGenerator::Tpch { .. } => { + let identifier = ScalarType::Int64.nullable(false); + let decimal = ScalarType::Numeric { + max_scale: Some(NumericMaxScale::try_from(2i64).unwrap()), + } + .nullable(false); + vec![ + ( + "supplier", + RelationDesc::empty() + .with_column("s_suppkey", identifier.clone()) + .with_column("s_name", ScalarType::String.nullable(false)) + .with_column("s_address", ScalarType::String.nullable(false)) + .with_column("s_nationkey", identifier.clone()) + .with_column("s_phone", ScalarType::String.nullable(false)) + .with_column("s_acctbal", decimal.clone()) + .with_column("s_comment", ScalarType::String.nullable(false)) + .with_key(vec![0]), + ), + ( + "part", + RelationDesc::empty() + .with_column("p_partkey", identifier.clone()) + .with_column("p_name", ScalarType::String.nullable(false)) + .with_column("p_mfgr", ScalarType::String.nullable(false)) + .with_column("p_brand", ScalarType::String.nullable(false)) + .with_column("p_type", ScalarType::String.nullable(false)) + .with_column("p_size", ScalarType::Int32.nullable(false)) + .with_column("p_container", ScalarType::String.nullable(false)) + .with_column("p_retailprice", decimal.clone()) + .with_column("p_comment", ScalarType::String.nullable(false)) + .with_key(vec![0]), + ), + ( + "partsupp", + RelationDesc::empty() + .with_column("ps_partkey", identifier.clone()) + .with_column("ps_suppkey", identifier.clone()) + .with_column("ps_availqty", ScalarType::Int32.nullable(false)) + .with_column("ps_supplycost", decimal.clone()) + .with_column("ps_comment", ScalarType::String.nullable(false)) + .with_key(vec![0, 1]), + ), + ( + "customer", + RelationDesc::empty() + .with_column("c_custkey", identifier.clone()) + .with_column("c_name", ScalarType::String.nullable(false)) + .with_column("c_address", ScalarType::String.nullable(false)) + 
.with_column("c_nationkey", identifier.clone()) + .with_column("c_phone", ScalarType::String.nullable(false)) + .with_column("c_acctbal", decimal.clone()) + .with_column("c_mktsegment", ScalarType::String.nullable(false)) + .with_column("c_comment", ScalarType::String.nullable(false)) + .with_key(vec![0]), + ), + ( + "orders", + RelationDesc::empty() + .with_column("o_orderkey", identifier.clone()) + .with_column("o_custkey", identifier.clone()) + .with_column("o_orderstatus", ScalarType::String.nullable(false)) + .with_column("o_totalprice", decimal.clone()) + .with_column("o_orderdate", ScalarType::Date.nullable(false)) + .with_column("o_orderpriority", ScalarType::String.nullable(false)) + .with_column("o_clerk", ScalarType::String.nullable(false)) + .with_column("o_shippriority", ScalarType::Int32.nullable(false)) + .with_column("o_comment", ScalarType::String.nullable(false)) + .with_key(vec![0]), + ), + ( + "lineitem", + RelationDesc::empty() + .with_column("l_orderkey", identifier.clone()) + .with_column("l_partkey", identifier.clone()) + .with_column("l_suppkey", identifier.clone()) + .with_column("l_linenumber", ScalarType::Int32.nullable(false)) + .with_column("l_quantity", decimal.clone()) + .with_column("l_extendedprice", decimal.clone()) + .with_column("l_discount", decimal.clone()) + .with_column("l_tax", decimal) + .with_column("l_returnflag", ScalarType::String.nullable(false)) + .with_column("l_linestatus", ScalarType::String.nullable(false)) + .with_column("l_shipdate", ScalarType::Date.nullable(false)) + .with_column("l_commitdate", ScalarType::Date.nullable(false)) + .with_column("l_receiptdate", ScalarType::Date.nullable(false)) + .with_column("l_shipinstruct", ScalarType::String.nullable(false)) + .with_column("l_shipmode", ScalarType::String.nullable(false)) + .with_column("l_comment", ScalarType::String.nullable(false)) + .with_key(vec![0, 3]), + ), + ( + "nation", + RelationDesc::empty() + .with_column("n_nationkey", identifier.clone()) + .with_column("n_name", ScalarType::String.nullable(false)) + .with_column("n_regionkey", identifier.clone()) + .with_column("n_comment", ScalarType::String.nullable(false)) + .with_key(vec![0]), + ), + ( + "region", + RelationDesc::empty() + .with_column("r_regionkey", identifier) + .with_column("r_name", ScalarType::String.nullable(false)) + .with_column("r_comment", ScalarType::String.nullable(false)) + .with_key(vec![0]), + ), + ] + } + } + } + + pub fn is_monotonic(&self) -> bool { + match self { + LoadGenerator::Auction => true, + LoadGenerator::Counter { + max_cardinality: None, + } => true, + LoadGenerator::Counter { .. } => false, + LoadGenerator::Marketing => false, + LoadGenerator::Datums => true, + LoadGenerator::Tpch { .. } => false, + } + } +} + +pub trait Generator { + /// Returns a function that produces rows and batch information. 
+    fn by_seed(
+        &self,
+        now: NowFn,
+        seed: Option<u64>,
+        resume_offset: MzOffset,
+    ) -> Box<dyn Iterator<Item = (usize, Event<Option<MzOffset>, (Row, i64)>)>>;
+}
+
+impl RustType<ProtoLoadGeneratorSourceConnection> for LoadGeneratorSourceConnection {
+    fn into_proto(&self) -> ProtoLoadGeneratorSourceConnection {
+        use proto_load_generator_source_connection::Kind;
+        ProtoLoadGeneratorSourceConnection {
+            kind: Some(match &self.load_generator {
+                LoadGenerator::Auction => Kind::Auction(()),
+                LoadGenerator::Counter { max_cardinality } => {
+                    Kind::Counter(ProtoCounterLoadGenerator {
+                        max_cardinality: *max_cardinality,
+                    })
+                }
+                LoadGenerator::Marketing => Kind::Marketing(()),
+                LoadGenerator::Tpch {
+                    count_supplier,
+                    count_part,
+                    count_customer,
+                    count_orders,
+                    count_clerk,
+                } => Kind::Tpch(ProtoTpchLoadGenerator {
+                    count_supplier: *count_supplier,
+                    count_part: *count_part,
+                    count_customer: *count_customer,
+                    count_orders: *count_orders,
+                    count_clerk: *count_clerk,
+                }),
+                LoadGenerator::Datums => Kind::Datums(()),
+            }),
+            tick_micros: self.tick_micros,
+        }
+    }
+
+    fn from_proto(proto: ProtoLoadGeneratorSourceConnection) -> Result<Self, TryFromProtoError> {
+        use proto_load_generator_source_connection::Kind;
+        let kind = proto.kind.ok_or_else(|| {
+            TryFromProtoError::missing_field("ProtoLoadGeneratorSourceConnection::kind")
+        })?;
+        Ok(LoadGeneratorSourceConnection {
+            load_generator: match kind {
+                Kind::Auction(()) => LoadGenerator::Auction,
+                Kind::Counter(ProtoCounterLoadGenerator { max_cardinality }) => {
+                    LoadGenerator::Counter { max_cardinality }
+                }
+                Kind::Marketing(()) => LoadGenerator::Marketing,
+                Kind::Tpch(ProtoTpchLoadGenerator {
+                    count_supplier,
+                    count_part,
+                    count_customer,
+                    count_orders,
+                    count_clerk,
+                }) => LoadGenerator::Tpch {
+                    count_supplier,
+                    count_part,
+                    count_customer,
+                    count_orders,
+                    count_clerk,
+                },
+                Kind::Datums(()) => LoadGenerator::Datums,
+            },
+            tick_micros: proto.tick_micros,
+        })
+    }
+}
diff --git a/src/storage-types/src/sources/postgres.proto b/src/storage-types/src/sources/postgres.proto
new file mode 100644
index 0000000000000..da298936e7764
--- /dev/null
+++ b/src/storage-types/src/sources/postgres.proto
@@ -0,0 +1,38 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+syntax = "proto3";
+
+import "postgres-util/src/desc.proto";
+import "repr/src/global_id.proto";
+import "storage-types/src/connections.proto";
+import "expr/src/scalar.proto";
+
+package mz_storage_types.sources.postgres;
+
+message ProtoPostgresSourceConnection {
+    message ProtoPostgresTableCast {
+        repeated mz_expr.scalar.ProtoMirScalarExpr column_casts = 1;
+    }
+
+    mz_repr.global_id.ProtoGlobalId connection_id = 6;
+    mz_storage_types.connections.ProtoPostgresConnection connection = 1;
+    string publication = 2;
+    ProtoPostgresSourcePublicationDetails details = 4;
+    repeated ProtoPostgresTableCast table_casts = 5;
+    // Describes the position in the source's publication that the table cast
+    // correlates to; meant to be iterated over in tandem with table_casts
+    repeated uint64 table_cast_pos = 7;
+}
+
+message ProtoPostgresSourcePublicationDetails {
+    repeated mz_postgres_util.desc.ProtoPostgresTableDesc tables = 1;
+    string slot = 2;
+    optional uint64 timeline_id = 3;
+}
diff --git a/src/storage-types/src/sources/postgres.rs b/src/storage-types/src/sources/postgres.rs
new file mode 100644
index 0000000000000..f58a45c55a3f0
--- /dev/null
+++ b/src/storage-types/src/sources/postgres.rs
@@ -0,0 +1,272 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! Types related to postgres sources
+
+use itertools::EitherOrBoth::Both;
+use itertools::Itertools;
+use mz_expr::MirScalarExpr;
+use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
+use mz_repr::{ColumnType, GlobalId, RelationDesc, ScalarType};
+use once_cell::sync::Lazy;
+use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy};
+use proptest_derive::Arbitrary;
+use serde::{Deserialize, Serialize};
+use std::collections::BTreeMap;
+
+use crate::connections::inline::{
+    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
+    ReferencedConnection,
+};
+use crate::controller::StorageError;
+use crate::sources::SourceConnection;
+
+include!(concat!(
+    env!("OUT_DIR"),
+    "/mz_storage_types.sources.postgres.rs"
+));
+
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct PostgresSourceConnection<C: ConnectionAccess = InlinedConnection> {
+    pub connection_id: GlobalId,
+    pub connection: C::Pg,
+    /// The cast expressions to convert the incoming string encoded rows to
+    /// their target types, keyed by their position in the source.
+    pub table_casts: BTreeMap<usize, Vec<MirScalarExpr>>,
+    pub publication: String,
+    pub publication_details: PostgresSourcePublicationDetails,
+}
+
+impl<R: ConnectionResolver> IntoInlineConnection<PostgresSourceConnection, R>
+    for PostgresSourceConnection<ReferencedConnection>
+{
+    fn into_inline_connection(self, r: R) -> PostgresSourceConnection {
+        let PostgresSourceConnection {
+            connection_id,
+            connection,
+            table_casts,
+            publication,
+            publication_details,
+        } = self;
+
+        PostgresSourceConnection {
+            connection_id,
+            connection: r.resolve_connection(connection).unwrap_pg(),
+            table_casts,
+            publication,
+            publication_details,
+        }
+    }
+}
+
+impl Arbitrary for PostgresSourceConnection {
+    type Strategy = BoxedStrategy<Self>;
+    type Parameters = ();
+
+    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
+        (
+            any::(),
+            any::<GlobalId>(),
+            proptest::collection::btree_map(
+                any::<usize>(),
+                proptest::collection::vec(any::<MirScalarExpr>(), 1..4),
+                1..4,
+            ),
+            any::<String>(),
+            any::<PostgresSourcePublicationDetails>(),
+        )
+            .prop_map(
+                |(connection, connection_id, table_casts, publication, details)| Self {
+                    connection,
+                    connection_id,
+                    table_casts,
+                    publication,
+                    publication_details: details,
+                },
+            )
+            .boxed()
+    }
+}
+
+pub static PG_PROGRESS_DESC: Lazy<RelationDesc> =
+    Lazy::new(|| RelationDesc::empty().with_column("lsn", ScalarType::UInt64.nullable(true)));
+
+impl SourceConnection for PostgresSourceConnection {
+    fn name(&self) -> &'static str {
+        "postgres"
+    }
+
+    fn upstream_name(&self) -> Option<&str> {
+        None
+    }
+
+    fn timestamp_desc(&self) -> RelationDesc {
+        PG_PROGRESS_DESC.clone()
+    }
+
+    fn connection_id(&self) -> Option<GlobalId> {
+        Some(self.connection_id)
+    }
+
+    fn metadata_columns(&self) -> Vec<(&str, ColumnType)> {
+        vec![]
+    }
+}
+
+impl crate::AlterCompatible for PostgresSourceConnection {
+    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), StorageError> {
+        if self == other {
+            return Ok(());
+        }
+
+        let PostgresSourceConnection {
+            connection_id,
+            // Connection details may change
+            connection: _,
+            table_casts,
+            publication,
+            publication_details,
+        } = self;
+
+        let compatibility_checks = [
+            (connection_id == &other.connection_id, "connection_id"),
+            (
+                table_casts
+                    .iter()
+                    .merge_join_by(&other.table_casts, |(l_key, _), (r_key, _)| {
+                        l_key.cmp(r_key)
+                    })
+                    .all(|r| match r {
+                        Both((_, l_val), (_, r_val)) => l_val == r_val,
+                        _ => true,
+                    }),
+                "table_casts",
+            ),
+            (publication == &other.publication, "publication"),
+            (
+                publication_details == &other.publication_details,
+                "publication_details",
+            ),
+        ];
+
+        for (compatible, field) in compatibility_checks {
+            if !compatible {
+                tracing::warn!(
+                    "PostgresSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
+                    self,
+                    other
+                );
+
+                return Err(StorageError::InvalidAlter { id });
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl RustType<ProtoPostgresSourceConnection> for PostgresSourceConnection {
+    fn into_proto(&self) -> ProtoPostgresSourceConnection {
+        use proto_postgres_source_connection::ProtoPostgresTableCast;
+        let mut table_casts = Vec::with_capacity(self.table_casts.len());
+        let mut table_cast_pos = Vec::with_capacity(self.table_casts.len());
+        for (pos, table_cast_cols) in self.table_casts.iter() {
+            table_casts.push(ProtoPostgresTableCast {
+                column_casts: table_cast_cols
+                    .iter()
+                    .cloned()
+                    .map(|cast| cast.into_proto())
+                    .collect(),
+            });
+            table_cast_pos.push(mz_ore::cast::usize_to_u64(*pos));
+        }
+
+        ProtoPostgresSourceConnection {
+            connection: Some(self.connection.into_proto()),
+            connection_id: Some(self.connection_id.into_proto()),
+            publication: self.publication.clone(),
+            details: Some(self.publication_details.into_proto()),
+            table_casts,
+            table_cast_pos,
+        }
+    }
+
+    fn from_proto(proto: ProtoPostgresSourceConnection) -> Result<Self, TryFromProtoError> {
+        // If we get the wrong number of table cast positions, we have to just
+        // accept all of the table casts. This is somewhat harmless, as the
+        // worst thing that happens is that we generate unused snapshots from
+        // the upstream PG publication, and this will (hopefully) correct
+        // itself on the next version upgrade.
+        let table_cast_pos = if proto.table_casts.len() == proto.table_cast_pos.len() {
+            proto.table_cast_pos
+        } else {
+            (1..proto.table_casts.len() + 1)
+                .map(mz_ore::cast::usize_to_u64)
+                .collect()
+        };
+
+        let mut table_casts = BTreeMap::new();
+        for (pos, cast) in table_cast_pos
+            .into_iter()
+            .zip_eq(proto.table_casts.into_iter())
+        {
+            let mut column_casts = vec![];
+            for cast in cast.column_casts {
+                column_casts.push(cast.into_rust()?);
+            }
+            table_casts.insert(mz_ore::cast::u64_to_usize(pos), column_casts);
+        }
+
+        Ok(PostgresSourceConnection {
+            connection: proto
+                .connection
+                .into_rust_if_some("ProtoPostgresSourceConnection::connection")?,
+            connection_id: proto
+                .connection_id
+                .into_rust_if_some("ProtoPostgresSourceConnection::connection_id")?,
+            publication: proto.publication,
+            publication_details: proto
+                .details
+                .into_rust_if_some("ProtoPostgresSourceConnection::details")?,
+            table_casts,
+        })
+    }
+}
+
+#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct PostgresSourcePublicationDetails {
+    pub tables: Vec<mz_postgres_util::desc::PostgresTableDesc>,
+    pub slot: String,
+    /// The active timeline_id when this source was created
+    /// The None value indicates an unknown timeline, to account for sources that existed
+    /// prior to this field being introduced
+    pub timeline_id: Option<u64>,
+}
+
+impl RustType<ProtoPostgresSourcePublicationDetails> for PostgresSourcePublicationDetails {
+    fn into_proto(&self) -> ProtoPostgresSourcePublicationDetails {
+        ProtoPostgresSourcePublicationDetails {
+            tables: self.tables.iter().map(|t| t.into_proto()).collect(),
+            slot: self.slot.clone(),
+            timeline_id: self.timeline_id.clone(),
+        }
+    }
+
+    fn from_proto(proto: ProtoPostgresSourcePublicationDetails) -> Result<Self, TryFromProtoError> {
+        Ok(PostgresSourcePublicationDetails {
+            tables: proto
+                .tables
+                .into_iter()
+                .map(mz_postgres_util::desc::PostgresTableDesc::from_proto)
+                .collect::<Result<_, _>>()?,
+            slot: proto.slot,
+            timeline_id: proto.timeline_id,
+        })
+    }
+}
diff --git a/src/storage-types/src/sources/testscript.proto b/src/storage-types/src/sources/testscript.proto
new file mode 100644
index 0000000000000..3463a80ac673a
--- /dev/null
+++ b/src/storage-types/src/sources/testscript.proto
@@ -0,0 +1,16 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+syntax = "proto3";
+
+package mz_storage_types.sources.testscript;
+
+message ProtoTestScriptSourceConnection {
+    string desc_json = 1;
+}
diff --git a/src/storage-types/src/sources/testscript.rs b/src/storage-types/src/sources/testscript.rs
new file mode 100644
index 0000000000000..184f845a3310f
--- /dev/null
+++ b/src/storage-types/src/sources/testscript.rs
@@ -0,0 +1,69 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! Types related to testscript sources
+
+use mz_proto::{RustType, TryFromProtoError};
+use mz_repr::{ColumnType, GlobalId, RelationDesc, ScalarType};
+use once_cell::sync::Lazy;
+use proptest_derive::Arbitrary;
+use serde::{Deserialize, Serialize};
+
+use crate::sources::SourceConnection;
+
+include!(concat!(
+    env!("OUT_DIR"),
+    "/mz_storage_types.sources.testscript.rs"
+));
+
+#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct TestScriptSourceConnection {
+    pub desc_json: String,
+}
+
+pub static TEST_SCRIPT_PROGRESS_DESC: Lazy<RelationDesc> =
+    Lazy::new(|| RelationDesc::empty().with_column("offset", ScalarType::UInt64.nullable(true)));
+
+impl SourceConnection for TestScriptSourceConnection {
+    fn name(&self) -> &'static str {
+        "testscript"
+    }
+
+    fn upstream_name(&self) -> Option<&str> {
+        None
+    }
+
+    fn timestamp_desc(&self) -> RelationDesc {
+        TEST_SCRIPT_PROGRESS_DESC.clone()
+    }
+
+    fn connection_id(&self) -> Option<GlobalId> {
+        None
+    }
+
+    fn metadata_columns(&self) -> Vec<(&str, ColumnType)> {
+        vec![]
+    }
+}
+
+impl crate::AlterCompatible for TestScriptSourceConnection {}
+
+impl RustType<ProtoTestScriptSourceConnection> for TestScriptSourceConnection {
+    fn into_proto(&self) -> ProtoTestScriptSourceConnection {
+        ProtoTestScriptSourceConnection {
+            desc_json: self.desc_json.clone(),
+        }
+    }
+
+    fn from_proto(proto: ProtoTestScriptSourceConnection) -> Result<Self, TryFromProtoError> {
+        Ok(TestScriptSourceConnection {
+            desc_json: proto.desc_json,
+        })
+    }
+}
diff --git a/src/storage/src/render/debezium.rs b/src/storage/src/render/debezium.rs
index 9564835dc7229..7afc8cd9ff50f 100644
--- a/src/storage/src/render/debezium.rs
+++ b/src/storage/src/render/debezium.rs
@@ -16,10 +16,11 @@ use mz_expr::EvalError;
 use mz_ore::cast::CastFrom;
 use mz_repr::{Datum, Diff, Row, Timestamp};
 use mz_storage_types::errors::{DataflowError, EnvelopeError};
-use mz_storage_types::sources::{
+use mz_storage_types::sources::envelope::{
     DebeziumDedupProjection, DebeziumEnvelope, DebeziumSourceProjection,
-    DebeziumTransactionMetadata, MzOffset,
+    DebeziumTransactionMetadata,
 };
+use mz_storage_types::sources::MzOffset;
 use timely::dataflow::channels::pact::{Exchange, Pipeline};
 use timely::dataflow::operators::{Capability, OkErr, Operator};
 use timely::dataflow::{Scope, ScopeParent};
diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs
index 940f76bed57b1..ccec64aed0ad5 100644
--- a/src/storage/src/render/sources.rs
+++ b/src/storage/src/render/sources.rs
@@ -27,6 +27,7 @@ use mz_storage_types::errors::{
 };
 use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
 use mz_storage_types::sources::encoding::*;
+use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
 use mz_storage_types::sources::*;
 use mz_timely_util::builder_async::PressOnDropButton;
 use mz_timely_util::operator::CollectionExt;
diff --git a/src/storage/src/render/upsert.rs b/src/storage/src/render/upsert.rs
index 0c838932d7c79..ab0ce41c994ee 100644
--- a/src/storage/src/render/upsert.rs
+++ b/src/storage/src/render/upsert.rs
@@ -24,7 +24,7 @@ use mz_repr::{Datum, DatumVec, Diff, Row};
 use mz_storage_operators::metrics::BackpressureMetrics;
 use mz_storage_types::configuration::StorageConfiguration;
 use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
-use 
mz_storage_types::sources::UpsertEnvelope; +use mz_storage_types::sources::envelope::UpsertEnvelope; use mz_timely_util::builder_async::{ AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, diff --git a/src/storage/src/source/generator.rs b/src/storage/src/source/generator.rs index 98684976d406b..216b46deb0de8 100644 --- a/src/storage/src/source/generator.rs +++ b/src/storage/src/source/generator.rs @@ -13,9 +13,10 @@ use std::time::Duration; use differential_dataflow::{AsCollection, Collection}; use mz_ore::collections::CollectionExt; use mz_repr::{Diff, Row}; -use mz_storage_types::sources::{ - Generator, LoadGenerator, LoadGeneratorSourceConnection, MzOffset, SourceTimestamp, +use mz_storage_types::sources::load_generator::{ + Generator, LoadGenerator, LoadGeneratorSourceConnection, }; +use mz_storage_types::sources::{MzOffset, SourceTimestamp}; use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton}; use timely::dataflow::operators::to_stream::Event; use timely::dataflow::operators::ToStream; diff --git a/src/storage/src/source/generator/auction.rs b/src/storage/src/source/generator/auction.rs index 0b3cd2f8a9ecc..3e2c265c4ba9c 100644 --- a/src/storage/src/source/generator/auction.rs +++ b/src/storage/src/source/generator/auction.rs @@ -12,7 +12,8 @@ use std::iter; use mz_ore::now::{to_datetime, NowFn}; use mz_repr::{Datum, Row}; -use mz_storage_types::sources::{Generator, MzOffset}; +use mz_storage_types::sources::load_generator::Generator; +use mz_storage_types::sources::MzOffset; use rand::prelude::{Rng, SmallRng}; use rand::seq::SliceRandom; use rand::SeedableRng; diff --git a/src/storage/src/source/generator/counter.rs b/src/storage/src/source/generator/counter.rs index 1b42501c3909e..a4658c5e9c003 100644 --- a/src/storage/src/source/generator/counter.rs +++ b/src/storage/src/source/generator/counter.rs @@ -9,7 +9,8 @@ use mz_ore::now::NowFn; use mz_repr::{Datum, Row}; -use mz_storage_types::sources::{Generator, MzOffset}; +use mz_storage_types::sources::load_generator::Generator; +use mz_storage_types::sources::MzOffset; use timely::dataflow::operators::to_stream::Event; pub struct Counter { diff --git a/src/storage/src/source/generator/datums.rs b/src/storage/src/source/generator/datums.rs index 0e532c735886c..6a22d1a566b4f 100644 --- a/src/storage/src/source/generator/datums.rs +++ b/src/storage/src/source/generator/datums.rs @@ -11,7 +11,8 @@ use std::iter; use mz_ore::now::NowFn; use mz_repr::{Datum, Row, ScalarType}; -use mz_storage_types::sources::{Generator, MzOffset}; +use mz_storage_types::sources::load_generator::Generator; +use mz_storage_types::sources::MzOffset; use timely::dataflow::operators::to_stream::Event; pub struct Datums {} diff --git a/src/storage/src/source/generator/marketing.rs b/src/storage/src/source/generator/marketing.rs index af48780b2642f..238462e36a876 100644 --- a/src/storage/src/source/generator/marketing.rs +++ b/src/storage/src/source/generator/marketing.rs @@ -14,7 +14,8 @@ use std::{ use mz_ore::now::to_datetime; use mz_repr::{Datum, Row}; -use mz_storage_types::sources::{Generator, MzOffset}; +use mz_storage_types::sources::load_generator::Generator; +use mz_storage_types::sources::MzOffset; use rand::{distributions::Standard, rngs::SmallRng, Rng, SeedableRng}; use timely::dataflow::operators::to_stream::Event; diff --git a/src/storage/src/source/generator/tpch.rs b/src/storage/src/source/generator/tpch.rs index 60fbff074a2aa..05d67edb7d679 100644 --- 
a/src/storage/src/source/generator/tpch.rs +++ b/src/storage/src/source/generator/tpch.rs @@ -19,7 +19,8 @@ use mz_ore::now::NowFn; use mz_repr::adt::date::Date; use mz_repr::adt::numeric::{self, DecimalLike, Numeric}; use mz_repr::{Datum, Row}; -use mz_storage_types::sources::{Generator, MzOffset}; +use mz_storage_types::sources::load_generator::Generator; +use mz_storage_types::sources::MzOffset; use once_cell::sync::Lazy; use rand::distributions::{Alphanumeric, DistString}; use rand::rngs::StdRng; diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index c40f6e544fa24..19e45858e481c 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -27,9 +27,8 @@ use mz_repr::adt::timestamp::CheckedTimestamp; use mz_repr::{adt::jsonb::Jsonb, Datum, Diff, GlobalId, Row}; use mz_ssh_util::tunnel::SshTunnelStatus; use mz_storage_types::errors::ContextCreationError; -use mz_storage_types::sources::{ - KafkaMetadataKind, KafkaSourceConnection, MzOffset, SourceTimestamp, -}; +use mz_storage_types::sources::kafka::{KafkaMetadataKind, KafkaSourceConnection}; +use mz_storage_types::sources::{MzOffset, SourceTimestamp}; use mz_timely_util::antichain::AntichainExt; use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton}; use mz_timely_util::order::Partitioned;
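
Editor's note (illustrative, not part of the patch): the net effect of this refactor on downstream code is purely a change of import paths. Connection- and envelope-specific types move into submodules of `mz_storage_types::sources`, while shared types such as `MzOffset`, `SourceDesc`, and `Timeline` stay at the module root. A minimal sketch of the before/after imports, using only paths that appear in this diff:

    // Before: everything was imported from the `sources` module root.
    // use mz_storage_types::sources::{KafkaSourceConnection, LoadGenerator, MzOffset, UpsertStyle};

    // After: per-source and envelope types live in dedicated submodules.
    use mz_storage_types::sources::envelope::UpsertStyle;
    use mz_storage_types::sources::kafka::KafkaSourceConnection;
    use mz_storage_types::sources::load_generator::LoadGenerator;
    use mz_storage_types::sources::MzOffset;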
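
Editor's note (illustrative, not part of the patch): `ProtoPostgresSourceConnection` flattens the `table_casts: BTreeMap<usize, Vec<MirScalarExpr>>` field into two parallel repeated fields, `table_casts` and `table_cast_pos`, and `from_proto` zips them back together, falling back to positions `1..=N` when the two lists disagree in length. Below is a self-contained sketch of that pairing logic, with `String` standing in for the cast expressions; the helper name `zip_table_casts` is hypothetical and not part of the codebase.

    use std::collections::BTreeMap;

    // Simplified stand-in for the decode half of `PostgresSourceConnection::from_proto`:
    // `casts` plays the role of `proto.table_casts`, `positions` of `proto.table_cast_pos`.
    fn zip_table_casts(
        casts: Vec<Vec<String>>,
        positions: Vec<u64>,
    ) -> BTreeMap<usize, Vec<String>> {
        // On a length mismatch, assume the casts cover publication positions 1..=N,
        // mirroring the fallback in the patch.
        let positions = if casts.len() == positions.len() {
            positions
        } else {
            (1..=casts.len() as u64).collect()
        };
        positions
            .into_iter()
            .zip(casts)
            .map(|(pos, cols)| (pos as usize, cols))
            .collect()
    }

    fn main() {
        let casts = vec![vec!["cast_a".to_string()], vec!["cast_b".to_string()]];
        // Mismatched lengths: positions are regenerated as 1 and 2.
        let map = zip_table_casts(casts, vec![]);
        assert_eq!(map.keys().copied().collect::<Vec<_>>(), vec![1, 2]);
    }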
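
Editor's note (illustrative, not part of the patch): the `table_casts` check in `PostgresSourceConnection::alter_compatible` uses `merge_join_by` so that a position present in only one of the two maps is tolerated, while a position present in both must carry identical casts. A standalone sketch of the same comparison, again with `String` in place of `MirScalarExpr`, assuming the `itertools` crate that the file already depends on, and using a hypothetical helper name:

    use std::collections::BTreeMap;

    use itertools::EitherOrBoth::Both;
    use itertools::Itertools;

    // Hypothetical helper mirroring the `table_casts` compatibility check.
    fn table_casts_compatible(
        a: &BTreeMap<usize, Vec<String>>,
        b: &BTreeMap<usize, Vec<String>>,
    ) -> bool {
        a.iter()
            .merge_join_by(b.iter(), |(l_key, _), (r_key, _)| l_key.cmp(r_key))
            .all(|pair| match pair {
                // Only positions present on both sides must match exactly.
                Both((_, l_val), (_, r_val)) => l_val == r_val,
                _ => true,
            })
    }

    fn main() {
        let mut a = BTreeMap::new();
        a.insert(1usize, vec!["cast_a".to_string()]);
        let mut b = a.clone();
        // A position present on only one side is tolerated...
        b.insert(2, vec!["cast_b".to_string()]);
        assert!(table_casts_compatible(&a, &b));
        // ...but a shared position with diverging casts is not.
        b.insert(1, vec!["other".to_string()]);
        assert!(!table_casts_compatible(&a, &b));
    }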