Skip to content

Commit

Permalink
add a trait method
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Jan 10, 2025
1 parent 4664578 commit 5bde2a9
Show file tree
Hide file tree
Showing 31 changed files with 251 additions and 376 deletions.
8 changes: 4 additions & 4 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,11 @@ macro_rules! impl_connector_properties {

$crate::paste! {
impl ConnectorProperties {
pub async fn create_split_enumerator(self, context: crate::source::base::SourceEnumeratorContextRef) -> crate::error::ConnectorResult<Box<dyn SplitEnumerator>> {
let enumerator = match self {
pub async fn create_split_enumerator(self, context: $crate::source::base::SourceEnumeratorContextRef) -> $crate::error::ConnectorResult<Box<dyn SplitEnumerator>> {
let enumerator: Box<dyn SplitEnumerator> = match self {
$(
ConnectorProperties::$variant_name(prop) =>
Box::new(crate::source::prelude::[<$variant_name SplitEnumerator>]::new(*prop, context).await?) as Box<dyn SplitEnumerator>,
ConnectorProperties::$variant_name(prop) =>
Box::new(prop.create_split_enumerator(context).await?),
)*
};
Ok(enumerator)
Expand Down
5 changes: 5 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::De

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> Result<Self::SplitEnumerator>;
}

pub trait UnknownFields {
Expand Down
34 changes: 10 additions & 24 deletions src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct DebeziumSplitEnumerator<T: CdcSourceTypeTrait> {
impl<T: CdcSourceTypeTrait> DebeziumSplitEnumerator<T> {
pub async fn new(
props: CdcProperties<T>,
context: SourceEnumeratorContextRef,
context: crate::source::SourceEnumeratorContextRef,
) -> ConnectorResult<Self> {
let server_addrs = props
.properties
Expand Down Expand Up @@ -119,8 +119,9 @@ impl<T: CdcSourceTypeTrait> SplitEnumerator for DebeziumSplitEnumerator<T>
where
Self: ListCdcSplits<CdcSourceType = T>,
{

async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
Ok(self.list_cdc_splits())
}
}
Expand All @@ -135,11 +136,7 @@ impl ListCdcSplits for DebeziumSplitEnumerator<Mysql> {

fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
// CDC source only supports single split
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
).into()]
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(self.source_id, None, None).into()]
}
}

Expand All @@ -148,11 +145,7 @@ impl ListCdcSplits for DebeziumSplitEnumerator<Postgres> {

fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
// CDC source only supports single split
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
).into()]
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(self.source_id, None, None).into()]
}
}

Expand All @@ -168,7 +161,8 @@ impl ListCdcSplits for DebeziumSplitEnumerator<Citus> {
id as u32,
None,
Some(addr.to_string()),
).into()
)
.into()
})
.collect_vec()
}
Expand All @@ -178,22 +172,14 @@ impl ListCdcSplits for DebeziumSplitEnumerator<Mongodb> {

fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
// CDC source only supports single split
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
).into()]
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(self.source_id, None, None).into()]
}
}

impl ListCdcSplits for DebeziumSplitEnumerator<SqlServer> {
type CdcSourceType = SqlServer;

fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
).into()]
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(self.source_id, None, None).into()]
}
}
7 changes: 7 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ where
self.is_cdc_source_job = false;
self.is_backfill_table = true;
}

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
Ok(DebeziumSplitEnumerator::new(self, context).await?)
}
}

impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
Expand Down
20 changes: 12 additions & 8 deletions src/connector/src/source/datagen/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct DatagenSplitEnumerator {
impl DatagenSplitEnumerator {
pub async fn new(
properties: DatagenProperties,
_context: SourceEnumeratorContextRef,
_context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<DatagenSplitEnumerator> {
let split_num = properties.split_num.unwrap_or_else(|| "1".to_owned());
let split_num = split_num
Expand All @@ -38,15 +38,19 @@ impl DatagenSplitEnumerator {

#[async_trait]
impl SplitEnumerator for DatagenSplitEnumerator {

async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let mut splits = vec![];
for i in 0..self.split_num {
splits.push(DatagenSplit {
split_num: self.split_num,
split_index: i,
start_offset: None,
}.into());
splits.push(
DatagenSplit {
split_num: self.split_num,
split_index: i,
start_offset: None,
}
.into(),
);
}
Ok(splits)
}
Expand Down
7 changes: 7 additions & 0 deletions src/connector/src/source/datagen/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ impl SourceProperties for DatagenProperties {
type SplitReader = DatagenSplitReader;

const SOURCE_NAME: &'static str = DATAGEN_CONNECTOR;

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
DatagenSplitEnumerator::new(self, context).await
}
}

impl crate::source::UnknownFields for DatagenProperties {
Expand Down
28 changes: 28 additions & 0 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ impl SourceProperties for GcsProperties {
type SplitReader = OpendalReader<OpendalGcs>;

const SOURCE_NAME: &'static str = GCS_CONNECTOR;

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context).await
}
}

pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
Expand Down Expand Up @@ -155,6 +162,13 @@ impl SourceProperties for OpendalS3Properties {
type SplitReader = OpendalReader<OpendalS3>;

const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context).await
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
Expand Down Expand Up @@ -188,6 +202,13 @@ impl SourceProperties for PosixFsProperties {
type SplitReader = OpendalReader<OpendalPosixFs>;

const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context).await
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
Expand Down Expand Up @@ -227,6 +248,13 @@ impl SourceProperties for AzblobProperties {
type SplitReader = OpendalReader<OpendalAzblob>;

const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context).await
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct OpendalEnumerator<Src: OpendalSource> {
impl<Src: OpendalSource> OpendalEnumerator<Src> {
pub async fn new(
properties: Src::Properties,
_context: SourceEnumeratorContextRef,
_context: crate::source::SourceEnumeratorContextRef,
) -> ConnectorResult<Self> {
Src::new_enumerator(properties)
}
Expand Down Expand Up @@ -94,80 +94,33 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
}
}

#[async_trait]
impl SplitEnumerator for OpendalEnumerator<super::OpendalS3> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let empty_split: OpendalFsSplit<super::OpendalS3> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split.into()]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}

#[async_trait]
impl SplitEnumerator for OpendalEnumerator<super::OpendalGcs> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let empty_split: OpendalFsSplit<super::OpendalGcs> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split.into()]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}

#[async_trait]
impl SplitEnumerator for OpendalEnumerator<super::OpendalAzblob> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let empty_split: OpendalFsSplit<super::OpendalAzblob> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split.into()]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
// we don't use generic since impl From<OpendalFsSplit> for SplitImpl is implemented for concrete types
macro_rules! impl_split_enumerator {
($source_type:ty) => {
#[async_trait]
impl SplitEnumerator for OpendalEnumerator<$source_type> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let empty_split: OpendalFsSplit<$source_type> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split.into()]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}
}
};
}

#[async_trait]
impl SplitEnumerator for OpendalEnumerator<super::OpendalPosixFs> {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
let empty_split: OpendalFsSplit<super::OpendalPosixFs> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split.into()]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}
impl_split_enumerator!(super::OpendalS3);
impl_split_enumerator!(super::OpendalGcs);
impl_split_enumerator!(super::OpendalAzblob);
impl_split_enumerator!(super::OpendalPosixFs);

pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult<FsPageItem>>;
Loading

0 comments on commit 5bde2a9

Please sign in to comment.