Skip to content

Commit

Permalink
storage-types: reorganize sources.rs into modules
Browse files Browse the repository at this point in the history
This is a pure code movement PR that splits out
`src/storage-types/src/sources.rs` into the following structure:

```
sources
├── encoding.proto
├── encoding.rs
├── envelope.proto
├── envelope.rs
├── kafka.proto
├── kafka.rs
├── load_generator.proto
├── load_generator.rs
├── postgres.proto
├── postgres.rs
├── testscript.proto
└── testscript.rs
```
  • Loading branch information
petrosagg committed Dec 22, 2023
1 parent b3ef64d commit 330e4a3
Show file tree
Hide file tree
Showing 29 changed files with 2,182 additions and 1,932 deletions.
2 changes: 1 addition & 1 deletion src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async fn ast_rewrite_postgres_source_timeline_id_0_80_0(
use mz_sql_parser::ast::{PgConfigOption, PgConfigOptionName, Value, WithOptionValue};
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::sources::{
use mz_storage_types::sources::postgres::{
PostgresSourcePublicationDetails, ProtoPostgresSourcePublicationDetails,
};
use prost::Message;
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,8 @@ impl Source {
Some("debezium")
}
SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
mz_storage_types::sources::UpsertStyle::Default(_) => Some("upsert"),
mz_storage_types::sources::UpsertStyle::Debezium { .. } => {
mz_storage_types::sources::envelope::UpsertStyle::Default(_) => Some("upsert"),
mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
// NOTE(aljoscha): Should we somehow mark that this is
// using upsert internally? See note above about
// DEBEZIUM.
Expand Down
15 changes: 10 additions & 5 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ use mz_storage_types::sources::encoding::{
included_column_desc, AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, DataEncodingInner,
ProtobufEncoding, RegexEncoding, SourceDataEncoding, SourceDataEncodingInner,
};
use mz_storage_types::sources::{
GenericSourceConnection, KafkaMetadataKind, KafkaSourceConnection, KeyEnvelope, LoadGenerator,
LoadGeneratorSourceConnection, PostgresSourceConnection, PostgresSourcePublicationDetails,
ProtoPostgresSourcePublicationDetails, SourceConnection, SourceDesc, SourceEnvelope,
TestScriptSourceConnection, Timeline, UnplannedSourceEnvelope, UpsertStyle,
use mz_storage_types::sources::envelope::{
KeyEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{KafkaMetadataKind, KafkaSourceConnection};
use mz_storage_types::sources::load_generator::{LoadGenerator, LoadGeneratorSourceConnection};
use mz_storage_types::sources::postgres::{
PostgresSourceConnection, PostgresSourcePublicationDetails,
ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::testscript::TestScriptSourceConnection;
use mz_storage_types::sources::{GenericSourceConnection, SourceConnection, SourceDesc, Timeline};
use prost::Message;

use crate::ast::display::AstDisplay;
Expand Down
17 changes: 10 additions & 7 deletions src/sql/src/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::connections::Connection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::{
GenericSourceConnection, PostgresSourcePublicationDetails, SourceConnection,
};
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{GenericSourceConnection, SourceConnection};
use prost::Message;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use protobuf_native::MessageLite;
Expand Down Expand Up @@ -433,13 +432,17 @@ async fn purify_create_source(
let mut subsources = vec![];

let progress_desc = match &connection {
CreateSourceConnection::Kafka { .. } => &mz_storage_types::sources::KAFKA_PROGRESS_DESC,
CreateSourceConnection::Postgres { .. } => &mz_storage_types::sources::PG_PROGRESS_DESC,
CreateSourceConnection::Kafka { .. } => {
&mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
}
CreateSourceConnection::Postgres { .. } => {
&mz_storage_types::sources::postgres::PG_PROGRESS_DESC
}
CreateSourceConnection::LoadGenerator { .. } => {
&mz_storage_types::sources::LOAD_GEN_PROGRESS_DESC
&mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
}
CreateSourceConnection::TestScript { .. } => {
&mz_storage_types::sources::TEST_SCRIPT_PROGRESS_DESC
&mz_storage_types::sources::testscript::TEST_SCRIPT_PROGRESS_DESC
}
};

Expand Down
5 changes: 5 additions & 0 deletions src/storage-types/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ fn main() {
"storage-types/src/sinks.proto",
"storage-types/src/sources.proto",
"storage-types/src/sources/encoding.proto",
"storage-types/src/sources/envelope.proto",
"storage-types/src/sources/kafka.proto",
"storage-types/src/sources/postgres.proto",
"storage-types/src/sources/load_generator.proto",
"storage-types/src/sources/testscript.proto",
],
&[".."],
)
Expand Down
2 changes: 0 additions & 2 deletions src/storage-types/src/connections/aws.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

syntax = "proto3";

import "google/protobuf/empty.proto";

import "repr/src/global_id.proto";
import "storage-types/src/connections.proto";

Expand Down
1 change: 0 additions & 1 deletion src/storage-types/src/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
syntax = "proto3";

import "expr/src/scalar.proto";
import "expr/src/id.proto";
import "repr/src/global_id.proto";
import "repr/src/row.proto";
import "storage-types/src/shim.proto";
Expand Down
191 changes: 10 additions & 181 deletions src/storage-types/src/sources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,49 +11,25 @@ syntax = "proto3";

import "google/protobuf/empty.proto";

import "postgres-util/src/desc.proto";
import "proto/src/chrono.proto";
import "proto/src/proto.proto";
import "repr/src/global_id.proto";
import "repr/src/relation_and_scalar.proto";
import "repr/src/row.proto";
import "storage-types/src/controller.proto";
import "storage-types/src/connections.proto";
import "storage-types/src/connections/aws.proto";
import "storage-types/src/errors.proto";
import "storage-types/src/instances.proto";
import "storage-types/src/sources/encoding.proto";
import "expr/src/scalar.proto";
import "storage-types/src/sources/envelope.proto";
import "storage-types/src/sources/postgres.proto";
import "storage-types/src/sources/kafka.proto";
import "storage-types/src/sources/load_generator.proto";
import "storage-types/src/sources/testscript.proto";

package mz_storage_types.sources;

message ProtoMzOffset {
uint64 offset = 1;
}

message ProtoKafkaMetadataKind {
oneof kind {
google.protobuf.Empty partition = 1;
google.protobuf.Empty offset = 2;
google.protobuf.Empty timestamp = 3;
google.protobuf.Empty headers = 4;
ProtoKafkaHeader header = 5;
}
}

message ProtoKafkaHeader {
string key = 1;
bool use_bytes = 2;
}

message ProtoKeyEnvelope {
oneof kind {
google.protobuf.Empty none = 1;
google.protobuf.Empty flattened = 2;
string named = 3;
}
}

message ProtoTimeline {
oneof kind {
google.protobuf.Empty epoch_milliseconds = 1;
Expand All @@ -62,119 +38,21 @@ message ProtoTimeline {
}
}

message ProtoSourceEnvelope {
oneof kind {
ProtoNoneEnvelope none = 1;
ProtoDebeziumEnvelope debezium = 2;
ProtoUpsertEnvelope upsert = 3;
google.protobuf.Empty cdc_v2 = 4;
}
}

message ProtoNoneEnvelope {
ProtoKeyEnvelope key_envelope = 1;
uint64 key_arity = 2;
}

message ProtoUpsertEnvelope {
ProtoUpsertStyle style = 1;
repeated uint64 key_indices = 2;
uint64 source_arity = 3;
reserved 4;
}

message ProtoUpsertStyle {
message ProtoDebezium {
uint64 after_idx = 1;
}

oneof kind {
ProtoKeyEnvelope default = 1;
ProtoDebezium debezium = 2;
}
}

message ProtoDebeziumEnvelope {
uint64 before_idx = 1;
uint64 after_idx = 2;
ProtoDebeziumDedupProjection dedup = 3;
}

message ProtoDebeziumTransactionMetadata {
mz_repr.global_id.ProtoGlobalId tx_metadata_global_id = 1;
uint64 tx_status_idx = 2;
uint64 tx_transaction_id_idx = 3;
uint64 tx_data_collections_idx = 4;
uint64 tx_data_collections_data_collection_idx = 5;
uint64 tx_data_collections_event_count_idx = 6;
string tx_data_collection_name = 7;
uint64 data_transaction_idx = 8;
uint64 data_transaction_id_idx = 9;
}

message ProtoDebeziumDedupProjection {
uint64 op_idx = 1;
uint64 source_idx = 2;
uint64 snapshot_idx = 3;
ProtoDebeziumSourceProjection source_projection = 4;
ProtoDebeziumTransactionMetadata tx_metadata = 6;
}

message ProtoDebeziumSourceProjection {
message ProtoMySql {
uint64 file = 1;
uint64 pos = 2;
uint64 row = 3;
}

message ProtoPostgres {
uint64 sequence = 1;
uint64 lsn = 2;
}

message ProtoSqlServer {
uint64 change_lsn = 1;
uint64 event_serial_no = 2;
}

oneof kind {
ProtoMySql my_sql = 1;
ProtoPostgres postgres = 2;
ProtoSqlServer sql_server = 3;
}
}

message ProtoKafkaMetadataColumn {
string name = 1;
ProtoKafkaMetadataKind kind = 2;
}

message ProtoKafkaSourceConnection {
reserved 5, 6, 7, 8, 9, 10, 12, 14;
mz_storage_types.connections.ProtoKafkaConnection connection = 1;
mz_repr.global_id.ProtoGlobalId connection_id = 13;
string topic = 2;
map<int32, int64> start_offsets = 3;
optional string group_id_prefix = 4;
repeated ProtoKafkaMetadataColumn metadata_columns = 11;
mz_proto.ProtoDuration topic_metadata_refresh_interval = 15;
}

message ProtoSourceDesc {
reserved 4;
ProtoSourceConnection connection = 1;
mz_storage_types.sources.encoding.ProtoSourceDataEncoding encoding = 2;
ProtoSourceEnvelope envelope = 3;
mz_storage_types.sources.envelope.ProtoSourceEnvelope envelope = 3;
mz_proto.ProtoDuration timestamp_interval = 5;
}

message ProtoSourceConnection {
reserved 2, 3, 5;
oneof kind {
ProtoKafkaSourceConnection kafka = 1;
ProtoPostgresSourceConnection postgres = 4;
ProtoLoadGeneratorSourceConnection loadgen = 6;
ProtoTestScriptSourceConnection testscript = 7;
mz_storage_types.sources.kafka.ProtoKafkaSourceConnection kafka = 1;
mz_storage_types.sources.postgres.ProtoPostgresSourceConnection postgres = 4;
mz_storage_types.sources.load_generator.ProtoLoadGeneratorSourceConnection loadgen = 6;
mz_storage_types.sources.testscript.ProtoTestScriptSourceConnection testscript = 7;
}
}

Expand All @@ -185,55 +63,6 @@ message ProtoSourceData {
}
}

message ProtoPostgresSourceConnection {
message ProtoPostgresTableCast {
repeated mz_expr.scalar.ProtoMirScalarExpr column_casts = 1;
}

mz_repr.global_id.ProtoGlobalId connection_id = 6;
mz_storage_types.connections.ProtoPostgresConnection connection = 1;
string publication = 2;
ProtoPostgresSourcePublicationDetails details = 4;
repeated ProtoPostgresTableCast table_casts = 5;
// Describes the position in the source's publication that the table cast
// correlates to; meant to be iterated over in tandem with table_casts
repeated uint64 table_cast_pos = 7;
}

message ProtoPostgresSourcePublicationDetails {
repeated mz_postgres_util.desc.ProtoPostgresTableDesc tables = 1;
string slot = 2;
optional uint64 timeline_id = 3;
}

message ProtoLoadGeneratorSourceConnection {
reserved 1;
oneof generator {
ProtoCounterLoadGenerator counter = 6;
google.protobuf.Empty auction = 3;
ProtoTpchLoadGenerator tpch = 4;
google.protobuf.Empty datums = 5;
google.protobuf.Empty marketing = 7;
}
optional uint64 tick_micros = 2;
}

message ProtoTestScriptSourceConnection {
string desc_json = 1;
}

message ProtoCounterLoadGenerator {
optional uint64 max_cardinality = 1;
}

message ProtoTpchLoadGenerator {
int64 count_supplier = 1;
int64 count_part = 2;
int64 count_customer = 3;
int64 count_orders = 4;
int64 count_clerk = 5;
}

message ProtoCompression {
oneof kind {
google.protobuf.Empty gzip = 1;
Expand Down
Loading

0 comments on commit 330e4a3

Please sign in to comment.