Skip to content

Commit

Permalink
make SplitEnumerator dyn
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jan 10, 2025
1 parent e3da091 commit 4664578
Show file tree
Hide file tree
Showing 25 changed files with 350 additions and 242 deletions.
25 changes: 25 additions & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,31 @@ macro_rules! impl_connector_properties {
}
}
)*


$crate::paste! {
impl ConnectorProperties {
pub async fn create_split_enumerator(self, context: crate::source::base::SourceEnumeratorContextRef) -> crate::error::ConnectorResult<Box<dyn SplitEnumerator>> {
let enumerator = match self {
$(
ConnectorProperties::$variant_name(prop) =>
Box::new(crate::source::prelude::[<$variant_name SplitEnumerator>]::new(*prop, context).await?) as Box<dyn SplitEnumerator>,
)*
};
Ok(enumerator)
}
}
}

// TODO: make SourceProperties dyn-compatible
// pub fn into_dyn(self) -> Box<dyn crate::source::SourceProperties> {
// match self {
// $(
// ConnectorProperties::$variant_name(prop) => prop,
// )*
// }
// }

}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{
KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext,
};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::source::SourceEnumeratorContext;
use crate::{
deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
};
Expand Down
11 changes: 3 additions & 8 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::De
type Split: SplitMetaData
+ TryFrom<SplitImpl, Error = crate::error::ConnectorError>
+ Into<SplitImpl>;
type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
type SplitEnumerator;
type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

/// Load additional info from `PbSource`. Currently only used by CDC.
Expand Down Expand Up @@ -127,13 +127,8 @@ pub async fn create_split_reader<P: SourceProperties>(
/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized {
type Split: SplitMetaData + Send;
type Properties;

async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
-> Result<Self>;
async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
pub trait SplitEnumerator: Send {
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
}

pub type SourceContextRef = Arc<SourceContext>;
Expand Down
42 changes: 21 additions & 21 deletions src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,8 @@ pub struct DebeziumSplitEnumerator<T: CdcSourceTypeTrait> {
_phantom: PhantomData<T>,
}

#[async_trait]
impl<T: CdcSourceTypeTrait> SplitEnumerator for DebeziumSplitEnumerator<T>
where
Self: ListCdcSplits<CdcSourceType = T>,
{
type Properties = CdcProperties<T>;
type Split = DebeziumCdcSplit<T>;

async fn new(
impl<T: CdcSourceTypeTrait> DebeziumSplitEnumerator<T> {
pub async fn new(
props: CdcProperties<T>,
context: SourceEnumeratorContextRef,
) -> ConnectorResult<Self> {
Expand Down Expand Up @@ -119,47 +112,54 @@ where
_phantom: PhantomData,
})
}
}

#[async_trait]
impl<T: CdcSourceTypeTrait> SplitEnumerator for DebeziumSplitEnumerator<T>
where
Self: ListCdcSplits<CdcSourceType = T>,
{

async fn list_splits(&mut self) -> ConnectorResult<Vec<DebeziumCdcSplit<T>>> {
async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
Ok(self.list_cdc_splits())
}
}

pub trait ListCdcSplits {
type CdcSourceType: CdcSourceTypeTrait;
fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>>;
fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl>;
}

impl ListCdcSplits for DebeziumSplitEnumerator<Mysql> {
type CdcSourceType = Mysql;

fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
// CDC source only supports single split
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
)]
).into()]
}
}

impl ListCdcSplits for DebeziumSplitEnumerator<Postgres> {
type CdcSourceType = Postgres;

fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
// CDC source only supports single split
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
)]
).into()]
}
}

impl ListCdcSplits for DebeziumSplitEnumerator<Citus> {
type CdcSourceType = Citus;

fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
self.worker_node_addrs
.iter()
.enumerate()
Expand All @@ -168,32 +168,32 @@ impl ListCdcSplits for DebeziumSplitEnumerator<Citus> {
id as u32,
None,
Some(addr.to_string()),
)
).into()
})
.collect_vec()
}
}
impl ListCdcSplits for DebeziumSplitEnumerator<Mongodb> {
type CdcSourceType = Mongodb;

fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
// CDC source only supports single split
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
)]
).into()]
}
}

