Skip to content

Commit

Permalink
[IO] Pass client builder to debezium database history (apache#11293)
Browse files Browse the repository at this point in the history
an alternative approach for apache#11251
  • Loading branch information
sijie authored Aug 20, 2021
1 parent f893c08 commit 12f6566
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.util.Map;
Expand All @@ -34,7 +35,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface ClientBuilder extends Cloneable {
public interface ClientBuilder extends Serializable, Cloneable {

/**
* Construct the final {@link PulsarClient} instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.api;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
Expand Down Expand Up @@ -194,11 +195,27 @@ default <S extends StateStore> S getStateStore(String tenant, String ns, String
void recordMetric(String metricName, double value);

/**
* Get the pulsar client.
* Get the pre-configured pulsar client.
*
* You can use this client to access Pulsar cluster.
* The Function will be responsible for disposing this client.
*
* @return the instance of pulsar client
*/
default PulsarClient getPulsarClient() {
throw new UnsupportedOperationException("not implemented");
}

/**
* Get the pre-configured pulsar client builder.
*
* You can use this Builder to setup client to connect to the Pulsar cluster.
* But you need to close client properly after using it.
*
* @return the instance of pulsar client builder.
*/
default ClientBuilder getPulsarClientBuilder() {
throw new UnsupportedOperationException("not implemented");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
Expand Down Expand Up @@ -88,6 +89,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
// Per Message related
private Record<?> record;

private final ClientBuilder clientBuilder;
private final PulsarClient client;
private final PulsarAdmin pulsarAdmin;
private Map<String, Producer<?>> publishProducers;
Expand Down Expand Up @@ -132,9 +134,10 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin) {
StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder) throws PulsarClientException {
this.config = config;
this.logger = logger;
this.clientBuilder = clientBuilder;
this.client = client;
this.pulsarAdmin = pulsarAdmin;
this.topicSchema = new TopicSchema(client);
Expand Down Expand Up @@ -492,6 +495,11 @@ public PulsarClient getPulsarClient() {
return client;
}

@Override
public ClientBuilder getPulsarClientBuilder() {
return clientBuilder;
}

private <O> Producer<O> getProducer(String topicName, Schema<O> schema) throws PulsarClientException {
Producer<O> producer;
if (tlPublishProducers != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,9 @@ public static Map<String, String> getProperties(Function.FunctionDetails.Compone
return properties;
}


public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
throws PulsarClientException {
return createPulsarClient(pulsarServiceUrl, authConfig, Optional.empty());
}

public static PulsarClient createPulsarClient(String pulsarServiceUrl,
AuthenticationConfig authConfig,
Optional<Long> memoryLimit) throws PulsarClientException {
public static ClientBuilder createPulsarClientBuilder(String pulsarServiceUrl,
AuthenticationConfig authConfig,
Optional<Long> memoryLimit) throws PulsarClientException {
ClientBuilder clientBuilder = null;
if (isNotBlank(pulsarServiceUrl)) {
clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
Expand All @@ -183,10 +177,20 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
clientBuilder.memoryLimit(memoryLimit.get(), SizeUnit.BYTES);
}
clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors());
return clientBuilder.build();
return clientBuilder;
}
log.warn("pulsarServiceUrl cannot be null");
return null;
throw new PulsarClientException("pulsarServiceUrl cannot be null");
}

public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
throws PulsarClientException {
return createPulsarClient(pulsarServiceUrl, authConfig, Optional.empty());
}

public static PulsarClient createPulsarClient(String pulsarServiceUrl,
AuthenticationConfig authConfig,
Optional<Long> memoryLimit) throws PulsarClientException {
return createPulsarClientBuilder(pulsarServiceUrl, authConfig, memoryLimit).build();
}

public static PulsarAdmin createPulsarAdminClient(String pulsarWebServiceUrl, AuthenticationConfig authConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
Expand Down Expand Up @@ -90,7 +92,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final InstanceConfig instanceConfig;

// input topic consumer & output topic producer
private final PulsarClientImpl client;
private final ClientBuilder clientBuilder;
private PulsarClientImpl client;
private final PulsarAdmin pulsarAdmin;

private LogAppender logAppender;
Expand Down Expand Up @@ -134,13 +137,15 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private ReadWriteLock statsLock = new ReentrantReadWriteLock();

public JavaInstanceRunnable(InstanceConfig instanceConfig,
ClientBuilder clientBuilder,
PulsarClient pulsarClient,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
ClassLoader functionClassLoader) {
ClassLoader functionClassLoader) throws PulsarClientException {
this.instanceConfig = instanceConfig;
this.clientBuilder = clientBuilder;
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
Expand Down Expand Up @@ -226,12 +231,12 @@ synchronized private void setup() throws Exception {
isInitialized = true;
}

ContextImpl setupContext() {
ContextImpl setupContext() throws PulsarClientException {
Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
pulsarAdmin);
pulsarAdmin, clientBuilder);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.instance;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
Expand Down Expand Up @@ -74,28 +75,33 @@ public class ContextImplTest {

private InstanceConfig config;
private Logger logger;
private ClientBuilder clientBuilder;
private PulsarClientImpl client;
private PulsarAdmin pulsarAdmin;
private ContextImpl context;
private Producer producer = mock(Producer.class);

@BeforeMethod
public void setup() {
public void setup() throws PulsarClientException {
config = new InstanceConfig();
config.setExposePulsarAdminClientEnabled(true);
FunctionDetails functionDetails = FunctionDetails.newBuilder()
.setUserConfig("")
.build();
config.setFunctionDetails(functionDetails);
logger = mock(Logger.class);
client = mock(PulsarClientImpl.class);
pulsarAdmin = mock(PulsarAdmin.class);

client = mock(PulsarClientImpl.class);
when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
when(client.createProducerAsync(any(ProducerConfigurationData.class), any(), any()))
.thenReturn(CompletableFuture.completedFuture(producer));
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));

clientBuilder = mock(ClientBuilder.class);
when(clientBuilder.build()).thenReturn(client);

TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
when(producer.newMessage()).thenReturn(messageBuilder);
Expand All @@ -105,7 +111,7 @@ public void setup() {
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
context.setCurrentMessageContext((Record<String>) () -> null);
}

Expand Down Expand Up @@ -190,28 +196,28 @@ public void testGetPulsarAdmin() throws Exception {
}

@Test(expectedExceptions = IllegalStateException.class)
public void testGetPulsarAdminWithExposePulsarAdminDisabled() {
public void testGetPulsarAdminWithExposePulsarAdminDisabled() throws PulsarClientException {
config.setExposePulsarAdminClientEnabled(false);
context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
context.getPulsarAdmin();
}

@Test
public void testUnsupportedExtendedSinkContext(){
public void testUnsupportedExtendedSinkContext() throws PulsarClientException {
config.setExposePulsarAdminClientEnabled(false);
context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
try {
context.seek("z", 0, Mockito.mock(MessageId.class));
Assert.fail("Expected exception");
Expand Down Expand Up @@ -241,7 +247,7 @@ public void testExtendedSinkContext() throws PulsarClientException {
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
Expand Down Expand Up @@ -272,7 +278,7 @@ public void testGetConsumer() throws PulsarClientException {
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
Expand All @@ -295,7 +301,7 @@ public void testGetConsumerMultiTopic() throws PulsarClientException {
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
ConsumerImpl<?> consumer1 = Mockito.mock(ConsumerImpl.class);
when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString());
ConsumerImpl<?> consumer2 = Mockito.mock(ConsumerImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
Expand All @@ -37,6 +38,9 @@
import java.lang.reflect.Method;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;

public class JavaInstanceRunnableTest {

static class IntegerSerDe implements SerDe<Integer> {
Expand Down Expand Up @@ -64,8 +68,10 @@ private static InstanceConfig createInstanceConfig(String outputSerde) {

private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception {
InstanceConfig config = createInstanceConfig(outputSerde);
ClientBuilder clientBuilder = mock(ClientBuilder.class);
when(clientBuilder.build()).thenReturn(null);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
config, null, null, null, null, null, null);
config, clientBuilder, null, null, null, null, null, null);
return javaInstanceRunnable;
}

Expand Down Expand Up @@ -126,7 +132,7 @@ public void testStatsManagerNull() throws Exception {

@Test
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
SinkSpecOrBuilder sinkSpec = Mockito.mock(SinkSpecOrBuilder.class);
SinkSpecOrBuilder sinkSpec = mock(SinkSpecOrBuilder.class);
Mockito.when(sinkSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sinkSpec.getConfigs(), new TypeReference<Map<String, Object>>() {});
Expand All @@ -136,7 +142,7 @@ public void testSinkConfigParsingPreservesOriginalType() throws Exception {

@Test
public void testSourceConfigParsingPreservesOriginalType() throws Exception {
SourceSpecOrBuilder sourceSpec = Mockito.mock(SourceSpecOrBuilder.class);
SourceSpecOrBuilder sourceSpec = mock(SourceSpecOrBuilder.class);
Mockito.when(sourceSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sourceSpec.getConfigs(), new TypeReference<Map<String, Object>>() {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class ThreadRuntime implements Runtime {
private ThreadGroup threadGroup;
private FunctionCacheManager fnCache;
private String jarFile;
private ClientBuilder clientBuilder;
private PulsarClient pulsarClient;
private PulsarAdmin pulsarAdmin;
private String stateStorageServiceUrl;
Expand All @@ -74,7 +76,8 @@ public class ThreadRuntime implements Runtime {
FunctionCacheManager fnCache,
ThreadGroup threadGroup,
String jarFile,
PulsarClient pulsarClient,
PulsarClient client,
ClientBuilder clientBuilder,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
Expand All @@ -89,7 +92,8 @@ public class ThreadRuntime implements Runtime {
this.threadGroup = threadGroup;
this.fnCache = fnCache;
this.jarFile = jarFile;
this.pulsarClient = pulsarClient;
this.clientBuilder = clientBuilder;
this.pulsarClient = client;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
Expand Down Expand Up @@ -167,6 +171,7 @@ public void start() throws Exception {
// re-initialize JavaInstanceRunnable so that variables in constructor can be re-initialized
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
clientBuilder,
pulsarClient,
pulsarAdmin,
stateStorageServiceUrl,
Expand Down
Loading

0 comments on commit 12f6566

Please sign in to comment.