Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage-types: reorganize sources.rs into modules #24096

Merged
merged 1 commit into from
Dec 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async fn ast_rewrite_postgres_source_timeline_id_0_80_0(
use mz_sql_parser::ast::{PgConfigOption, PgConfigOptionName, Value, WithOptionValue};
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::sources::{
use mz_storage_types::sources::postgres::{
PostgresSourcePublicationDetails, ProtoPostgresSourcePublicationDetails,
};
use prost::Message;
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,8 @@ impl Source {
Some("debezium")
}
SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
mz_storage_types::sources::UpsertStyle::Default(_) => Some("upsert"),
mz_storage_types::sources::UpsertStyle::Debezium { .. } => {
mz_storage_types::sources::envelope::UpsertStyle::Default(_) => Some("upsert"),
mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
// NOTE(aljoscha): Should we somehow mark that this is
// using upsert internally? See note above about
// DEBEZIUM.
Expand Down
15 changes: 10 additions & 5 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ use mz_storage_types::sources::encoding::{
included_column_desc, AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, DataEncodingInner,
ProtobufEncoding, RegexEncoding, SourceDataEncoding, SourceDataEncodingInner,
};
use mz_storage_types::sources::{
GenericSourceConnection, KafkaMetadataKind, KafkaSourceConnection, KeyEnvelope, LoadGenerator,
LoadGeneratorSourceConnection, PostgresSourceConnection, PostgresSourcePublicationDetails,
ProtoPostgresSourcePublicationDetails, SourceConnection, SourceDesc, SourceEnvelope,
TestScriptSourceConnection, Timeline, UnplannedSourceEnvelope, UpsertStyle,
use mz_storage_types::sources::envelope::{
KeyEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{KafkaMetadataKind, KafkaSourceConnection};
use mz_storage_types::sources::load_generator::{LoadGenerator, LoadGeneratorSourceConnection};
use mz_storage_types::sources::postgres::{
PostgresSourceConnection, PostgresSourcePublicationDetails,
ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::testscript::TestScriptSourceConnection;
use mz_storage_types::sources::{GenericSourceConnection, SourceConnection, SourceDesc, Timeline};
use prost::Message;

use crate::ast::display::AstDisplay;
Expand Down
17 changes: 10 additions & 7 deletions src/sql/src/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::connections::Connection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::{
GenericSourceConnection, PostgresSourcePublicationDetails, SourceConnection,
};
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{GenericSourceConnection, SourceConnection};
use prost::Message;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use protobuf_native::MessageLite;
Expand Down Expand Up @@ -433,13 +432,17 @@ async fn purify_create_source(
let mut subsources = vec![];

let progress_desc = match &connection {
CreateSourceConnection::Kafka { .. } => &mz_storage_types::sources::KAFKA_PROGRESS_DESC,
CreateSourceConnection::Postgres { .. } => &mz_storage_types::sources::PG_PROGRESS_DESC,
CreateSourceConnection::Kafka { .. } => {
&mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
}
CreateSourceConnection::Postgres { .. } => {
&mz_storage_types::sources::postgres::PG_PROGRESS_DESC
}
CreateSourceConnection::LoadGenerator { .. } => {
&mz_storage_types::sources::LOAD_GEN_PROGRESS_DESC
&mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
}
CreateSourceConnection::TestScript { .. } => {
&mz_storage_types::sources::TEST_SCRIPT_PROGRESS_DESC
&mz_storage_types::sources::testscript::TEST_SCRIPT_PROGRESS_DESC
}
};

Expand Down
5 changes: 5 additions & 0 deletions src/storage-types/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ fn main() {
"storage-types/src/sinks.proto",
"storage-types/src/sources.proto",
"storage-types/src/sources/encoding.proto",
"storage-types/src/sources/envelope.proto",
"storage-types/src/sources/kafka.proto",
"storage-types/src/sources/postgres.proto",
"storage-types/src/sources/load_generator.proto",
"storage-types/src/sources/testscript.proto",
],
&[".."],
)
Expand Down
2 changes: 0 additions & 2 deletions src/storage-types/src/connections/aws.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

syntax = "proto3";

import "google/protobuf/empty.proto";

import "repr/src/global_id.proto";
import "storage-types/src/connections.proto";

Expand Down
1 change: 0 additions & 1 deletion src/storage-types/src/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
syntax = "proto3";

import "expr/src/scalar.proto";
import "expr/src/id.proto";
import "repr/src/global_id.proto";
import "repr/src/row.proto";
import "storage-types/src/shim.proto";
Expand Down
191 changes: 10 additions & 181 deletions src/storage-types/src/sources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,49 +11,25 @@ syntax = "proto3";

import "google/protobuf/empty.proto";

import "postgres-util/src/desc.proto";
import "proto/src/chrono.proto";
import "proto/src/proto.proto";
import "repr/src/global_id.proto";
import "repr/src/relation_and_scalar.proto";
import "repr/src/row.proto";
import "storage-types/src/controller.proto";
import "storage-types/src/connections.proto";
import "storage-types/src/connections/aws.proto";
import "storage-types/src/errors.proto";
import "storage-types/src/instances.proto";
import "storage-types/src/sources/encoding.proto";
import "expr/src/scalar.proto";
import "storage-types/src/sources/envelope.proto";
import "storage-types/src/sources/postgres.proto";
import "storage-types/src/sources/kafka.proto";
import "storage-types/src/sources/load_generator.proto";
import "storage-types/src/sources/testscript.proto";

package mz_storage_types.sources;

message ProtoMzOffset {
uint64 offset = 1;
}

message ProtoKafkaMetadataKind {
oneof kind {
google.protobuf.Empty partition = 1;
google.protobuf.Empty offset = 2;
google.protobuf.Empty timestamp = 3;
google.protobuf.Empty headers = 4;
ProtoKafkaHeader header = 5;
}
}

message ProtoKafkaHeader {
string key = 1;
bool use_bytes = 2;
}

message ProtoKeyEnvelope {
oneof kind {
google.protobuf.Empty none = 1;
google.protobuf.Empty flattened = 2;
string named = 3;
}
}

message ProtoTimeline {
oneof kind {
google.protobuf.Empty epoch_milliseconds = 1;
Expand All @@ -62,119 +38,21 @@ message ProtoTimeline {
}
}

message ProtoSourceEnvelope {
oneof kind {
ProtoNoneEnvelope none = 1;
ProtoDebeziumEnvelope debezium = 2;
ProtoUpsertEnvelope upsert = 3;
google.protobuf.Empty cdc_v2 = 4;
}
}

message ProtoNoneEnvelope {
ProtoKeyEnvelope key_envelope = 1;
uint64 key_arity = 2;
}

message ProtoUpsertEnvelope {
ProtoUpsertStyle style = 1;
repeated uint64 key_indices = 2;
uint64 source_arity = 3;
reserved 4;
}

message ProtoUpsertStyle {
message ProtoDebezium {
uint64 after_idx = 1;
}

oneof kind {
ProtoKeyEnvelope default = 1;
ProtoDebezium debezium = 2;
}
}

message ProtoDebeziumEnvelope {
uint64 before_idx = 1;
uint64 after_idx = 2;
ProtoDebeziumDedupProjection dedup = 3;
}

message ProtoDebeziumTransactionMetadata {
mz_repr.global_id.ProtoGlobalId tx_metadata_global_id = 1;
uint64 tx_status_idx = 2;
uint64 tx_transaction_id_idx = 3;
uint64 tx_data_collections_idx = 4;
uint64 tx_data_collections_data_collection_idx = 5;
uint64 tx_data_collections_event_count_idx = 6;
string tx_data_collection_name = 7;
uint64 data_transaction_idx = 8;
uint64 data_transaction_id_idx = 9;
}

message ProtoDebeziumDedupProjection {
uint64 op_idx = 1;
uint64 source_idx = 2;
uint64 snapshot_idx = 3;
ProtoDebeziumSourceProjection source_projection = 4;
ProtoDebeziumTransactionMetadata tx_metadata = 6;
}

message ProtoDebeziumSourceProjection {
message ProtoMySql {
uint64 file = 1;
uint64 pos = 2;
uint64 row = 3;
}

message ProtoPostgres {
uint64 sequence = 1;
uint64 lsn = 2;
}

message ProtoSqlServer {
uint64 change_lsn = 1;
uint64 event_serial_no = 2;
}

oneof kind {
ProtoMySql my_sql = 1;
ProtoPostgres postgres = 2;
ProtoSqlServer sql_server = 3;
}
}

message ProtoKafkaMetadataColumn {
string name = 1;
ProtoKafkaMetadataKind kind = 2;
}

message ProtoKafkaSourceConnection {
reserved 5, 6, 7, 8, 9, 10, 12, 14;
mz_storage_types.connections.ProtoKafkaConnection connection = 1;
mz_repr.global_id.ProtoGlobalId connection_id = 13;
string topic = 2;
map<int32, int64> start_offsets = 3;
optional string group_id_prefix = 4;
repeated ProtoKafkaMetadataColumn metadata_columns = 11;
mz_proto.ProtoDuration topic_metadata_refresh_interval = 15;
}

message ProtoSourceDesc {
reserved 4;
ProtoSourceConnection connection = 1;
mz_storage_types.sources.encoding.ProtoSourceDataEncoding encoding = 2;
ProtoSourceEnvelope envelope = 3;
mz_storage_types.sources.envelope.ProtoSourceEnvelope envelope = 3;
mz_proto.ProtoDuration timestamp_interval = 5;
}

message ProtoSourceConnection {
reserved 2, 3, 5;
oneof kind {
ProtoKafkaSourceConnection kafka = 1;
ProtoPostgresSourceConnection postgres = 4;
ProtoLoadGeneratorSourceConnection loadgen = 6;
ProtoTestScriptSourceConnection testscript = 7;
mz_storage_types.sources.kafka.ProtoKafkaSourceConnection kafka = 1;
mz_storage_types.sources.postgres.ProtoPostgresSourceConnection postgres = 4;
mz_storage_types.sources.load_generator.ProtoLoadGeneratorSourceConnection loadgen = 6;
mz_storage_types.sources.testscript.ProtoTestScriptSourceConnection testscript = 7;
}
}

Expand All @@ -185,55 +63,6 @@ message ProtoSourceData {
}
}

message ProtoPostgresSourceConnection {
message ProtoPostgresTableCast {
repeated mz_expr.scalar.ProtoMirScalarExpr column_casts = 1;
}

mz_repr.global_id.ProtoGlobalId connection_id = 6;
mz_storage_types.connections.ProtoPostgresConnection connection = 1;
string publication = 2;
ProtoPostgresSourcePublicationDetails details = 4;
repeated ProtoPostgresTableCast table_casts = 5;
// Describes the position in the source's publication that the table cast
// correlates to; meant to be iterated over in tandem with table_casts
repeated uint64 table_cast_pos = 7;
}

message ProtoPostgresSourcePublicationDetails {
repeated mz_postgres_util.desc.ProtoPostgresTableDesc tables = 1;
string slot = 2;
optional uint64 timeline_id = 3;
}

message ProtoLoadGeneratorSourceConnection {
reserved 1;
oneof generator {
ProtoCounterLoadGenerator counter = 6;
google.protobuf.Empty auction = 3;
ProtoTpchLoadGenerator tpch = 4;
google.protobuf.Empty datums = 5;
google.protobuf.Empty marketing = 7;
}
optional uint64 tick_micros = 2;
}

message ProtoTestScriptSourceConnection {
string desc_json = 1;
}

message ProtoCounterLoadGenerator {
optional uint64 max_cardinality = 1;
}

message ProtoTpchLoadGenerator {
int64 count_supplier = 1;
int64 count_part = 2;
int64 count_customer = 3;
int64 count_orders = 4;
int64 count_clerk = 5;
}

message ProtoCompression {
oneof kind {
google.protobuf.Empty gzip = 1;
Expand Down
Loading