Skip to content

Commit

Permalink
add toggle to choose channel type
Browse files Browse the repository at this point in the history
  • Loading branch information
rmanibus committed Sep 15, 2024
1 parent d0aa81f commit 9b147cd
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 23 deletions.
18 changes: 18 additions & 0 deletions docs/modules/ROOT/pages/includes/quarkus-temporal.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@ endif::add-copy-button-to-env-var[]
|`false`


a|icon:lock[title=Fixed at build time] [[quarkus-temporal_quarkus-temporal-channel-type]]`link:#quarkus-temporal_quarkus-temporal-channel-type[quarkus.temporal.channel-type]`


[.description]
--
either use a channel managed by temporal client (built-in) or use a channel managed by quarkus (quarkus-managed). In this case the channel can be configured using quarkus.grpc.clients.temporal-client.

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_TEMPORAL_CHANNEL_TYPE+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_TEMPORAL_CHANNEL_TYPE+++`
endif::add-copy-button-to-env-var[]
-- a|
`quarkus-managed`, `built-in`
|`built-in`


a|icon:lock[title=Fixed at build time] [[quarkus-temporal_quarkus-temporal-health-enabled]]`link:#quarkus-temporal_quarkus-temporal-health-enabled[quarkus.temporal.health.enabled]`


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static io.quarkiverse.temporal.Constants.DEFAULT_WORKER_NAME;
import static io.quarkiverse.temporal.Constants.TEMPORAL_TESTING_CAPABILITY;
import static io.quarkiverse.temporal.config.TemporalBuildtimeConfig.ChannelType.BUILT_IN;
import static io.quarkiverse.temporal.config.TemporalBuildtimeConfig.ChannelType.QUARKUS_MANAGED;
import static io.quarkus.deployment.Capability.OPENTELEMETRY_TRACER;

import java.util.ArrayList;
Expand All @@ -14,7 +16,6 @@
import java.util.Map;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Stream;

import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -42,7 +43,6 @@
import io.quarkiverse.temporal.WorkflowServiceStubsRecorder;
import io.quarkiverse.temporal.WorkflowStubRecorder;
import io.quarkiverse.temporal.config.TemporalBuildtimeConfig;
import io.quarkus.arc.SyntheticCreationalContext;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeansRuntimeInitBuildItem;
Expand Down Expand Up @@ -316,7 +316,7 @@ void produceActivityBeans(
});
}

