From 3f20124ff52c92a5c2352299e5a24cf410600ca2 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 25 Jun 2024 08:37:43 +0300 Subject: [PATCH] [improve][fn] Make producer cache bounded and expiring in Functions/Connectors (#22945) (cherry picked from commit 6fe8100b1fd5d37a6e1bf33803a8904fa3879321) (cherry picked from commit f10708f985417e969c5a0fcd21a023eab9d6c487) (cherry picked from commit 9f5432aaf6dd3e6f629d697d5b852f39c6d137ae) --- pulsar-functions/instance/pom.xml | 5 + .../functions/instance/ContextImpl.java | 86 +++--------- .../instance/JavaInstanceRunnable.java | 8 +- .../functions/instance/ProducerCache.java | 130 +++++++++++++++++ .../pulsar/functions/sink/PulsarSink.java | 89 +++++------- .../functions/instance/ContextImplTest.java | 22 ++- .../pulsar/functions/sink/PulsarSinkTest.java | 132 ++++++++---------- 7 files changed, 267 insertions(+), 205 deletions(-) create mode 100644 pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index 1228869d3d646c..dbbffe7794e6a6 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -157,6 +157,11 @@ jcommander + + com.github.ben-manes.caffeine + caffeine + + net.jodah typetools diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 075e8bc9a764c4..a70f4eaec73454 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -29,16 +29,15 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import lombok.ToString; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.ClientBuilder; @@ -85,6 +84,7 @@ /** * This class implements the Context interface exposed to the user. */ +@Slf4j @ToString(exclude = {"pulsarAdmin"}) class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable { private final ProducerBuilderFactory producerBuilderFactory; @@ -98,8 +98,6 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable private final ClientBuilder clientBuilder; private final PulsarClient client; private final PulsarAdmin pulsarAdmin; - private Map> publishProducers; - private ThreadLocal>> tlPublishProducers; private final TopicSchema topicSchema; @@ -137,12 +135,15 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable private final Function.FunctionDetails.ComponentType componentType; + private final ProducerCache producerCache; + private final boolean useThreadLocalProducers; + public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager, - StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder) - throws PulsarClientException { + StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder, + ProducerCache producerCache) throws PulsarClientException { this.config = config; this.logger = logger; this.clientBuilder = clientBuilder; @@ -151,14 +152,17 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, this.topicSchema = new TopicSchema(client, Thread.currentThread().getContextClassLoader()); this.statsManager = statsManager; - boolean useThreadLocalProducers = false; + this.producerCache = producerCache; Function.ProducerSpec producerSpec = config.getFunctionDetails().getSink().getProducerSpec(); ProducerConfig producerConfig = null; if (producerSpec != null) { producerConfig = FunctionConfigUtils.convertProducerSpecToProducerConfig(producerSpec); useThreadLocalProducers = producerSpec.getUseThreadLocalProducers(); + } else { + useThreadLocalProducers = false; } + producerBuilderFactory = new ProducerBuilderFactory(client, producerConfig, Thread.currentThread().getContextClassLoader(), // This is for backwards compatibility. The PR https://github.com/apache/pulsar/pull/19470 removed @@ -172,12 +176,6 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, this.config.getFunctionDetails().getName()), this.config.getInstanceId())); - if (useThreadLocalProducers) { - tlPublishProducers = new ThreadLocal<>(); - } else { - publishProducers = new ConcurrentHashMap<>(); - } - if (config.getFunctionDetails().getUserConfig().isEmpty()) { userConfigs = new HashMap<>(); } else { @@ -535,39 +533,15 @@ public ClientBuilder getPulsarClientBuilder() { } private Producer getProducer(String topicName, Schema schema) throws PulsarClientException { - Producer producer; - if (tlPublishProducers != null) { - Map> producerMap = tlPublishProducers.get(); - if (producerMap == null) { - producerMap = new HashMap<>(); - tlPublishProducers.set(producerMap); - } - producer = (Producer) producerMap.get(topicName); - } else { - producer = (Producer) publishProducers.get(topicName); - } - - if (producer == null) { - Producer newProducer = producerBuilderFactory - .createProducerBuilder(topicName, schema, null) - .properties(producerProperties) - .create(); - - if (tlPublishProducers != null) { - tlPublishProducers.get().put(topicName, newProducer); - } else { - Producer existingProducer = (Producer) publishProducers.putIfAbsent(topicName, newProducer); - - if (existingProducer != null) { - // The value in the map was not updated after the concurrent put - newProducer.close(); - producer = existingProducer; - } else { - producer = newProducer; - } - } - } - return producer; + Long additionalCacheKey = useThreadLocalProducers ? Thread.currentThread().getId() : null; + return producerCache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, + topicName, additionalCacheKey, () -> { + log.info("Initializing producer on topic {} with schema {}", topicName, schema); + return producerBuilderFactory + .createProducerBuilder(topicName, schema, null) + .properties(producerProperties) + .create(); + }); } public Map getAndResetMetrics() { @@ -706,29 +680,9 @@ public void setUnderlyingBuilder(TypedMessageBuilder underlyingBuilder) { @Override public void close() { - List futures = new LinkedList<>(); - - if (publishProducers != null) { - for (Producer producer : publishProducers.values()) { - futures.add(producer.closeAsync()); - } - } - - if (tlPublishProducers != null) { - for (Producer producer : tlPublishProducers.get().values()) { - futures.add(producer.closeAsync()); - } - } - if (pulsarAdmin != null) { pulsarAdmin.close(); } - - try { - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); - } catch (InterruptedException | ExecutionException e) { - logger.warn("Failed to close producers", e); - } } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 7459f553efa731..a3718f68197c6a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -168,6 +168,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private final AtomicReference> sinkSchema = new AtomicReference<>(); private SinkSchemaInfoProvider sinkSchemaInfoProvider = null; + private final ProducerCache producerCache = new ProducerCache(); + public JavaInstanceRunnable(InstanceConfig instanceConfig, ClientBuilder clientBuilder, PulsarClient pulsarClient, @@ -287,7 +289,7 @@ ContextImpl setupContext() throws PulsarClientException { Thread.currentThread().setContextClassLoader(functionClassLoader); return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager, - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, producerCache); } finally { Thread.currentThread().setContextClassLoader(clsLoader); } @@ -583,6 +585,8 @@ public synchronized void close() { instanceCache = null; + producerCache.close(); + if (logAppender != null) { removeLogTopicAppender(LoggerContext.getContext()); removeLogTopicAppender(LoggerContext.getContext(false)); @@ -1001,7 +1005,7 @@ private void setupOutput(ContextImpl contextImpl) throws Exception { } object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, - this.functionClassLoader); + this.functionClassLoader, this.producerCache); } } else { object = Reflections.createInstance( diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java new file mode 100644 index 00000000000000..f68c4e9589558e --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.annotations.VisibleForTesting; +import java.io.Closeable; +import java.time.Duration; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.common.util.FutureUtil; + +@Slf4j +public class ProducerCache implements Closeable { + // allow tuning the cache timeout with PRODUCER_CACHE_TIMEOUT_SECONDS env variable + private static final int PRODUCER_CACHE_TIMEOUT_SECONDS = + Integer.parseInt(System.getenv().getOrDefault("PRODUCER_CACHE_TIMEOUT_SECONDS", "300")); + // allow tuning the cache size with PRODUCER_CACHE_MAX_SIZE env variable + private static final int PRODUCER_CACHE_MAX_SIZE = + Integer.parseInt(System.getenv().getOrDefault("PRODUCER_CACHE_MAX_SIZE", "10000")); + private static final int FLUSH_OR_CLOSE_TIMEOUT_SECONDS = 60; + + // prevents the different producers created in different code locations from mixing up + public enum CacheArea { + // producers created by calling Context, SinkContext, SourceContext methods + CONTEXT_CACHE, + // producers created in Pulsar Sources, multiple topics are possible by returning destination topics + // by SinkRecord.getDestinationTopic call + SINK_RECORD_CACHE, + } + + record ProducerCacheKey(CacheArea cacheArea, String topic, Object additionalKey) { + } + + private final Cache> cache; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final CopyOnWriteArrayList> closeFutures = new CopyOnWriteArrayList<>(); + + public ProducerCache() { + Caffeine builder = Caffeine.newBuilder() + .scheduler(Scheduler.systemScheduler()) + .removalListener((key, producer, cause) -> { + log.info("Closing producer for topic {}, cause {}", key.topic(), cause); + CompletableFuture closeFuture = + producer.flushAsync() + .orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .exceptionally(ex -> { + log.error("Error flushing producer for topic {}", key.topic(), ex); + return null; + }).thenCompose(__ -> + producer.closeAsync().orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, + TimeUnit.SECONDS) + .exceptionally(ex -> { + log.error("Error closing producer for topic {}", key.topic(), + ex); + return null; + })); + if (closed.get()) { + closeFutures.add(closeFuture); + } + }) + .weigher((key, producer) -> Math.max(producer.getNumOfPartitions(), 1)) + .maximumWeight(PRODUCER_CACHE_MAX_SIZE); + if (PRODUCER_CACHE_TIMEOUT_SECONDS > 0) { + builder.expireAfterAccess(Duration.ofSeconds(PRODUCER_CACHE_TIMEOUT_SECONDS)); + } + cache = builder.build(); + } + + public Producer getOrCreateProducer(CacheArea cacheArea, String topicName, Object additionalCacheKey, + Callable> supplier) { + if (closed.get()) { + throw new IllegalStateException("ProducerCache is already closed"); + } + return (Producer) cache.get(new ProducerCacheKey(cacheArea, topicName, additionalCacheKey), key -> { + try { + return supplier.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Unable to create producer for topic '" + topicName + "'", e); + } + }); + } + + public void close() { + if (closed.compareAndSet(false, true)) { + cache.invalidateAll(); + try { + FutureUtil.waitForAll(closeFutures).get(); + } catch (InterruptedException | ExecutionException e) { + log.warn("Failed to close producers", e); + } + } + } + + @VisibleForTesting + public boolean containsKey(CacheArea cacheArea, String topic) { + return containsKey(cacheArea, topic, null); + } + + @VisibleForTesting + public boolean containsKey(CacheArea cacheArea, String topic, Object additionalCacheKey) { + return cache.getIfPresent(new ProducerCacheKey(cacheArea, topic, additionalCacheKey)) != null; + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 18e55e8e84de1f..da6b8006eb987f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -20,19 +20,15 @@ import com.google.common.annotations.VisibleForTesting; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Base64; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -48,6 +44,7 @@ import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.AbstractSinkRecord; import org.apache.pulsar.functions.instance.ProducerBuilderFactory; +import org.apache.pulsar.functions.instance.ProducerCache; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.TopicSchema; @@ -62,6 +59,7 @@ public class PulsarSink implements Sink { private final Map properties; private final ClassLoader functionClassLoader; private ComponentStatsManager stats; + private final ProducerCache producerCache; @VisibleForTesting PulsarSinkProcessor pulsarSinkProcessor; @@ -80,43 +78,25 @@ private interface PulsarSinkProcessor { } abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor { - protected Map> publishProducers = new ConcurrentHashMap<>(); - protected Producer getProducer(String destinationTopic, Schema schema) { - return getProducer(destinationTopic, null, destinationTopic, schema); + return getProducer(destinationTopic, schema, null, null); } - protected Producer getProducer(String producerId, String producerName, String topicName, Schema schema) { - return publishProducers.computeIfAbsent(producerId, s -> { - try { - log.info("Initializing producer {} on topic {} with schema {}", - producerName, topicName, schema); - Producer producer = createProducer( - topicName, - schema, producerName - ); - log.info("Initialized producer {} on topic {} with schema {}: {} -> {}", - producerName, topicName, schema, producerId, producer); - return producer; - } catch (PulsarClientException e) { - log.error("Failed to create Producer while doing user publish", e); - throw new RuntimeException(e); - } - }); + protected Producer getProducer(String topicName, Schema schema, String producerName, String partitionId) { + return producerCache.getOrCreateProducer(ProducerCache.CacheArea.SINK_RECORD_CACHE, topicName, partitionId, + () -> { + Producer producer = createProducer(topicName, schema, producerName); + log.info( + "Initialized producer with name '{}' on topic '{}' with schema {} partitionId {} " + + "-> {}", + producerName, topicName, schema, partitionId, producer); + return producer; + }); } @Override public void close() throws Exception { - List> closeFutures = new ArrayList<>(publishProducers.size()); - for (Map.Entry> entry : publishProducers.entrySet()) { - Producer producer = entry.getValue(); - closeFutures.add(producer.closeAsync()); - } - try { - org.apache.pulsar.common.util.FutureUtil.waitForAll(closeFutures); - } catch (Exception e) { - log.warn("Failed to close all the producers", e); - } + // no op } public Function getPublishErrorHandler(AbstractSinkRecord record, boolean failSource) { @@ -153,13 +133,7 @@ class PulsarSinkAtMostOnceProcessor extends PulsarSinkProcessorBase { public PulsarSinkAtMostOnceProcessor() { if (!(schema instanceof AutoConsumeSchema)) { // initialize default topic - try { - publishProducers.put(pulsarSinkConfig.getTopic(), - createProducer(pulsarSinkConfig.getTopic(), schema, null)); - } catch (PulsarClientException e) { - log.error("Failed to create Producer while doing user publish", e); - throw new RuntimeException(e); - } + getProducer(pulsarSinkConfig.getTopic(), schema); } else { if (log.isDebugEnabled()) { log.debug("The Pulsar producer is not initialized until the first record is" @@ -232,13 +206,10 @@ public TypedMessageBuilder newMessage(AbstractSinkRecord record) { // we must use the destination topic schema schemaToWrite = schema; } - Producer producer = getProducer( - String.format("%s-%s", record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()), - record.getPartitionId().get()), - record.getPartitionId().get(), - record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()), - schemaToWrite - ); + String topicName = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()); + String partitionId = record.getPartitionId().get(); + String producerName = partitionId; + Producer producer = getProducer(topicName, schemaToWrite, producerName, partitionId); if (schemaToWrite != null) { return producer.newMessage(schemaToWrite); } else { @@ -263,13 +234,14 @@ public void sendOutputMessage(TypedMessageBuilder msg, AbstractSinkRecord } public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map properties, - ComponentStatsManager stats, ClassLoader functionClassLoader) { + ComponentStatsManager stats, ClassLoader functionClassLoader, ProducerCache producerCache) { this.client = client; this.pulsarSinkConfig = pulsarSinkConfig; this.topicSchema = new TopicSchema(client, functionClassLoader); this.properties = properties; this.stats = stats; this.functionClassLoader = functionClassLoader; + this.producerCache = producerCache; } @Override @@ -341,14 +313,17 @@ public void close() throws Exception { } } - Producer createProducer(String topic, Schema schema, String producerName) - throws PulsarClientException { - ProducerBuilder builder = - producerBuilderFactory.createProducerBuilder(topic, schema != null ? schema : this.schema, - producerName); - return builder - .properties(properties) - .create(); + Producer createProducer(String topicName, Schema schema, String producerName) { + Schema schemaToUse = schema != null ? schema : this.schema; + try { + log.info("Initializing producer {} on topic {} with schema {}", producerName, topicName, schemaToUse); + return producerBuilderFactory.createProducerBuilder(topicName, schemaToUse, producerName) + .properties(properties) + .create(); + } catch (PulsarClientException e) { + throw new RuntimeException("Failed to create Producer for topic " + topicName + + " producerName " + producerName + " schema " + schemaToUse, e); + } } @SuppressWarnings("unchecked") diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index 108d8e4b666639..e62838ed3d2091 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -71,6 +71,7 @@ import org.mockito.Mockito; import org.slf4j.Logger; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -86,6 +87,7 @@ public class ContextImplTest { private PulsarAdmin pulsarAdmin; private ContextImpl context; private Producer producer; + private ProducerCache producerCache; @BeforeMethod(alwaysRun = true) public void setup() throws PulsarClientException { @@ -116,16 +118,24 @@ public void setup() throws PulsarClientException { TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING)); doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync(); when(producer.newMessage()).thenReturn(messageBuilder); + doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync(); + producerCache = new ProducerCache(); context = new ContextImpl( config, logger, client, new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, producerCache); context.setCurrentMessageContext((Record) () -> null); } + @AfterMethod(alwaysRun = true) + public void tearDown() { + producerCache.close(); + producerCache = null; + } + @Test(expectedExceptions = IllegalStateException.class) public void testIncrCounterStateDisabled() { context.incrCounter("test-key", 10); @@ -236,7 +246,7 @@ public void testGetPulsarAdminWithExposePulsarAdminDisabled() throws PulsarClien new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, producerCache); context.getPulsarAdmin(); } @@ -250,7 +260,7 @@ public void testUnsupportedExtendedSinkContext() throws PulsarClientException { new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, producerCache); try { context.seek("z", 0, Mockito.mock(MessageId.class)); Assert.fail("Expected exception"); @@ -281,7 +291,7 @@ public void testExtendedSinkContext() throws PulsarClientException { new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, producerCache); Consumer mockConsumer = Mockito.mock(Consumer.class); when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString()); context.setInputConsumers(Lists.newArrayList(mockConsumer)); @@ -313,7 +323,7 @@ public void testGetConsumer() throws PulsarClientException { new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, producerCache); Consumer mockConsumer = Mockito.mock(Consumer.class); when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString()); context.setInputConsumers(Lists.newArrayList(mockConsumer)); @@ -337,7 +347,7 @@ public void testGetConsumerMultiTopic() throws PulsarClientException { new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, producerCache); ConsumerImpl consumer1 = Mockito.mock(ConsumerImpl.class); when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString()); ConsumerImpl consumer2 = Mockito.mock(ConsumerImpl.class); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index 799bad839a451c..8a946a3f7571b2 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -18,13 +18,13 @@ */ package org.apache.pulsar.functions.sink; +import static org.apache.pulsar.functions.instance.ProducerCache.CacheArea.SINK_RECORD_CACHE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -37,7 +37,6 @@ import static org.testng.Assert.fail; import java.io.IOException; import java.util.HashMap; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import lombok.Getter; @@ -65,12 +64,14 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.instance.ProducerCache; import org.apache.pulsar.functions.instance.SinkRecord; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; import org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessorBase; import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.io.core.SinkContext; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -132,6 +133,7 @@ private static PulsarClientImpl getPulsarClient() throws PulsarClientException { doReturn(producer).when(producerBuilder).create(); doReturn(typedMessageBuilder).when(producer).newMessage(); doReturn(typedMessageBuilder).when(producer).newMessage(any(Schema.class)); + doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync(); doReturn(producerBuilder).when(pulsarClient).newProducer(); doReturn(producerBuilder).when(pulsarClient).newProducer(any()); @@ -139,9 +141,17 @@ private static PulsarClientImpl getPulsarClient() throws PulsarClientException { return pulsarClient; } - @BeforeMethod + ProducerCache producerCache; + + @BeforeMethod(alwaysRun = true) public void setup() { + producerCache = new ProducerCache(); + } + @AfterMethod(alwaysRun = true) + public void tearDown() { + producerCache.close(); + producerCache = null; } private static PulsarSinkConfig getPulsarConfigs() { @@ -182,7 +192,7 @@ public void testVoidOutputClasses() throws Exception { pulsarConfig.setTypeClassName(Void.class.getName()); PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); try { Schema schema = pulsarSink.initializeSchema(); @@ -202,7 +212,7 @@ public void testInconsistentOutputType() throws IOException { pulsarConfig.setSerdeClassName(TestSerDe.class.getName()); PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); try { pulsarSink.initializeSchema(); fail("Should fail constructing java instance if function type is inconsistent with serde type"); @@ -227,7 +237,7 @@ public void testDefaultSerDe() throws PulsarClientException { pulsarConfig.setTypeClassName(String.class.getName()); PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); try { pulsarSink.initializeSchema(); @@ -248,7 +258,7 @@ public void testExplicitDefaultSerDe() throws PulsarClientException { pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE); PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); try { pulsarSink.initializeSchema(); @@ -266,7 +276,7 @@ public void testComplexOuputType() throws PulsarClientException { pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName()); PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); try { pulsarSink.initializeSchema(); @@ -286,7 +296,7 @@ public void testInitializeSchema() throws Exception { pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName()); PulsarSink sink = new PulsarSink( pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); Schema schema = sink.initializeSchema(); assertTrue(schema instanceof AutoConsumeSchema); @@ -295,7 +305,7 @@ public void testInitializeSchema() throws Exception { pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName()); sink = new PulsarSink( pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); schema = sink.initializeSchema(); assertTrue(schema instanceof AutoConsumeSchema); @@ -306,7 +316,7 @@ public void testInitializeSchema() throws Exception { pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName()); sink = new PulsarSink( pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); schema = sink.initializeSchema(); assertTrue(schema instanceof AutoConsumeSchema); @@ -317,7 +327,7 @@ public void testInitializeSchema() throws Exception { pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName()); sink = new PulsarSink( pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); schema = sink.initializeSchema(); assertTrue(schema instanceof AutoConsumeSchema); @@ -327,7 +337,7 @@ public void testInitializeSchema() throws Exception { pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName()); sink = new PulsarSink( pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); schema = sink.initializeSchema(); assertTrue(schema instanceof AutoConsumeSchema); } @@ -344,9 +354,12 @@ public void testSinkAndMessageRouting() throws Exception { /** test MANUAL **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(ProcessingGuarantees.MANUAL); - PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader()); + PulsarSink pulsarSink = + new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), + Thread.currentThread().getContextClassLoader(), producerCache); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); + verify(pulsarClient.newProducer(), times(1)).topic(defaultTopic); for (String topic : topics) { @@ -370,23 +383,19 @@ public Optional getDestinationTopic() { PulsarSink.PulsarSinkManualProcessor pulsarSinkManualProcessor = (PulsarSink.PulsarSinkManualProcessor) pulsarSink.pulsarSinkProcessor; if (topic != null) { - Assert.assertTrue(pulsarSinkManualProcessor.publishProducers.containsKey(topic)); + Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, topic)); } else { - Assert.assertTrue(pulsarSinkManualProcessor.publishProducers.containsKey(defaultTopic)); + Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, defaultTopic)); } - verify(pulsarClient.newProducer(), times(1)).topic(argThat(otherTopic -> { - if (topic != null) { - return topic.equals(otherTopic); - } else { - return defaultTopic.equals(otherTopic); - } - })); + String actualTopic = topic != null ? topic : defaultTopic; + verify(pulsarClient.newProducer(), times(1)).topic(actualTopic); } /** test At-least-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE); - pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader()); + pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), + Thread.currentThread().getContextClassLoader(), producerCache); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); @@ -410,24 +419,17 @@ public Optional getDestinationTopic() { PulsarSink.PulsarSinkAtLeastOnceProcessor pulsarSinkAtLeastOnceProcessor = (PulsarSink.PulsarSinkAtLeastOnceProcessor) pulsarSink.pulsarSinkProcessor; if (topic != null) { - Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(topic)); + Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, topic)); } else { - Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(defaultTopic)); + Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, defaultTopic)); } - verify(pulsarClient.newProducer(), times(1)).topic(argThat(otherTopic -> { - if (topic != null) { - return topic.equals(otherTopic); - } else { - return defaultTopic.equals(otherTopic); - } - })); } /** test At-most-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE); pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); @@ -457,20 +459,17 @@ public Optional getDestinationTopic() { PulsarSink.PulsarSinkAtMostOnceProcessor pulsarSinkAtLeastOnceProcessor = (PulsarSink.PulsarSinkAtMostOnceProcessor) pulsarSink.pulsarSinkProcessor; if (topic != null) { - Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(topic)); + Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, topic)); } else { - Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(defaultTopic)); + Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, defaultTopic)); } - verify(pulsarClient.newProducer(), times(1)).topic(argThat(o -> { - return getTopicEquals(o, topic, defaultTopic); - })); } /** test Effectively-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE); pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); @@ -520,23 +519,19 @@ public Optional getRecordSequence() { PulsarSink.PulsarSinkEffectivelyOnceProcessor pulsarSinkEffectivelyOnceProcessor = (PulsarSink.PulsarSinkEffectivelyOnceProcessor) pulsarSink.pulsarSinkProcessor; if (topic != null) { - Assert.assertTrue(pulsarSinkEffectivelyOnceProcessor.publishProducers - .containsKey(String.format("%s-%s-id-1", topic, topic))); + Assert.assertTrue(producerCache + .containsKey(SINK_RECORD_CACHE, topic, String.format("%s-id-1", topic))); } else { - Assert.assertTrue(pulsarSinkEffectivelyOnceProcessor.publishProducers - .containsKey(String.format("%s-%s-id-1", defaultTopic, defaultTopic))); + Assert.assertTrue(producerCache + .containsKey(SINK_RECORD_CACHE, + defaultTopic, String.format("%s-id-1", defaultTopic) + )); } - verify(pulsarClient.newProducer(), times(1)).topic(argThat(o -> { - return getTopicEquals(o, topic, defaultTopic); - })); - verify(pulsarClient.newProducer(), times(1)).producerName(argThat(o -> { - if (topic != null) { - return String.format("%s-id-1", topic).equals(o); - } else { - return String.format("%s-id-1", defaultTopic).equals(o); - } - })); + String expectedTopicName = topic != null ? topic : defaultTopic; + verify(pulsarClient.newProducer(), times(1)).topic(expectedTopicName); + String expectedProducerName = String.format("%s-id-1", expectedTopicName); + verify(pulsarClient.newProducer(), times(1)).producerName(expectedProducerName); } } @@ -566,7 +561,7 @@ private void testWriteGenericRecords(ProcessingGuarantees guarantees) throws Exc PulsarClient client = getPulsarClient(); PulsarSink pulsarSink = new PulsarSink( client, sinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), producerCache); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); @@ -578,7 +573,7 @@ private void testWriteGenericRecords(ProcessingGuarantees guarantees) throws Exc assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkEffectivelyOnceProcessor); } PulsarSinkProcessorBase processor = (PulsarSinkProcessorBase) pulsarSink.pulsarSinkProcessor; - assertFalse(processor.publishProducers.containsKey(defaultTopic)); + assertFalse(producerCache.containsKey(SINK_RECORD_CACHE, defaultTopic)); String[] topics = {"topic-1", "topic-2", "topic-3"}; for (String topic : topics) { @@ -625,17 +620,15 @@ public Optional getRecordSequence() { pulsarSink.write(record); if (ProcessingGuarantees.EFFECTIVELY_ONCE == guarantees) { - assertTrue(processor.publishProducers.containsKey(String.format("%s-%s-id-1", topic, topic))); + assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, + topic, String.format("%s-id-1", topic) + )); } else { - assertTrue(processor.publishProducers.containsKey(topic)); + assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, topic)); } - verify(client.newProducer(), times(1)) - .topic(argThat( - otherTopic -> topic != null ? topic.equals(otherTopic) : defaultTopic.equals(otherTopic))); - - verify(client, times(1)) - .newProducer(argThat( - otherSchema -> Objects.equals(otherSchema, schema))); + String expectedTopicName = topic != null ? topic : defaultTopic; + verify(client.newProducer(), times(1)).topic(expectedTopicName); + verify(client, times(1)).newProducer(schema); } } @@ -646,13 +639,4 @@ private Optional getTopicOptional(String topic) { return Optional.empty(); } } - - private boolean getTopicEquals(Object o, String topic, String defaultTopic) { - if (topic != null) { - return topic.equals(o); - } else { - return defaultTopic.equals(o); - } - } - }