impl ListCdcSplits for DebeziumSplitEnumerator<SqlServer> {
type CdcSourceType = SqlServer;

fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
)]
).into()]
}
}
16 changes: 8 additions & 8 deletions src/connector/src/source/datagen/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@ pub struct DatagenSplitEnumerator {
split_num: i32,
}

#[async_trait]
impl SplitEnumerator for DatagenSplitEnumerator {
type Properties = DatagenProperties;
type Split = DatagenSplit;

async fn new(
impl DatagenSplitEnumerator {
pub async fn new(
properties: DatagenProperties,
_context: SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<DatagenSplitEnumerator> {
Expand All @@ -38,15 +34,19 @@ impl SplitEnumerator for DatagenSplitEnumerator {
.context("failed to parse datagen split num")?;
Ok(Self { split_num })
}
}

#[async_trait]
impl SplitEnumerator for DatagenSplitEnumerator {

async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<DatagenSplit>> {
async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let mut splits = vec![];
for i in 0..self.split_num {
splits.push(DatagenSplit {
split_num: self.split_num,
split_index: i,
start_offset: None,
});
}.into());
}
Ok(splits)
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use with_options::WithOptions;
pub mod opendal_enumerator;
pub mod opendal_reader;

use self::opendal_enumerator::OpendalEnumerator;
pub use self::opendal_enumerator::OpendalEnumerator;
use self::opendal_reader::OpendalReader;
use super::file_common::CompressionFormat;
pub use super::s3::S3PropertiesCommon;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,34 +38,14 @@ pub struct OpendalEnumerator<Src: OpendalSource> {
pub(crate) compression_format: CompressionFormat,
}

#[async_trait]
impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
type Properties = Src::Properties;
type Split = OpendalFsSplit<Src>;

async fn new(
impl<Src: OpendalSource> OpendalEnumerator<Src> {
pub async fn new(
properties: Src::Properties,
_context: SourceEnumeratorContextRef,
) -> ConnectorResult<Self> {
Src::new_enumerator(properties)
}

async fn list_splits(&mut self) -> ConnectorResult<Vec<OpendalFsSplit<Src>>> {
let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}

impl<Src: OpendalSource> OpendalEnumerator<Src> {
pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
let prefix = self.prefix.as_deref().unwrap_or("/");

Expand Down Expand Up @@ -113,4 +93,81 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
self.prefix.as_deref().unwrap_or("/")
}
}

#[async_trait]
impl SplitEnumerator for OpendalEnumerator<super::OpendalS3> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let empty_split: OpendalFsSplit<super::OpendalS3> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split.into()]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}

#[async_trait]
impl SplitEnumerator for OpendalEnumerator<super::OpendalGcs> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let empty_split: OpendalFsSplit<super::OpendalGcs> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split.into()]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}

#[async_trait]
impl SplitEnumerator for OpendalEnumerator<super::OpendalAzblob> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let empty_split: OpendalFsSplit<super::OpendalAzblob> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split.into()]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}

#[async_trait]
impl SplitEnumerator for OpendalEnumerator<super::OpendalPosixFs> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let empty_split: OpendalFsSplit<super::OpendalPosixFs> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split.into()]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}

pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult<FsPageItem>>;
18 changes: 9 additions & 9 deletions src/connector/src/source/filesystem/s3/enumerator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,9 @@ pub struct S3SplitEnumerator {
pub(crate) next_continuation_token: Option<String>,
}

#[async_trait]
impl SplitEnumerator for S3SplitEnumerator {
type Properties = S3Properties;
type Split = FsSplit;

async fn new(
properties: Self::Properties,
impl S3SplitEnumerator {
pub async fn new(
properties: S3Properties,
_context: SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self> {
let config = AwsAuthProps::from(&properties);
Expand All @@ -97,12 +93,16 @@ impl SplitEnumerator for S3SplitEnumerator {
next_continuation_token: None,
})
}
}

#[async_trait]
impl SplitEnumerator for S3SplitEnumerator {

async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> {
async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let mut objects = Vec::new();
loop {
let (files, has_finished) = self.get_next_page::<FsSplit>().await?;
objects.extend(files);
objects.extend(files.into_iter().map(|f| f.into()));
if has_finished {
break;
}
Expand Down
Loading

0 comments on commit 4664578

Please sign in to comment.