@BuildStep(onlyIfNot = EnableMock.class)
@BuildStep(onlyIf = EnableQuarkusManagedChannel.class)
GrpcClientBuildItem produceGrpcClient() {
GrpcClientBuildItem grpcClientBuildItem = new GrpcClientBuildItem("temporal-client");
grpcClientBuildItem.addClient(
Expand All @@ -326,28 +326,53 @@ GrpcClientBuildItem produceGrpcClient() {

@BuildStep(onlyIfNot = EnableMock.class)
@Record(ExecutionTime.RUNTIME_INIT)
SyntheticBeanBuildItem recordWorkflowClient(
WorkflowServiceStubsRecorder recorder,
WorkflowClientRecorder clientRecorder) {

Function<SyntheticCreationalContext<WorkflowClient>, WorkflowServiceStubs> workflowServiceStubs = recorder
.createWorkflowServiceStubs();
SyntheticBeanBuildItem recordWorkflowServiceStub(TemporalBuildtimeConfig config,
WorkflowServiceStubsRecorder recorder) {

if (BUILT_IN.equals(config.channelType())) {
return SyntheticBeanBuildItem
.configure(WorkflowServiceStubs.class)
.scope(ApplicationScoped.class)
.unremovable()
.defaultBean()
.createWith(recorder.createWorkflowServiceStubs())
.setRuntimeInit()
.done();
}

// QUARKUS_MANAGED
return SyntheticBeanBuildItem
.configure(WorkflowClient.class)
.configure(WorkflowServiceStubs.class)
.scope(ApplicationScoped.class)
.unremovable()
.defaultBean()
.addInjectionPoint(ClassType.create(Channel.class),
AnnotationInstance.builder(GrpcClient.class).value("temporal-client").build())
.createWith(recorder.createQuarkusManagedWorkflowServiceStubs())
.setRuntimeInit()
.done();

}

@BuildStep(onlyIfNot = EnableMock.class)
@Record(ExecutionTime.RUNTIME_INIT)
SyntheticBeanBuildItem recordWorkflowClient(WorkflowClientRecorder clientRecorder) {

return SyntheticBeanBuildItem
.configure(WorkflowClient.class)
.scope(ApplicationScoped.class)
.unremovable()
.defaultBean()
.addInjectionPoint(ClassType.create(WorkflowServiceStubs.class))
.addInjectionPoint(
ParameterizedType.create(Instance.class, ClassType.create(WorkflowClientInterceptor.class)),
AnnotationInstance.builder(Any.class).build())
.addInjectionPoint(ParameterizedType.create(Instance.class, ClassType.create(ContextPropagator.class)),
AnnotationInstance.builder(Any.class).build())
.createWith(clientRecorder.createWorkflowClient(workflowServiceStubs))
.createWith(clientRecorder.createWorkflowClient())
.setRuntimeInit()
.done();

}

@BuildStep
Expand Down Expand Up @@ -491,6 +516,14 @@ Class<?> loadClass(ClassInfo classInfo) {
}
}

public static class EnableQuarkusManagedChannel implements BooleanSupplier {
TemporalBuildtimeConfig config;

public boolean getAsBoolean() {
return !config.enableMock() && config.channelType() == QUARKUS_MANAGED;
}
}

public static class EnableMock implements BooleanSupplier {
TemporalBuildtimeConfig config;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkiverse.temporal.deployment;

import jakarta.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.temporal.serviceclient.WorkflowServiceStubs;

public class QuarkusManagedChannelConfigPriorityTest {

@RegisterExtension
static final QuarkusUnitTest unitTest = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addAsResource(
new StringAsset(
"quarkus.temporal.start-workers: false\n" +
"quarkus.temporal.channel-type: quarkus-managed\n" +
"quarkus.grpc.clients.temporal-client.host: grpcHost\n" +
"quarkus.temporal.connection.target: customTarget:1234\n"),
"application.properties"));

@Inject
WorkflowServiceStubs serviceStubs;

@Test
public void testQuarkusManagedChannel() {
Assertions.assertNull(serviceStubs.getOptions().getTarget());
Assertions.assertEquals("grpcHost:1234", serviceStubs.getOptions().getChannel().authority());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkiverse.temporal.deployment;

import jakarta.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.temporal.serviceclient.WorkflowServiceStubs;

public class QuarkusManagedChannelTest {

@RegisterExtension
static final QuarkusUnitTest unitTest = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addAsResource(
new StringAsset(
"quarkus.temporal.start-workers: false\n" +
"quarkus.temporal.channel-type: quarkus-managed\n" +
"quarkus.temporal.connection.target: customTarget:1234\n"),
"application.properties"));

@Inject
WorkflowServiceStubs serviceStubs;

@Test
public void testQuarkusManagedChannel() {
Assertions.assertNull(serviceStubs.getOptions().getTarget());
Assertions.assertEquals("customTarget:1234", serviceStubs.getOptions().getChannel().authority());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ public ConfigValue getValue(ConfigSourceInterceptorContext context, String name)
return port;
}

if (name.equals("quarkus.grpc.clients.temporal-client.test-port")) {

ConfigValue port = context.proceed("quarkus.grpc.clients.temporal-client.test-port");
ConfigValue target = context.proceed("quarkus.temporal.connection.target");
if (port == null && target != null) {
String[] split = target.getValue().split(":");
return target.from()
.withName("quarkus.grpc.clients.temporal-client.test-port")
.withValue(split[1])
.build();
}
return port;
}

return context.proceed(name);
}

Expand All @@ -53,6 +67,7 @@ public Iterator<String> iterateNames(ConfigSourceInterceptorContext context) {
}
names.add("quarkus.grpc.clients.temporal-client.host");
names.add("quarkus.grpc.clients.temporal-client.port");
names.add("quarkus.grpc.clients.temporal-client.test-port");
return names.iterator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,11 @@ public WorkflowClientOptions createWorkflowClientOptions(
* Creates a new instance of {@link WorkflowClient} using the provided {@link WorkflowServiceStubs},
* context propagators, and telemetry settings.
*
* @param serviceStubs The {@link WorkflowServiceStubs} used to connect to the Temporal service.
* @return A configured {@link WorkflowClient} instance.
*/
public Function<SyntheticCreationalContext<WorkflowClient>, WorkflowClient> createWorkflowClient(
Function<SyntheticCreationalContext<WorkflowClient>, WorkflowServiceStubs> serviceStubs) {
return context -> WorkflowClient.newInstance(serviceStubs.apply(context), createWorkflowClientOptions(context));
public Function<SyntheticCreationalContext<WorkflowClient>, WorkflowClient> createWorkflowClient() {
return context -> WorkflowClient.newInstance(context.getInjectedReference(WorkflowServiceStubs.class),
createWorkflowClientOptions(context));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.quarkus.arc.SyntheticCreationalContext;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.runtime.annotations.Recorder;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
Expand Down Expand Up @@ -42,24 +41,39 @@ public RpcRetryOptions createRpcRetryOptions(RpcRetryRuntimeConfig rpcRetry) {
return builder.build();
}

public WorkflowServiceStubsOptions createWorkflowServiceStubsOptions(SyntheticCreationalContext<WorkflowClient> context,
public WorkflowServiceStubsOptions createWorkflowServiceStubsOptions(
SyntheticCreationalContext<WorkflowServiceStubs> context,
ConnectionRuntimeConfig connection) {
if (connection == null) {
return WorkflowServiceStubsOptions.getDefaultInstance();
}
WorkflowServiceStubsOptions.Builder builder = WorkflowServiceStubsOptions.newBuilder()
.setChannel(
(ManagedChannel) context.getInjectedReference(Channel.class, GrpcClient.Literal.of("temporal-client")))
.setRpcRetryOptions(createRpcRetryOptions(connection.rpcRetry()))
//.setTarget(connection.target())
//.setEnableHttps(connection.enableHttps())
;
.setTarget(connection.target())
.setEnableHttps(connection.enableHttps());
return builder.build();
}

public Function<SyntheticCreationalContext<WorkflowClient>, WorkflowServiceStubs> createWorkflowServiceStubs() {
public Function<SyntheticCreationalContext<WorkflowServiceStubs>, WorkflowServiceStubs> createWorkflowServiceStubs() {
return context -> WorkflowServiceStubs
.newServiceStubs(createWorkflowServiceStubsOptions(context, runtimeConfig.connection()));
}

public WorkflowServiceStubsOptions createQuarkusManagedWorkflowServiceStubsOptions(
SyntheticCreationalContext<WorkflowServiceStubs> context,
ConnectionRuntimeConfig connection) {
if (connection == null) {
return WorkflowServiceStubsOptions.getDefaultInstance();
}
WorkflowServiceStubsOptions.Builder builder = WorkflowServiceStubsOptions.newBuilder()
.setChannel(
(ManagedChannel) context.getInjectedReference(Channel.class, GrpcClient.Literal.of("temporal-client")))
.setRpcRetryOptions(createRpcRetryOptions(connection.rpcRetry()));
return builder.build();
}

public Function<SyntheticCreationalContext<WorkflowServiceStubs>, WorkflowServiceStubs> createQuarkusManagedWorkflowServiceStubs() {
return context -> WorkflowServiceStubs
.newServiceStubs(createQuarkusManagedWorkflowServiceStubsOptions(context, runtimeConfig.connection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,24 @@
@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED)
public interface TemporalBuildtimeConfig {

enum ChannelType {
QUARKUS_MANAGED,
BUILT_IN
}

/**
* enable mock for testing
*/
@WithDefault("false")
Boolean enableMock();

/**
* either use a channel managed by temporal client (built-in) or use a channel managed by quarkus (quarkus-managed).
* In this case the channel can be configured using quarkus.grpc.clients.temporal-client.
*/
@WithDefault("BUILT_IN")
ChannelType channelType();

/**
* If Temporal registers in the health check by pinging the service.
*/
Expand Down

0 comments on commit 9b147cd

Please sign in to comment.