Skip to content

Commit

Permalink
use time based eviction for admin client
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Jan 17, 2025
1 parent 134f575 commit 8168248
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions src/connector/src/source/kafka/enumerator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,16 @@ use crate::source::SourceEnumeratorContextRef;
type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

pub static SHARED_KAFKA_CLIENT: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
/// Consumer client is shared, and the cache doesn't manage the lifecycle, so we store `Weak` and no eviction.
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
LazyLock::new(|| moka::future::Cache::builder().build());
/// Admin client is short-lived, so we store `Arc` and sets a time-to-idle eviction policy.
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
LazyLock::new(|| {
moka::future::Cache::builder()
.time_to_idle(Duration::from_secs(5 * 60))
.build()
});

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
Expand Down Expand Up @@ -107,7 +115,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
}

let mut client: Option<Arc<KafkaConsumer>> = None;
SHARED_KAFKA_CLIENT
SHARED_KAFKA_CONSUMER
.entry_by_ref(&properties.connection)
.and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
if let Some(entry) = maybe_entry {
Expand Down Expand Up @@ -170,7 +178,15 @@ impl SplitEnumerator for KafkaSplitEnumerator {
}

async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
let admin = build_kafka_admin(&self.config, &self.properties).await?;
let admin = SHARED_KAFKA_ADMIN
.try_get_with_by_ref(&self.properties.connection, async {
tracing::info!("build new kafka admin for {}", self.broker_address);
Ok(Arc::new(
build_kafka_admin(&self.config, &self.properties).await?,
))
})
.await?;

let group_ids = fragment_ids
.iter()
.map(|fragment_id| self.properties.group_id(*fragment_id))
Expand Down

0 comments on commit 8168248

Please sign in to comment.