From 0ea38c8320b953dda6b46eab3e6f47b91ab8a9eb Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 10 Jan 2025 20:10:51 +0800 Subject: [PATCH] unify entry point for reader Signed-off-by: xxchan --- src/connector/src/macros.rs | 25 ++++ src/connector/src/source/base.rs | 66 ++++++++- .../src/source/cdc/enumerator/mod.rs | 2 +- src/connector/src/source/cdc/mod.rs | 2 +- .../src/source/datagen/enumerator/mod.rs | 4 +- src/connector/src/source/datagen/mod.rs | 2 +- .../source/filesystem/opendal_source/mod.rs | 8 +- .../opendal_source/opendal_enumerator.rs | 4 +- .../src/source/filesystem/s3/enumerator.rs | 2 +- src/connector/src/source/filesystem/s3/mod.rs | 2 +- .../source/google_pubsub/enumerator/client.rs | 1 - src/connector/src/source/iceberg/mod.rs | 8 +- .../src/source/kafka/enumerator/client.rs | 1 - .../src/source/kinesis/enumerator/client.rs | 2 +- .../src/source/mqtt/enumerator/mod.rs | 2 +- .../src/source/nats/enumerator/mod.rs | 2 +- .../src/source/nexmark/enumerator/mod.rs | 6 +- src/connector/src/source/nexmark/mod.rs | 2 +- .../src/source/pulsar/enumerator/client.rs | 2 +- src/connector/src/source/reader/fs_reader.rs | 20 +-- src/connector/src/source/reader/reader.rs | 135 +++--------------- src/connector/src/source/test_source.rs | 4 +- src/frontend/src/scheduler/plan_fragmenter.rs | 3 +- .../source/source_backfill_executor.rs | 6 +- .../src/executor/source/source_executor.rs | 6 +- 25 files changed, 150 insertions(+), 167 deletions(-) diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 3af4e7a34b8bb..0ac0f5e681472 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -328,6 +328,31 @@ macro_rules! impl_connector_properties { }; Ok(enumerator) } + + + pub async fn create_split_reader( + self, + splits: Vec, + parser_config: ParserConfig, + source_ctx: SourceContextRef, + columns: Option>, + mut opt: $crate::source::CreateSplitReaderOpt, + ) -> Result<(BoxSourceChunkStream, $crate::source::CreateSplitReaderResult)> { + opt.support_multiple_splits = self.support_multiple_splits(); + tracing::debug!( + ?splits, + support_multiple_splits = opt.support_multiple_splits, + "spawning connector split reader", + ); + + match self { + $( + ConnectorProperties::$variant_name(prop) => { + $crate::source::create_split_readers(*prop, splits, parser_config, source_ctx, columns, opt).await + } + )* + } + } } } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 2dd8724027eed..f70032377d7a2 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -20,8 +20,9 @@ use async_trait::async_trait; use aws_sdk_s3::types::Object; use bytes::Bytes; use enum_as_inner::EnumAsInner; +use futures::future::try_join_all; use futures::stream::BoxStream; -use futures::Stream; +use futures::{Stream, StreamExt}; use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::bail; @@ -31,6 +32,7 @@ use risingwave_common::types::{JsonbVal, Scalar}; use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo}; use risingwave_pb::plan_common::ExternalTableDesc; use risingwave_pb::source::ConnectorSplit; +use rw_futures_util::select_all; use serde::de::DeserializeOwned; use serde_json::json; use tokio::sync::mpsc; @@ -118,15 +120,71 @@ impl TryFromBTreeMap for P { } } -pub async fn create_split_reader( +#[derive(Default)] +pub struct CreateSplitReaderOpt { + pub support_multiple_splits: bool, + pub seek_to_latest: bool, +} + +#[derive(Default)] +pub struct CreateSplitReaderResult { + pub latest_splits: Option>, + pub backfill_info: HashMap, +} + +pub async fn create_split_readers( prop: P, splits: Vec, parser_config: ParserConfig, source_ctx: SourceContextRef, columns: Option>, -) -> Result { + opt: CreateSplitReaderOpt, +) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> { let splits = splits.into_iter().map(P::Split::try_from).try_collect()?; - P::SplitReader::new(prop, splits, parser_config, source_ctx, columns).await + let mut res = CreateSplitReaderResult { + backfill_info: HashMap::new(), + latest_splits: None, + }; + if opt.support_multiple_splits { + let mut reader = P::SplitReader::new( + prop.clone(), + splits, + parser_config.clone(), + source_ctx.clone(), + columns.clone(), + ) + .await?; + if opt.seek_to_latest { + res.latest_splits = Some(reader.seek_to_latest().await?); + } + res.backfill_info = reader.backfill_info(); + Ok((reader.into_stream().boxed(), res)) + } else { + let mut readers = try_join_all(splits.into_iter().map(|split| { + // TODO: is this reader split across multiple threads...? Realistically, we want + // source_ctx to live in a single actor. + P::SplitReader::new( + prop.clone(), + vec![split], + parser_config.clone(), + source_ctx.clone(), + columns.clone(), + ) + })) + .await?; + if opt.seek_to_latest { + let mut latest_splits = vec![]; + for reader in &mut readers { + latest_splits.extend(reader.seek_to_latest().await?); + } + res.latest_splits = Some(latest_splits); + } + res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect(); + Ok(( + select_all(readers.into_iter().map(|r| r.into_stream())).boxed(), + res, + )) + } } /// [`SplitEnumerator`] fetches the split metadata from the external source service. diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs index d2561cdb084b3..626bc53e8532a 100644 --- a/src/connector/src/source/cdc/enumerator/mod.rs +++ b/src/connector/src/source/cdc/enumerator/mod.rs @@ -30,7 +30,7 @@ use crate::source::cdc::{ table_schema_exclude_additional_columns, CdcProperties, CdcSourceTypeTrait, Citus, DebeziumCdcSplit, Mongodb, Mysql, Postgres, SqlServer, }; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::SplitEnumerator; pub const DATABASE_SERVERS_KEY: &str = "database.servers"; diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index 9aef78ca4d94a..c9c0f69d9c4ef 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -213,7 +213,7 @@ where self, context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { - Ok(DebeziumSplitEnumerator::new(self, context).await?) + DebeziumSplitEnumerator::new(self, context).await } } diff --git a/src/connector/src/source/datagen/enumerator/mod.rs b/src/connector/src/source/datagen/enumerator/mod.rs index 72bd1d2495a57..197efc1d9e3b1 100644 --- a/src/connector/src/source/datagen/enumerator/mod.rs +++ b/src/connector/src/source/datagen/enumerator/mod.rs @@ -16,7 +16,7 @@ use anyhow::Context; use async_trait::async_trait; use crate::source::datagen::{DatagenProperties, DatagenSplit}; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::SplitEnumerator; #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub struct DatagenSplitEnumerator { @@ -24,7 +24,7 @@ pub struct DatagenSplitEnumerator { } impl DatagenSplitEnumerator { - pub async fn new( + pub fn new( properties: DatagenProperties, _context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { diff --git a/src/connector/src/source/datagen/mod.rs b/src/connector/src/source/datagen/mod.rs index cc9e805672cf0..e2e30888b7311 100644 --- a/src/connector/src/source/datagen/mod.rs +++ b/src/connector/src/source/datagen/mod.rs @@ -68,7 +68,7 @@ impl SourceProperties for DatagenProperties { self, context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { - DatagenSplitEnumerator::new(self, context).await + DatagenSplitEnumerator::new(self, context) } } diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index 7dc3d20a22e6c..8172cbc4a37d0 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -91,7 +91,7 @@ impl SourceProperties for GcsProperties { self, context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { - OpendalEnumerator::new(self, context).await + OpendalEnumerator::new(self, context) } } @@ -167,7 +167,7 @@ impl SourceProperties for OpendalS3Properties { self, context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { - OpendalEnumerator::new(self, context).await + OpendalEnumerator::new(self, context) } } @@ -207,7 +207,7 @@ impl SourceProperties for PosixFsProperties { self, context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { - OpendalEnumerator::new(self, context).await + OpendalEnumerator::new(self, context) } } @@ -253,7 +253,7 @@ impl SourceProperties for AzblobProperties { self, context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { - OpendalEnumerator::new(self, context).await + OpendalEnumerator::new(self, context) } } diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index dc07bd6975bb0..9866376ab1cfd 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -26,7 +26,7 @@ use super::OpendalSource; use crate::error::ConnectorResult; use crate::source::filesystem::file_common::CompressionFormat; use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::SplitEnumerator; #[derive(Debug, Clone)] pub struct OpendalEnumerator { @@ -39,7 +39,7 @@ pub struct OpendalEnumerator { } impl OpendalEnumerator { - pub async fn new( + pub fn new( properties: Src::Properties, _context: crate::source::SourceEnumeratorContextRef, ) -> ConnectorResult { diff --git a/src/connector/src/source/filesystem/s3/enumerator.rs b/src/connector/src/source/filesystem/s3/enumerator.rs index b604ff00a3d45..c9b4d8295632f 100644 --- a/src/connector/src/source/filesystem/s3/enumerator.rs +++ b/src/connector/src/source/filesystem/s3/enumerator.rs @@ -20,7 +20,7 @@ use crate::aws_utils::{default_conn_config, s3_client}; use crate::connector_common::AwsAuthProps; use crate::source::filesystem::file_common::FsSplit; use crate::source::filesystem::s3::S3Properties; -use crate::source::{FsListInner, SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::{FsListInner, SplitEnumerator}; /// Get the prefix from a glob pub fn get_prefix(glob: &str) -> String { diff --git a/src/connector/src/source/filesystem/s3/mod.rs b/src/connector/src/source/filesystem/s3/mod.rs index 5d58dc3074e98..c05ba722efd8e 100644 --- a/src/connector/src/source/filesystem/s3/mod.rs +++ b/src/connector/src/source/filesystem/s3/mod.rs @@ -75,7 +75,7 @@ impl SourceProperties for S3Properties { self, context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { - Ok(S3SplitEnumerator::new(self, context).await?) + S3SplitEnumerator::new(self, context).await } } diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index be5110b164919..6d0d58ed89143 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -22,7 +22,6 @@ use crate::error::ConnectorResult; use crate::source::base::SplitEnumerator; use crate::source::google_pubsub::split::PubsubSplit; use crate::source::google_pubsub::PubsubProperties; -use crate::source::SourceEnumeratorContextRef; pub struct PubsubSplitEnumerator { subscription: String, diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 7dc81d688a4c5..eb616cd2a37d4 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -41,8 +41,8 @@ use crate::connector_common::IcebergCommon; use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::ParserConfig; use crate::source::{ - BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties, - SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields, + BoxSourceChunkStream, Column, SourceContextRef, SourceProperties, SplitEnumerator, SplitId, + SplitMetaData, SplitReader, UnknownFields, }; pub const ICEBERG_CONNECTOR: &str = "iceberg"; @@ -115,7 +115,7 @@ impl SourceProperties for IcebergProperties { self, context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { - IcebergSplitEnumerator::new(self, context).await + IcebergSplitEnumerator::new(self, context) } } @@ -256,7 +256,7 @@ pub struct IcebergSplitEnumerator { } impl IcebergSplitEnumerator { - pub async fn new( + pub fn new( properties: IcebergProperties, context: crate::source::SourceEnumeratorContextRef, ) -> ConnectorResult { diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 26c6860cdfee9..d1a377e6124b1 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -33,7 +33,6 @@ use crate::source::kafka::{ KafkaConnectionProps, KafkaContextCommon, KafkaProperties, RwConsumerContext, KAFKA_ISOLATION_LEVEL, }; -use crate::source::SourceEnumeratorContextRef; type KafkaClientType = BaseConsumer; diff --git a/src/connector/src/source/kinesis/enumerator/client.rs b/src/connector/src/source/kinesis/enumerator/client.rs index e00d3d94992bb..02924d32d716b 100644 --- a/src/connector/src/source/kinesis/enumerator/client.rs +++ b/src/connector/src/source/kinesis/enumerator/client.rs @@ -21,7 +21,7 @@ use risingwave_common::bail; use crate::error::ConnectorResult as Result; use crate::source::kinesis::split::KinesisOffset; use crate::source::kinesis::*; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::SplitEnumerator; pub struct KinesisSplitEnumerator { stream_name: String, diff --git a/src/connector/src/source/mqtt/enumerator/mod.rs b/src/connector/src/source/mqtt/enumerator/mod.rs index 5699b5ab8f1e3..5cf2d290cadef 100644 --- a/src/connector/src/source/mqtt/enumerator/mod.rs +++ b/src/connector/src/source/mqtt/enumerator/mod.rs @@ -26,7 +26,7 @@ use tokio::sync::RwLock; use super::source::MqttSplit; use super::MqttProperties; use crate::error::ConnectorResult; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::SplitEnumerator; pub struct MqttSplitEnumerator { #[expect(dead_code)] diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index 3187357081107..7ccd0567d767b 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -20,7 +20,7 @@ use risingwave_common::bail; use super::source::{NatsOffset, NatsSplit}; use super::NatsProperties; use crate::error::ConnectorResult; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId}; +use crate::source::{SplitEnumerator, SplitId}; #[derive(Debug, Clone)] pub struct NatsSplitEnumerator { diff --git a/src/connector/src/source/nexmark/enumerator/mod.rs b/src/connector/src/source/nexmark/enumerator/mod.rs index a94f1623ed860..17ca6e7bdecf4 100644 --- a/src/connector/src/source/nexmark/enumerator/mod.rs +++ b/src/connector/src/source/nexmark/enumerator/mod.rs @@ -17,16 +17,14 @@ use async_trait::async_trait; use crate::error::ConnectorResult; use crate::source::nexmark::split::NexmarkSplit; use crate::source::nexmark::NexmarkProperties; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::SplitEnumerator; pub struct NexmarkSplitEnumerator { split_num: i32, } -impl NexmarkSplitEnumerator {} - impl NexmarkSplitEnumerator { - pub async fn new( + pub fn new( properties: NexmarkProperties, _context: crate::source::SourceEnumeratorContextRef, ) -> ConnectorResult { diff --git a/src/connector/src/source/nexmark/mod.rs b/src/connector/src/source/nexmark/mod.rs index a5d7e84ac5044..a05876054e3f9 100644 --- a/src/connector/src/source/nexmark/mod.rs +++ b/src/connector/src/source/nexmark/mod.rs @@ -233,7 +233,7 @@ impl SourceProperties for NexmarkProperties { self, context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { - NexmarkSplitEnumerator::new(self, context).await + NexmarkSplitEnumerator::new(self, context) } } diff --git a/src/connector/src/source/pulsar/enumerator/client.rs b/src/connector/src/source/pulsar/enumerator/client.rs index 1ce7865c1b056..b043fc3df1531 100644 --- a/src/connector/src/source/pulsar/enumerator/client.rs +++ b/src/connector/src/source/pulsar/enumerator/client.rs @@ -23,7 +23,7 @@ use crate::error::ConnectorResult; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::topic::{parse_topic, Topic}; use crate::source::pulsar::PulsarProperties; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::SplitEnumerator; pub struct PulsarSplitEnumerator { client: Pulsar, diff --git a/src/connector/src/source/reader/fs_reader.rs b/src/connector/src/source/reader/fs_reader.rs index 7329d429b8054..41817c3a85a97 100644 --- a/src/connector/src/source/reader/fs_reader.rs +++ b/src/connector/src/source/reader/fs_reader.rs @@ -24,10 +24,9 @@ use risingwave_common::catalog::ColumnId; use crate::error::ConnectorResult; use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use crate::source::{ - create_split_reader, BoxSourceChunkStream, ConnectorProperties, ConnectorState, - SourceColumnDesc, SourceContext, SplitReader, + BoxSourceChunkStream, ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, }; -use crate::{dispatch_source_prop, WithOptionsSecResolved}; +use crate::WithOptionsSecResolved; #[derive(Clone, Debug)] pub struct FsSourceReader { @@ -92,11 +91,16 @@ impl FsSourceReader { let stream = match state { None => pending().boxed(), Some(splits) => { - dispatch_source_prop!(config, prop, { - create_split_reader(*prop, splits, parser_config, source_ctx, None) - .await? - .into_stream() - }) + config + .create_split_reader( + splits, + parser_config, + source_ctx, + None, + Default::default(), + ) + .await? + .0 } }; Ok(stream) diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index 5f77af01ecb81..f5098220d9b7f 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -12,19 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; use anyhow::Context; use async_nats::jetstream::consumer::AckPolicy; -use futures::future::try_join_all; use futures::stream::pending; use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::catalog::ColumnId; -use rw_futures_util::select_all; use thiserror_ext::AsReport as _; use crate::error::ConnectorResult; @@ -36,11 +33,11 @@ use crate::source::filesystem::opendal_source::{ }; use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; use crate::source::{ - create_split_reader, BackfillInfo, BoxSourceChunkStream, BoxTryStream, Column, - ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitImpl, - SplitReader, WaitCheckpointTask, + BoxSourceChunkStream, BoxTryStream, Column, ConnectorProperties, ConnectorState, + CreateSplitReaderOpt, CreateSplitReaderResult, SourceColumnDesc, SourceContext, + WaitCheckpointTask, }; -use crate::{dispatch_source_prop, WithOptionsSecResolved}; +use crate::WithOptionsSecResolved; #[derive(Clone, Debug)] pub struct SourceReader { @@ -144,72 +141,6 @@ impl SourceReader { }) } - pub async fn build_stream_for_backfill( - &self, - state: ConnectorState, - column_ids: Vec, - source_ctx: Arc, - ) -> ConnectorResult<(BoxSourceChunkStream, HashMap)> { - let Some(splits) = state else { - return Ok((pending().boxed(), HashMap::new())); - }; - let config = self.config.clone(); - let columns = self.get_target_columns(column_ids)?; - - let data_gen_columns = Some( - columns - .iter() - .map(|col| Column { - name: col.name.clone(), - data_type: col.data_type.clone(), - is_visible: col.is_visible(), - }) - .collect_vec(), - ); - - let parser_config = ParserConfig { - specific: self.parser_config.clone(), - common: CommonParserConfig { - rw_columns: columns, - }, - }; - - let support_multiple_splits = config.support_multiple_splits(); - dispatch_source_prop!(config, prop, { - let readers = if support_multiple_splits { - tracing::debug!( - "spawning connector split reader for multiple splits {:?}", - splits - ); - let reader = - create_split_reader(*prop, splits, parser_config, source_ctx, data_gen_columns) - .await?; - - vec![reader] - } else { - let to_reader_splits = splits.into_iter().map(|split| vec![split]); - try_join_all(to_reader_splits.into_iter().map(|splits| { - tracing::debug!(?splits, "spawning connector split reader"); - let props = prop.clone(); - let data_gen_columns = data_gen_columns.clone(); - let parser_config = parser_config.clone(); - // TODO: is this reader split across multiple threads...? Realistically, we want - // source_ctx to live in a single actor. - let source_ctx = source_ctx.clone(); - create_split_reader(*props, splits, parser_config, source_ctx, data_gen_columns) - })) - .await? - }; - - let backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect(); - - Ok(( - select_all(readers.into_iter().map(|r| r.into_stream())).boxed(), - backfill_info, - )) - }) - } - /// Build `SplitReader`s and then `BoxSourceChunkStream` from the given `ConnectorState` (`SplitImpl`s). /// /// If `seek_to_latest` is true, will also return the latest splits after seek. @@ -219,9 +150,9 @@ impl SourceReader { column_ids: Vec, source_ctx: Arc, seek_to_latest: bool, - ) -> ConnectorResult<(BoxSourceChunkStream, Option>)> { + ) -> ConnectorResult<(BoxSourceChunkStream, CreateSplitReaderResult)> { let Some(splits) = state else { - return Ok((pending().boxed(), None)); + return Ok((pending().boxed(), Default::default())); }; let config = self.config.clone(); let columns = self.get_target_columns(column_ids)?; @@ -244,48 +175,18 @@ impl SourceReader { }, }; - let support_multiple_splits = config.support_multiple_splits(); - tracing::debug!( - ?splits, - %support_multiple_splits, - "spawning connector split reader", - ); - dispatch_source_prop!(config, prop, { - let mut readers = if support_multiple_splits { - let reader = - create_split_reader(*prop, splits, parser_config, source_ctx, data_gen_columns) - .await?; - - vec![reader] - } else { - let to_reader_splits = splits.into_iter().map(|split| vec![split]); - try_join_all(to_reader_splits.into_iter().map(|splits| { - let props = prop.clone(); - let data_gen_columns = data_gen_columns.clone(); - let parser_config = parser_config.clone(); - // TODO: is this reader split across multiple threads...? Realistically, we want - // source_ctx to live in a single actor. - let source_ctx = source_ctx.clone(); - create_split_reader(*props, splits, parser_config, source_ctx, data_gen_columns) - })) - .await? - }; - - let latest_splits = if seek_to_latest { - let mut latest_splits = Vec::new(); - for reader in &mut readers { - latest_splits.extend(reader.seek_to_latest().await?); - } - Some(latest_splits) - } else { - None - }; - - Ok(( - select_all(readers.into_iter().map(|r| r.into_stream())).boxed(), - latest_splits, - )) - }) + config + .create_split_reader( + splits, + parser_config, + source_ctx, + data_gen_columns, + CreateSplitReaderOpt { + seek_to_latest, + ..Default::default() + }, + ) + .await } } diff --git a/src/connector/src/source/test_source.rs b/src/connector/src/source/test_source.rs index fa7e0c7bade85..25cb725816b67 100644 --- a/src/connector/src/source/test_source.rs +++ b/src/connector/src/source/test_source.rs @@ -166,7 +166,7 @@ pub struct TestSourceSplitEnumerator { } impl TestSourceSplitEnumerator { - pub async fn new( + pub fn new( properties: TestSourceProperties, context: crate::source::SourceEnumeratorContextRef, ) -> ConnectorResult { @@ -248,7 +248,7 @@ impl SourceProperties for TestSourceProperties { self, context: crate::source::SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { - TestSourceSplitEnumerator::new(self, context).await + TestSourceSplitEnumerator::new(self, context) } } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 3949f1f2d9f5d..ae244d36453d8 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -380,8 +380,7 @@ impl SourceScanInfo { SourceFetchParameters::IcebergSpecificInfo(iceberg_specific_info), ) => { let iceberg_enumerator = - IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into()) - .await?; + IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into())?; let time_travel_info = match fetch_info.as_of { Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(v)), diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 1b3c08b2cad5e..d960ea97ef859 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -309,14 +309,14 @@ impl SourceBackfillExecutorInner { // the executor can only know it's finished when data coming in. // For blocking DDL, this would be annoying. - let (stream, backfill_info) = source_desc + let (stream, res) = source_desc .source - .build_stream_for_backfill(Some(splits), column_ids, Arc::new(source_ctx)) + .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false) .await .map_err(StreamExecutorError::connector_error)?; Ok(( apply_rate_limit(stream, self.rate_limit_rps).boxed(), - backfill_info, + res.backfill_info, )) } diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 89f10c0f569e4..ae4d59bb5c8cf 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -129,7 +129,7 @@ impl SourceExecutor { seek_to_latest: bool, ) -> StreamExecutorResult<(BoxSourceChunkStream, Option>)> { let (column_ids, source_ctx) = self.prepare_source_stream_build(source_desc); - let (stream, latest_splits) = source_desc + let (stream, res) = source_desc .source .build_stream(state, column_ids, Arc::new(source_ctx), seek_to_latest) .await @@ -137,7 +137,7 @@ impl SourceExecutor { Ok(( apply_rate_limit(stream, self.rate_limit_rps).boxed(), - latest_splits, + res.latest_splits, )) } @@ -554,7 +554,7 @@ impl SourceExecutor { false, // not need to seek to latest since source state is initialized ) .await { - Ok((stream, latest_splits)) => Ok((stream, latest_splits)), + Ok((stream, res)) => Ok((stream, res.latest_splits)), Err(e) => { tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying..."); Err(e)