From a6e414c04aeb6ada3804a48c147ac2b813dbc0fa Mon Sep 17 00:00:00 2001 From: xmgdtc Date: Wed, 8 Jan 2025 10:28:42 +0800 Subject: [PATCH 1/2] 1 use UrlResolver to replace baseUrl in ExternalTaskClientBuilder.class so that we can use load balance address --- .../bpm/client/ExternalTaskClientBuilder.java | 34 + .../org/camunda/bpm/client/UrlResolver.java | 25 + .../camunda/bpm/client/impl/EngineClient.java | 81 ++- .../impl/ExternalTaskClientBuilderImpl.java | 643 +++++++++--------- .../bpm/client/impl/PermanentUrlResolver.java | 41 ++ .../ExternalTaskClientBuilderImplTest.java | 156 ++++- 6 files changed, 600 insertions(+), 380 deletions(-) create mode 100644 clients/java/client/src/main/java/org/camunda/bpm/client/UrlResolver.java create mode 100644 clients/java/client/src/main/java/org/camunda/bpm/client/impl/PermanentUrlResolver.java diff --git a/clients/java/client/src/main/java/org/camunda/bpm/client/ExternalTaskClientBuilder.java b/clients/java/client/src/main/java/org/camunda/bpm/client/ExternalTaskClientBuilder.java index bda58fb7b26..48f7e1e8fff 100644 --- a/clients/java/client/src/main/java/org/camunda/bpm/client/ExternalTaskClientBuilder.java +++ b/clients/java/client/src/main/java/org/camunda/bpm/client/ExternalTaskClientBuilder.java @@ -34,11 +34,45 @@ public interface ExternalTaskClientBuilder { /** * Base url of the Camunda BPM Platform REST API. This information is mandatory. * + * If this method is used, it will create a permanent url resolver with the given baseUrl. + * * @param baseUrl of the Camunda BPM Platform REST API * @return the builder */ ExternalTaskClientBuilder baseUrl(String baseUrl); + + /** + * Url resolver of the Camunda BPM Platform REST API. This information is mandatory. + * + * If the server is in a cluster or you are using spring cloud, you can create a class which implements UrlResolver.. + * + * this is a sample for spring cloud DiscoveryClient + * + * public class CustomUrlResolver implements UrlResolver{ + * + * protected String serviceId; + * + * protected DiscoveryClient discoveryClient; + * + * protected String getRandomServiceInstance() { + * List serviceInstances = discoveryClient.getInstances(serviceId); + * Random random = new Random(); + * return serviceInstances.get(random.nextInt(serviceInstances.size())).getUri().toString(); + * } + * + * @Override + * public String getBaseUrl() { + * return getRandomServiceInstance(); + * } + * } + * + * + * @param urlResolver of the Camunda BPM Platform REST API + * @return the builder + */ + ExternalTaskClientBuilder urlResolver(UrlResolver urlResolver); + /** * A custom worker id the Workflow Engine is aware of. This information is optional. * Note: make sure to choose a unique worker id diff --git a/clients/java/client/src/main/java/org/camunda/bpm/client/UrlResolver.java b/clients/java/client/src/main/java/org/camunda/bpm/client/UrlResolver.java new file mode 100644 index 00000000000..d89d92636d3 --- /dev/null +++ b/clients/java/client/src/main/java/org/camunda/bpm/client/UrlResolver.java @@ -0,0 +1,25 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. Camunda licenses this file to you under the Apache License, + * Version 2.0; you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.camunda.bpm.client; + +/** + * Get service url of the Camunda server + */ +public interface UrlResolver { + + String getBaseUrl(); +} diff --git a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/EngineClient.java b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/EngineClient.java index efbcd6422ae..ea126444b36 100644 --- a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/EngineClient.java +++ b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/EngineClient.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import org.camunda.bpm.client.UrlResolver; import org.camunda.bpm.client.task.OrderingConfig; import org.camunda.bpm.client.task.ExternalTask; import org.camunda.bpm.client.task.impl.ExternalTaskImpl; @@ -52,12 +53,11 @@ public class EngineClient { public static final String FAILURE_RESOURCE_PATH = ID_RESOURCE_PATH + "/failure"; public static final String BPMN_ERROR_RESOURCE_PATH = ID_RESOURCE_PATH + "/bpmnError"; public static final String NAME_PATH_PARAM = "{name}"; - public static final String EXECUTION_RESOURCE_PATH = "/execution"; - public static final String EXECUTION_ID_RESOURCE_PATH = EXECUTION_RESOURCE_PATH + "/" + ID_PATH_PARAM; - public static final String GET_LOCAL_VARIABLE = EXECUTION_ID_RESOURCE_PATH + "/localVariables/" + NAME_PATH_PARAM; - public static final String GET_LOCAL_BINARY_VARIABLE = GET_LOCAL_VARIABLE + "/data"; - - protected String baseUrl; + public static final String PROCESS_INSTANCE_RESOURCE_PATH = "/process-instance"; + public static final String PROCESS_INSTANCE_ID_RESOURCE_PATH = PROCESS_INSTANCE_RESOURCE_PATH + "/" + ID_PATH_PARAM; + public static final String GET_BINARY_VARIABLE = + PROCESS_INSTANCE_ID_RESOURCE_PATH + "/variables/" + NAME_PATH_PARAM + "/data"; + protected UrlResolver urlResolver; protected String workerId; protected int maxTasks; protected boolean usePriority; @@ -66,26 +66,43 @@ public class EngineClient { protected RequestExecutor engineInteraction; protected TypedValues typedValues; - public EngineClient(String workerId, int maxTasks, Long asyncResponseTimeout, String baseUrl, RequestExecutor engineInteraction) { - this(workerId, maxTasks, asyncResponseTimeout, baseUrl, engineInteraction, true, OrderingConfig.empty()); - } - - public EngineClient(String workerId, int maxTasks, Long asyncResponseTimeout, String baseUrl, RequestExecutor engineInteraction, - boolean usePriority, OrderingConfig orderingConfig) { - this.workerId = workerId; - this.asyncResponseTimeout = asyncResponseTimeout; - this.maxTasks = maxTasks; - this.usePriority = usePriority; - this.engineInteraction = engineInteraction; - this.baseUrl = baseUrl; - this.orderingConfig = orderingConfig; - } + public EngineClient(String workerId, int maxTasks, Long asyncResponseTimeout, String baseUrl, RequestExecutor engineInteraction) { + this(workerId, maxTasks, asyncResponseTimeout, baseUrl, engineInteraction, true, OrderingConfig.empty()); + } + + public EngineClient(String workerId, int maxTasks, Long asyncResponseTimeout, String baseUrl, RequestExecutor engineInteraction, + boolean usePriority, OrderingConfig orderingConfig) { + this.workerId = workerId; + this.asyncResponseTimeout = asyncResponseTimeout; + this.maxTasks = maxTasks; + this.usePriority = usePriority; + this.engineInteraction = engineInteraction; + this.urlResolver = new PermanentUrlResolver(baseUrl); + this.orderingConfig = orderingConfig; + } + + + public EngineClient(String workerId, int maxTasks, Long asyncResponseTimeout, UrlResolver urlResolver, RequestExecutor engineInteraction) { + this(workerId, maxTasks, asyncResponseTimeout, urlResolver, engineInteraction, true, OrderingConfig.empty()); + } + + + public EngineClient(String workerId, int maxTasks, Long asyncResponseTimeout, UrlResolver urlResolver, RequestExecutor engineInteraction, + boolean usePriority, OrderingConfig orderingConfig) { + this.workerId = workerId; + this.asyncResponseTimeout = asyncResponseTimeout; + this.maxTasks = maxTasks; + this.usePriority = usePriority; + this.engineInteraction = engineInteraction; + this.urlResolver = urlResolver; + this.orderingConfig = orderingConfig; + } public List fetchAndLock(List topics) { FetchAndLockRequestDto payload = new FetchAndLockRequestDto(workerId, maxTasks, asyncResponseTimeout, topics, usePriority, orderingConfig); - String resourceUrl = baseUrl + FETCH_AND_LOCK_RESOURCE_PATH; + String resourceUrl = getBaseUrl() + FETCH_AND_LOCK_RESOURCE_PATH; ExternalTask[] externalTasks = engineInteraction.postRequest(resourceUrl, payload, ExternalTaskImpl[].class); return Arrays.asList(externalTasks); } @@ -93,13 +110,13 @@ public List fetchAndLock(List topics) { public void lock(String taskId, long lockDuration) { LockRequestDto payload = new LockRequestDto(workerId, lockDuration); String resourcePath = LOCK_RESOURCE_PATH.replace("{id}", taskId); - String resourceUrl = baseUrl + resourcePath; + String resourceUrl = getBaseUrl() + resourcePath; engineInteraction.postRequest(resourceUrl, payload, Void.class); } public void unlock(String taskId) { String resourcePath = UNLOCK_RESOURCE_PATH.replace("{id}", taskId); - String resourceUrl = baseUrl + resourcePath; + String resourceUrl = getBaseUrl() + resourcePath; engineInteraction.postRequest(resourceUrl, null, Void.class); } @@ -109,7 +126,7 @@ public void complete(String taskId, Map variables, Map variables) { Map typedValueDtoMap = typedValues.serializeVariables(variables); SetVariablesRequestDto payload = new SetVariablesRequestDto(workerId, typedValueDtoMap); String resourcePath = SET_VARIABLES_RESOURCE_PATH.replace("{id}", processId); - String resourceUrl = baseUrl + resourcePath; + String resourceUrl = getBaseUrl() + resourcePath; engineInteraction.postRequest(resourceUrl, payload, Void.class); } @@ -128,7 +145,7 @@ public void failure(String taskId, String errorMessage, String errorDetails, int FailureRequestDto payload = new FailureRequestDto(workerId, errorMessage, errorDetails, retries, retryTimeout, typedValueDtoMap, localTypedValueDtoMap); String resourcePath = FAILURE_RESOURCE_PATH.replace("{id}", taskId); - String resourceUrl = baseUrl + resourcePath; + String resourceUrl = getBaseUrl() + resourcePath; engineInteraction.postRequest(resourceUrl, payload, Void.class); } @@ -136,27 +153,27 @@ public void bpmnError(String taskId, String errorCode, String errorMessage, Map< Map typeValueDtoMap = typedValues.serializeVariables(variables); BpmnErrorRequestDto payload = new BpmnErrorRequestDto(workerId, errorCode, errorMessage, typeValueDtoMap); String resourcePath = BPMN_ERROR_RESOURCE_PATH.replace("{id}", taskId); - String resourceUrl = baseUrl + resourcePath; + String resourceUrl = getBaseUrl() + resourcePath; engineInteraction.postRequest(resourceUrl, payload, Void.class); } public void extendLock(String taskId, long newDuration) { ExtendLockRequestDto payload = new ExtendLockRequestDto(workerId, newDuration); String resourcePath = EXTEND_LOCK_RESOURCE_PATH.replace("{id}", taskId); - String resourceUrl = baseUrl + resourcePath; + String resourceUrl = getBaseUrl() + resourcePath; engineInteraction.postRequest(resourceUrl, payload, Void.class); } - public byte[] getLocalBinaryVariable(String variableName, String processInstanceId) { - String resourcePath = baseUrl + GET_LOCAL_BINARY_VARIABLE - .replace(ID_PATH_PARAM, processInstanceId) + public byte[] getLocalBinaryVariable(String variableName, String executionId) { + String resourcePath = getBaseUrl() + GET_BINARY_VARIABLE + .replace(ID_PATH_PARAM, executionId) .replace(NAME_PATH_PARAM, variableName); return engineInteraction.getRequest(resourcePath); } public String getBaseUrl() { - return baseUrl; + return this.urlResolver.getBaseUrl(); } public String getWorkerId() { diff --git a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImpl.java b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImpl.java index d06958a4cbf..d8efa41c72f 100644 --- a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImpl.java +++ b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImpl.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; + import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; @@ -33,10 +34,12 @@ import java.util.ServiceLoader; import java.util.UUID; import java.util.function.Consumer; + import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.classic.HttpClients; import org.camunda.bpm.client.ExternalTaskClient; import org.camunda.bpm.client.ExternalTaskClientBuilder; +import org.camunda.bpm.client.UrlResolver; import org.camunda.bpm.client.backoff.BackoffStrategy; import org.camunda.bpm.client.backoff.ExponentialBackoffStrategy; import org.camunda.bpm.client.interceptor.ClientRequestInterceptor; @@ -69,379 +72,385 @@ */ public class ExternalTaskClientBuilderImpl implements ExternalTaskClientBuilder { - protected static final ExternalTaskClientLogger LOG = ExternalTaskClientLogger.CLIENT_LOGGER; - - protected String baseUrl; - protected String workerId; - protected int maxTasks; - protected boolean usePriority; - protected OrderingConfig orderingConfig = OrderingConfig.empty(); - protected Long asyncResponseTimeout; - protected long lockDuration; - - protected String defaultSerializationFormat = Variables.SerializationDataFormats.JSON.getName(); - - protected String dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; - - protected ObjectMapper objectMapper; - protected ValueMappers valueMappers; - protected TypedValues typedValues; - protected EngineClient engineClient; - protected TopicSubscriptionManager topicSubscriptionManager; - protected HttpClientBuilder httpClientBuilder; - - protected List interceptors; - protected boolean isAutoFetchingEnabled; - protected BackoffStrategy backoffStrategy; - protected boolean isBackoffStrategyDisabled; - - public ExternalTaskClientBuilderImpl() { - // default values - this.maxTasks = 10; - this.usePriority = true; - this.asyncResponseTimeout = null; - this.lockDuration = 20_000; - this.interceptors = new ArrayList<>(); - this.isAutoFetchingEnabled = true; - this.backoffStrategy = new ExponentialBackoffStrategy(); - this.isBackoffStrategyDisabled = false; - this.httpClientBuilder = HttpClients.custom().useSystemProperties(); - } - - public ExternalTaskClientBuilder baseUrl(String baseUrl) { - this.baseUrl = baseUrl; - return this; - } - - public ExternalTaskClientBuilder workerId(String workerId) { - this.workerId = workerId; - return this; - } - - public ExternalTaskClientBuilder addInterceptor(ClientRequestInterceptor interceptor) { - this.interceptors.add(interceptor); - return this; - } - - public ExternalTaskClientBuilder maxTasks(int maxTasks) { - this.maxTasks = maxTasks; - return this; - } - - public ExternalTaskClientBuilder usePriority(boolean usePriority) { - this.usePriority = usePriority; - return this; - } - - public ExternalTaskClientBuilder useCreateTime(boolean useCreateTime) { - if (useCreateTime) { - orderingConfig.configureField(CREATE_TIME); - orderingConfig.configureDirectionOnLastField(DESC); - } - return this; - } - - public ExternalTaskClientBuilder orderByCreateTime() { - orderingConfig.configureField(CREATE_TIME); - return this; - } - - public ExternalTaskClientBuilder asc() { - orderingConfig.configureDirectionOnLastField(ASC); - return this; - } - - public ExternalTaskClientBuilder desc() { - orderingConfig.configureDirectionOnLastField(DESC); - return this; - } - - public ExternalTaskClientBuilder asyncResponseTimeout(long asyncResponseTimeout) { - this.asyncResponseTimeout = asyncResponseTimeout; - return this; - } - - public ExternalTaskClientBuilder lockDuration(long lockDuration) { - this.lockDuration = lockDuration; - return this; - } - - public ExternalTaskClientBuilder disableAutoFetching() { - this.isAutoFetchingEnabled = false; - return this; - } - - public ExternalTaskClientBuilder backoffStrategy(BackoffStrategy backoffStrategy) { - this.backoffStrategy = backoffStrategy; - return this; - } - - public ExternalTaskClientBuilder disableBackoffStrategy() { - this.isBackoffStrategyDisabled = true; - return this; - } - - public ExternalTaskClientBuilder defaultSerializationFormat(String defaultSerializationFormat) { - this.defaultSerializationFormat = defaultSerializationFormat; - return this; - } - - public ExternalTaskClientBuilder dateFormat(String dateFormat) { - this.dateFormat = dateFormat; - return this; - } - - public ExternalTaskClientBuilder customizeHttpClient(Consumer httpClientConsumer) { - httpClientConsumer.accept(httpClientBuilder); - return this; - } - - public ExternalTaskClient build() { - if (maxTasks <= 0) { - throw LOG.maxTasksNotGreaterThanZeroException(maxTasks); - } - - if (asyncResponseTimeout != null && asyncResponseTimeout <= 0) { - throw LOG.asyncResponseTimeoutNotGreaterThanZeroException(asyncResponseTimeout); - } - - if (lockDuration <= 0L) { - throw LOG.lockDurationIsNotGreaterThanZeroException(lockDuration); - } - - if (baseUrl == null || baseUrl.isEmpty()) { - throw LOG.baseUrlNullException(); - } - - checkInterceptors(); - - orderingConfig.validateOrderingProperties(); - - initBaseUrl(); - initWorkerId(); - initObjectMapper(); - initEngineClient(); - initVariableMappers(); - initTopicSubscriptionManager(); + protected static final ExternalTaskClientLogger LOG = ExternalTaskClientLogger.CLIENT_LOGGER; + + protected String workerId; + protected int maxTasks; + protected boolean usePriority; + protected OrderingConfig orderingConfig = OrderingConfig.empty(); + protected Long asyncResponseTimeout; + protected long lockDuration; + + protected String defaultSerializationFormat = Variables.SerializationDataFormats.JSON.getName(); + + protected String dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; + + protected ObjectMapper objectMapper; + protected ValueMappers valueMappers; + protected TypedValues typedValues; + protected EngineClient engineClient; + protected TopicSubscriptionManager topicSubscriptionManager; + protected HttpClientBuilder httpClientBuilder; + + protected List interceptors; + protected boolean isAutoFetchingEnabled; + protected BackoffStrategy backoffStrategy; + protected boolean isBackoffStrategyDisabled; + protected UrlResolver urlResolver; + + public ExternalTaskClientBuilderImpl() { + // default values + this.maxTasks = 10; + this.usePriority = true; + this.asyncResponseTimeout = null; + this.lockDuration = 20_000; + this.interceptors = new ArrayList<>(); + this.isAutoFetchingEnabled = true; + this.backoffStrategy = new ExponentialBackoffStrategy(); + this.isBackoffStrategyDisabled = false; + this.httpClientBuilder = HttpClients.custom().useSystemProperties(); + } + + public ExternalTaskClientBuilder baseUrl(String baseUrl) { + this.urlResolver = new PermanentUrlResolver(baseUrl); + return this; + } + + public ExternalTaskClientBuilder urlResolver(UrlResolver urlResolver) { + this.urlResolver = urlResolver; + return this; + } + + public ExternalTaskClientBuilder workerId(String workerId) { + this.workerId = workerId; + return this; + } + + public ExternalTaskClientBuilder addInterceptor(ClientRequestInterceptor interceptor) { + this.interceptors.add(interceptor); + return this; + } + + public ExternalTaskClientBuilder maxTasks(int maxTasks) { + this.maxTasks = maxTasks; + return this; + } + + public ExternalTaskClientBuilder usePriority(boolean usePriority) { + this.usePriority = usePriority; + return this; + } + + public ExternalTaskClientBuilder useCreateTime(boolean useCreateTime) { + if (useCreateTime) { + orderingConfig.configureField(CREATE_TIME); + orderingConfig.configureDirectionOnLastField(DESC); + } + return this; + } + + public ExternalTaskClientBuilder orderByCreateTime() { + orderingConfig.configureField(CREATE_TIME); + return this; + } + + public ExternalTaskClientBuilder asc() { + orderingConfig.configureDirectionOnLastField(ASC); + return this; + } + + public ExternalTaskClientBuilder desc() { + orderingConfig.configureDirectionOnLastField(DESC); + return this; + } + + public ExternalTaskClientBuilder asyncResponseTimeout(long asyncResponseTimeout) { + this.asyncResponseTimeout = asyncResponseTimeout; + return this; + } + + public ExternalTaskClientBuilder lockDuration(long lockDuration) { + this.lockDuration = lockDuration; + return this; + } + + public ExternalTaskClientBuilder disableAutoFetching() { + this.isAutoFetchingEnabled = false; + return this; + } + + public ExternalTaskClientBuilder backoffStrategy(BackoffStrategy backoffStrategy) { + this.backoffStrategy = backoffStrategy; + return this; + } - return new ExternalTaskClientImpl(topicSubscriptionManager); - } + public ExternalTaskClientBuilder disableBackoffStrategy() { + this.isBackoffStrategyDisabled = true; + return this; + } - protected void initBaseUrl() { - baseUrl = sanitizeUrl(baseUrl); - } + public ExternalTaskClientBuilder defaultSerializationFormat(String defaultSerializationFormat) { + this.defaultSerializationFormat = defaultSerializationFormat; + return this; + } - protected String sanitizeUrl(String url) { - url = url.trim(); - if (url.endsWith("/")) { - url = url.replaceAll("/$", ""); - url = sanitizeUrl(url); + public ExternalTaskClientBuilder dateFormat(String dateFormat) { + this.dateFormat = dateFormat; + return this; } - return url; - } - protected void initWorkerId() { - if (workerId == null) { - String hostname = checkHostname(); - this.workerId = hostname + UUID.randomUUID(); + public ExternalTaskClientBuilder customizeHttpClient(Consumer httpClientConsumer) { + httpClientConsumer.accept(httpClientBuilder); + return this; } - } - protected void checkInterceptors() { - interceptors.forEach(interceptor -> { - if (interceptor == null) { - throw LOG.interceptorNullException(); - } - }); - } + public ExternalTaskClient build() { + if (maxTasks <= 0) { + throw LOG.maxTasksNotGreaterThanZeroException(maxTasks); + } - protected void initObjectMapper() { - objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNRESOLVED_OBJECT_IDS, false); - objectMapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, false); + if (asyncResponseTimeout != null && asyncResponseTimeout <= 0) { + throw LOG.asyncResponseTimeoutNotGreaterThanZeroException(asyncResponseTimeout); + } - SimpleDateFormat sdf = new SimpleDateFormat(dateFormat); - objectMapper.setDateFormat(sdf); - objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); - } + if (lockDuration <= 0L) { + throw LOG.lockDurationIsNotGreaterThanZeroException(lockDuration); + } - @SuppressWarnings({ "rawtypes", "unchecked" }) - protected void initVariableMappers() { - valueMappers = new DefaultValueMappers(defaultSerializationFormat); + if (urlResolver == null || getBaseUrl() == null || getBaseUrl().isEmpty()) { + throw LOG.baseUrlNullException(); + } - valueMappers.addMapper(new NullValueMapper()); - valueMappers.addMapper(new BooleanValueMapper()); - valueMappers.addMapper(new StringValueMapper()); - valueMappers.addMapper(new DateValueMapper(dateFormat)); - valueMappers.addMapper(new ByteArrayValueMapper()); + checkInterceptors(); - // number mappers - valueMappers.addMapper(new IntegerValueMapper()); - valueMappers.addMapper(new LongValueMapper()); - valueMappers.addMapper(new ShortValueMapper()); - valueMappers.addMapper(new DoubleValueMapper()); + orderingConfig.validateOrderingProperties(); - // object - Map dataFormats = lookupDataFormats(); - dataFormats.forEach((key, format) -> { - valueMappers.addMapper(new ObjectValueMapper(key, format)); - }); + initBaseUrl(); + initWorkerId(); + initObjectMapper(); + initEngineClient(); + initVariableMappers(); + initTopicSubscriptionManager(); - // json/xml - valueMappers.addMapper(new JsonValueMapper()); - valueMappers.addMapper(new XmlValueMapper()); + return new ExternalTaskClientImpl(topicSubscriptionManager); + } - // file - valueMappers.addMapper(new FileValueMapper(engineClient)); + protected void initBaseUrl() { + if (this.urlResolver instanceof PermanentUrlResolver) { + ((PermanentUrlResolver) this.urlResolver).setBaseUrl(sanitizeUrl(this.urlResolver.getBaseUrl())); + } + } - typedValues = new TypedValues(valueMappers); - engineClient.setTypedValues(typedValues); - } + protected String sanitizeUrl(String url) { + url = url.trim(); + if (url.endsWith("/")) { + url = url.replaceAll("/$", ""); + url = sanitizeUrl(url); + } + return url; + } - protected void initEngineClient() { - RequestInterceptorHandler requestInterceptorHandler = new RequestInterceptorHandler(interceptors); - httpClientBuilder.addRequestInterceptorLast(requestInterceptorHandler); - RequestExecutor requestExecutor = new RequestExecutor(httpClientBuilder.build(), objectMapper); + protected void initWorkerId() { + if (workerId == null) { + String hostname = checkHostname(); + this.workerId = hostname + UUID.randomUUID(); + } + } - engineClient = new EngineClient(workerId, maxTasks, asyncResponseTimeout, baseUrl, requestExecutor, - usePriority, orderingConfig); - } + protected void checkInterceptors() { + interceptors.forEach(interceptor -> { + if (interceptor == null) { + throw LOG.interceptorNullException(); + } + }); + } - protected void initTopicSubscriptionManager() { - topicSubscriptionManager = new TopicSubscriptionManager(engineClient, typedValues, lockDuration); - topicSubscriptionManager.setBackoffStrategy(getBackoffStrategy()); + protected void initObjectMapper() { + objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNRESOLVED_OBJECT_IDS, false); + objectMapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, false); - if (isBackoffStrategyDisabled) { - topicSubscriptionManager.disableBackoffStrategy(); + SimpleDateFormat sdf = new SimpleDateFormat(dateFormat); + objectMapper.setDateFormat(sdf); + objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); } - if (isAutoFetchingEnabled()) { - topicSubscriptionManager.start(); + @SuppressWarnings({"rawtypes", "unchecked"}) + protected void initVariableMappers() { + valueMappers = new DefaultValueMappers(defaultSerializationFormat); + + valueMappers.addMapper(new NullValueMapper()); + valueMappers.addMapper(new BooleanValueMapper()); + valueMappers.addMapper(new StringValueMapper()); + valueMappers.addMapper(new DateValueMapper(dateFormat)); + valueMappers.addMapper(new ByteArrayValueMapper()); + + // number mappers + valueMappers.addMapper(new IntegerValueMapper()); + valueMappers.addMapper(new LongValueMapper()); + valueMappers.addMapper(new ShortValueMapper()); + valueMappers.addMapper(new DoubleValueMapper()); + + // object + Map dataFormats = lookupDataFormats(); + dataFormats.forEach((key, format) -> { + valueMappers.addMapper(new ObjectValueMapper(key, format)); + }); + + // json/xml + valueMappers.addMapper(new JsonValueMapper()); + valueMappers.addMapper(new XmlValueMapper()); + + // file + valueMappers.addMapper(new FileValueMapper(engineClient)); + + typedValues = new TypedValues(valueMappers); + engineClient.setTypedValues(typedValues); } - } - protected Map lookupDataFormats() { - Map dataFormats = new HashMap<>(); + protected void initEngineClient() { + RequestInterceptorHandler requestInterceptorHandler = new RequestInterceptorHandler(interceptors); + httpClientBuilder.addRequestInterceptorLast(requestInterceptorHandler); + RequestExecutor requestExecutor = new RequestExecutor(httpClientBuilder.build(), objectMapper); - lookupCustomDataFormats(dataFormats); - applyConfigurators(dataFormats); + engineClient = new EngineClient(workerId, maxTasks, asyncResponseTimeout, urlResolver, requestExecutor, + usePriority, orderingConfig); + } - return dataFormats; - } + protected void initTopicSubscriptionManager() { + topicSubscriptionManager = new TopicSubscriptionManager(engineClient, typedValues, lockDuration); + topicSubscriptionManager.setBackoffStrategy(getBackoffStrategy()); - protected void lookupCustomDataFormats(Map dataFormats) { - // use java.util.ServiceLoader to load custom DataFormatProvider instances on the classpath - ServiceLoader providerLoader = ServiceLoader.load(DataFormatProvider.class); + if (isBackoffStrategyDisabled) { + topicSubscriptionManager.disableBackoffStrategy(); + } - for (DataFormatProvider provider : providerLoader) { - LOG.logDataFormatProvider(provider); - lookupProvider(dataFormats, provider); + if (isAutoFetchingEnabled()) { + topicSubscriptionManager.start(); + } } - } - protected void lookupProvider(Map dataFormats, DataFormatProvider provider) { + protected Map lookupDataFormats() { + Map dataFormats = new HashMap<>(); - String dataFormatName = provider.getDataFormatName(); + lookupCustomDataFormats(dataFormats); + applyConfigurators(dataFormats); - if(!dataFormats.containsKey(dataFormatName)) { - DataFormat dataFormatInstance = provider.createInstance(); - dataFormats.put(dataFormatName, dataFormatInstance); - LOG.logDataFormat(dataFormatInstance); + return dataFormats; } - else { - throw LOG.multipleProvidersForDataformat(dataFormatName); + + protected void lookupCustomDataFormats(Map dataFormats) { + // use java.util.ServiceLoader to load custom DataFormatProvider instances on the classpath + ServiceLoader providerLoader = ServiceLoader.load(DataFormatProvider.class); + + for (DataFormatProvider provider : providerLoader) { + LOG.logDataFormatProvider(provider); + lookupProvider(dataFormats, provider); + } } - } - @SuppressWarnings("rawtypes") - protected void applyConfigurators(Map dataFormats) { - ServiceLoader configuratorLoader = ServiceLoader.load(DataFormatConfigurator.class); + protected void lookupProvider(Map dataFormats, DataFormatProvider provider) { + + String dataFormatName = provider.getDataFormatName(); - for (DataFormatConfigurator configurator : configuratorLoader) { - LOG.logDataFormatConfigurator(configurator); - applyConfigurator(dataFormats, configurator); + if (!dataFormats.containsKey(dataFormatName)) { + DataFormat dataFormatInstance = provider.createInstance(); + dataFormats.put(dataFormatName, dataFormatInstance); + LOG.logDataFormat(dataFormatInstance); + } else { + throw LOG.multipleProvidersForDataformat(dataFormatName); + } } - } - @SuppressWarnings({ "rawtypes", "unchecked" }) - protected void applyConfigurator(Map dataFormats, DataFormatConfigurator configurator) { - for (DataFormat dataFormat : dataFormats.values()) { - if (configurator.getDataFormatClass().isAssignableFrom(dataFormat.getClass())) { - configurator.configure(dataFormat); - } + @SuppressWarnings("rawtypes") + protected void applyConfigurators(Map dataFormats) { + ServiceLoader configuratorLoader = ServiceLoader.load(DataFormatConfigurator.class); + + for (DataFormatConfigurator configurator : configuratorLoader) { + LOG.logDataFormatConfigurator(configurator); + applyConfigurator(dataFormats, configurator); + } } - } - public String checkHostname() { - String hostname; - try { - hostname = getHostname(); - } catch (UnknownHostException e) { - throw LOG.cannotGetHostnameException(e); + @SuppressWarnings({"rawtypes", "unchecked"}) + protected void applyConfigurator(Map dataFormats, DataFormatConfigurator configurator) { + for (DataFormat dataFormat : dataFormats.values()) { + if (configurator.getDataFormatClass().isAssignableFrom(dataFormat.getClass())) { + configurator.configure(dataFormat); + } + } } - return hostname; - } + public String checkHostname() { + String hostname; + try { + hostname = getHostname(); + } catch (UnknownHostException e) { + throw LOG.cannotGetHostnameException(e); + } + + return hostname; + } - public String getHostname() throws UnknownHostException { - return InetAddress.getLocalHost().getHostName(); - } + public String getHostname() throws UnknownHostException { + return InetAddress.getLocalHost().getHostName(); + } - public String getBaseUrl() { - return baseUrl; - } + public String getBaseUrl() { + return urlResolver.getBaseUrl(); + } - protected String getWorkerId() { - return workerId; - } + protected String getWorkerId() { + return workerId; + } - protected List getInterceptors() { - return interceptors; - } + protected List getInterceptors() { + return interceptors; + } - protected int getMaxTasks() { - return maxTasks; - } + protected int getMaxTasks() { + return maxTasks; + } - protected Long getAsyncResponseTimeout() { - return asyncResponseTimeout; - } + protected Long getAsyncResponseTimeout() { + return asyncResponseTimeout; + } - protected long getLockDuration() { - return lockDuration; - } + protected long getLockDuration() { + return lockDuration; + } - protected boolean isAutoFetchingEnabled() { - return isAutoFetchingEnabled; - } + protected boolean isAutoFetchingEnabled() { + return isAutoFetchingEnabled; + } - protected BackoffStrategy getBackoffStrategy() { - return backoffStrategy; - } + protected BackoffStrategy getBackoffStrategy() { + return backoffStrategy; + } - public String getDefaultSerializationFormat() { - return defaultSerializationFormat; - } + public String getDefaultSerializationFormat() { + return defaultSerializationFormat; + } - public String getDateFormat() { - return dateFormat; - } + public String getDateFormat() { + return dateFormat; + } - public ObjectMapper getObjectMapper() { - return objectMapper; - } + public ObjectMapper getObjectMapper() { + return objectMapper; + } - public ValueMappers getValueMappers() { - return valueMappers; - } + public ValueMappers getValueMappers() { + return valueMappers; + } - public TypedValues getTypedValues() { - return typedValues; - } + public TypedValues getTypedValues() { + return typedValues; + } - public EngineClient getEngineClient() { - return engineClient; - } + public EngineClient getEngineClient() { + return engineClient; + } } diff --git a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/PermanentUrlResolver.java b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/PermanentUrlResolver.java new file mode 100644 index 00000000000..1628893eb56 --- /dev/null +++ b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/PermanentUrlResolver.java @@ -0,0 +1,41 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. Camunda licenses this file to you under the Apache License, + * Version 2.0; you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.camunda.bpm.client.impl; + +import org.camunda.bpm.client.UrlResolver; + +/** + * UrlResolver with permanent address + */ +public class PermanentUrlResolver implements UrlResolver { + + + protected String baseUrl; + + public PermanentUrlResolver(String baseUrl) { + this.setBaseUrl(baseUrl); + } + + public void setBaseUrl(String baseUrl) { + assert baseUrl != null && !baseUrl.isEmpty() : "camunda address can not be null or empty"; + this.baseUrl = baseUrl; + } + + public String getBaseUrl() { + return this.baseUrl; + } +} diff --git a/clients/java/client/src/test/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImplTest.java b/clients/java/client/src/test/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImplTest.java index 5ab3ab4ab44..d2773b1764d 100644 --- a/clients/java/client/src/test/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImplTest.java +++ b/clients/java/client/src/test/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImplTest.java @@ -20,47 +20,141 @@ import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.core5.util.Timeout; import org.camunda.bpm.client.ExternalTaskClient; +import org.camunda.bpm.client.UrlResolver; import org.camunda.bpm.engine.impl.util.ReflectUtil; import org.junit.Test; import org.mockito.ArgumentCaptor; +import java.util.HashSet; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; public class ExternalTaskClientBuilderImplTest { - @Test - public void testCustomizeHttpClientExposesInternalHttpClientBuilder() { - // given - var clientBuilder = new ExternalTaskClientBuilderImpl(); - var requestConfigArgumentCaptor = ArgumentCaptor.forClass(RequestConfig.class); - var httpClientBuilderSpy = spy(HttpClientBuilder.class); - var httpClientBuilderField = ReflectUtil.getField("httpClientBuilder", clientBuilder); - ReflectUtil.setField(httpClientBuilderField, clientBuilder, httpClientBuilderSpy); - - ExternalTaskClient client = null; - try { - // when - client = clientBuilder.baseUrl("localhost") - .customizeHttpClient(httpClientBuilder -> httpClientBuilder.setDefaultRequestConfig(RequestConfig.custom() - .setResponseTimeout(Timeout.ofSeconds(5)) - .setConnectionRequestTimeout(Timeout.ofSeconds(6)) - .build())) - .build(); - - // then - verify(httpClientBuilderSpy).build(); - verify(httpClientBuilderSpy).setDefaultRequestConfig(requestConfigArgumentCaptor.capture()); - - var requestConfig = requestConfigArgumentCaptor.getValue(); - assertThat(requestConfig.getResponseTimeout().toSeconds()).isEqualTo(5); - assertThat(requestConfig.getConnectionRequestTimeout().toSeconds()).isEqualTo(6); - } finally { - if (client != null) { - client.stop(); - } + @Test + public void testCustomizeHttpClientExposesInternalHttpClientBuilder() { + // given + var clientBuilder = new ExternalTaskClientBuilderImpl(); + var requestConfigArgumentCaptor = ArgumentCaptor.forClass(RequestConfig.class); + var httpClientBuilderSpy = spy(HttpClientBuilder.class); + var httpClientBuilderField = ReflectUtil.getField("httpClientBuilder", clientBuilder); + ReflectUtil.setField(httpClientBuilderField, clientBuilder, httpClientBuilderSpy); + + ExternalTaskClient client = null; + try { + // when + client = clientBuilder.baseUrl("localhost") + .customizeHttpClient(httpClientBuilder -> httpClientBuilder.setDefaultRequestConfig(RequestConfig.custom() + .setResponseTimeout(Timeout.ofSeconds(5)) + .setConnectionRequestTimeout(Timeout.ofSeconds(6)) + .build())) + .build(); + + // then + verify(httpClientBuilderSpy).build(); + verify(httpClientBuilderSpy).setDefaultRequestConfig(requestConfigArgumentCaptor.capture()); + + var requestConfig = requestConfigArgumentCaptor.getValue(); + assertThat(requestConfig.getResponseTimeout().toSeconds()).isEqualTo(5); + assertThat(requestConfig.getConnectionRequestTimeout().toSeconds()).isEqualTo(6); + } finally { + if (client != null) { + client.stop(); + } + } + } + + + @Test + public void testLoadBalanceHttpClient() { + + var address = List.of("server1", "server2", "server3"); + var tryAddress = new HashSet<>(); + + + RoundAddressResolver roundAddressResolver = new RoundAddressResolver(address); + ExternalTaskClientBuilderImpl2 clientBuilder = new ExternalTaskClientBuilderImpl2().urlResolver(roundAddressResolver); + clientBuilder.build(); + + for (int i = 0; i < address.size(); i++) { + tryAddress.add(clientBuilder.getEngineClient().getBaseUrl()); + } + assertThat(new HashSet<>(address).equals(tryAddress)).isEqualTo(true); + } + + + //Just used for get EngineClient + class ExternalTaskClientBuilderImpl2 extends ExternalTaskClientBuilderImpl { + + @Override + public EngineClient getEngineClient() { + return super.getEngineClient(); + } + + @Override + public ExternalTaskClientBuilderImpl2 urlResolver(UrlResolver urlResolver) { + this.urlResolver = urlResolver; + return this; + } + + @Override + public ExternalTaskClient build() { + if (maxTasks <= 0) { + throw LOG.maxTasksNotGreaterThanZeroException(maxTasks); + } + + if (asyncResponseTimeout != null && asyncResponseTimeout <= 0) { + throw LOG.asyncResponseTimeoutNotGreaterThanZeroException(asyncResponseTimeout); + } + + if (lockDuration <= 0L) { + throw LOG.lockDurationIsNotGreaterThanZeroException(lockDuration); + } + + if (urlResolver == null || getBaseUrl() == null || getBaseUrl().isEmpty()) { + throw LOG.baseUrlNullException(); + } + + checkInterceptors(); + + orderingConfig.validateOrderingProperties(); + + initBaseUrl(); + initWorkerId(); + initObjectMapper(); + initEngineClient(); + initVariableMappers(); + initTopicSubscriptionManager(); + + return new ExternalTaskClientImpl(topicSubscriptionManager); + } + } + + + /** + * Round robin load balancing used for test testLoadBalanceHttpClient + */ + class RoundAddressResolver implements UrlResolver { + + public RoundAddressResolver(List urls) { + this.urls = urls; + i = 0; + max = urls.size(); + } + + List urls; + int i; + int max; + + public String getBaseUrl() { + + String address = urls.get(i); + if (i++ >= max) i = 0; + return address; + } } - } } From be5cf43a83cd4efe278d29b55ce1d080acb7ea9c Mon Sep 17 00:00:00 2001 From: PHWaechtler Date: Fri, 20 Dec 2024 13:24:43 +0100 Subject: [PATCH 2/2] chore(urlResolver): fix merge conflicts --- .../camunda/bpm/client/client/ClientIT.java | 25 + .../bpm/client/ExternalTaskClientBuilder.java | 66 +- .../org/camunda/bpm/client/UrlResolver.java | 2 +- .../camunda/bpm/client/impl/EngineClient.java | 116 ++-- .../impl/ExternalTaskClientBuilderImpl.java | 648 +++++++++--------- .../bpm/client/impl/PermanentUrlResolver.java | 22 +- .../ExternalTaskClientBuilderImplTest.java | 174 ++--- 7 files changed, 512 insertions(+), 541 deletions(-) diff --git a/clients/java/client/src/it/java/org/camunda/bpm/client/client/ClientIT.java b/clients/java/client/src/it/java/org/camunda/bpm/client/client/ClientIT.java index b151cdd244a..f2c6c319fa4 100644 --- a/clients/java/client/src/it/java/org/camunda/bpm/client/client/ClientIT.java +++ b/clients/java/client/src/it/java/org/camunda/bpm/client/client/ClientIT.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.camunda.bpm.client.ExternalTaskClient; import org.camunda.bpm.client.ExternalTaskClientBuilder; +import org.camunda.bpm.client.UrlResolver; import org.camunda.bpm.client.backoff.BackoffStrategy; import org.camunda.bpm.client.backoff.ErrorAwareBackoffStrategy; import org.camunda.bpm.client.dto.ProcessDefinitionDto; @@ -233,6 +234,30 @@ public void shouldThrowExceptionDueToBaseUrlIsNull() { } } + @Test + public void shouldThrowExceptionDueToBaseUrlAndBaseUrlResolverIsNull() { + ExternalTaskClient client = null; + + try { + // given + ExternalTaskClientBuilder externalTaskClientBuilder = ExternalTaskClient.create(); + + // then + thrown.expect(ExternalTaskClientException.class); + + // when + client = externalTaskClientBuilder + .baseUrl(null) + .urlResolver(null) + .build(); + } + finally { + if (client != null) { + client.stop(); + } + } + } + @Test public void shouldThrowExceptionDueToMaxTasksNotGreaterThanZero() { ExternalTaskClient client = null; diff --git a/clients/java/client/src/main/java/org/camunda/bpm/client/ExternalTaskClientBuilder.java b/clients/java/client/src/main/java/org/camunda/bpm/client/ExternalTaskClientBuilder.java index 48f7e1e8fff..c3ebd133d67 100644 --- a/clients/java/client/src/main/java/org/camunda/bpm/client/ExternalTaskClientBuilder.java +++ b/clients/java/client/src/main/java/org/camunda/bpm/client/ExternalTaskClientBuilder.java @@ -16,14 +16,13 @@ */ package org.camunda.bpm.client; +import java.util.function.Consumer; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.camunda.bpm.client.backoff.BackoffStrategy; import org.camunda.bpm.client.backoff.ExponentialBackoffStrategy; import org.camunda.bpm.client.exception.ExternalTaskClientException; import org.camunda.bpm.client.interceptor.ClientRequestInterceptor; -import java.util.function.Consumer; - /** *

A fluent builder to configure the Camunda client

* @@ -33,7 +32,7 @@ public interface ExternalTaskClientBuilder { /** * Base url of the Camunda BPM Platform REST API. This information is mandatory. - * + *

* If this method is used, it will create a permanent url resolver with the given baseUrl. * * @param baseUrl of the Camunda BPM Platform REST API @@ -41,35 +40,31 @@ public interface ExternalTaskClientBuilder { */ ExternalTaskClientBuilder baseUrl(String baseUrl); - /** * Url resolver of the Camunda BPM Platform REST API. This information is mandatory. - * + *

* If the server is in a cluster or you are using spring cloud, you can create a class which implements UrlResolver.. - * + *

* this is a sample for spring cloud DiscoveryClient - * + *

* public class CustomUrlResolver implements UrlResolver{ - * - * protected String serviceId; - * - * protected DiscoveryClient discoveryClient; - * - * protected String getRandomServiceInstance() { - * List serviceInstances = discoveryClient.getInstances(serviceId); - * Random random = new Random(); - * return serviceInstances.get(random.nextInt(serviceInstances.size())).getUri().toString(); - * } - * - * @Override - * public String getBaseUrl() { - * return getRandomServiceInstance(); - * } + *

+ * protected String serviceId; + *

+ * protected DiscoveryClient discoveryClient; + *

+ * protected String getRandomServiceInstance() { + * List serviceInstances = discoveryClient.getInstances(serviceId); + * Random random = new Random(); + * return serviceInstances.get(random.nextInt(serviceInstances.size())).getUri().toString(); * } * - * * @param urlResolver of the Camunda BPM Platform REST API * @return the builder + * @Override public String getBaseUrl() { + * return getRandomServiceInstance(); + * } + * } */ ExternalTaskClientBuilder urlResolver(UrlResolver urlResolver); @@ -178,10 +173,10 @@ public interface ExternalTaskClientBuilder { /** * @param lockDuration

    - *
  • in milliseconds to lock the external tasks - *
  • must be greater than zero - *
  • the default lock duration is 20 seconds (20,000 milliseconds) - *
  • is overridden by the lock duration configured on a topic subscription + *
  • in milliseconds to lock the external tasks + *
  • must be greater than zero + *
  • the default lock duration is 20 seconds (20,000 milliseconds) + *
  • is overridden by the lock duration configured on a topic subscription *
* @return the builder */ @@ -208,7 +203,7 @@ public interface ExternalTaskClientBuilder { * Disables the client-side backoff strategy. On invocation, the configuration option {@link #backoffStrategy} is ignored. *

* NOTE: Please bear in mind that disabling the client-side backoff can lead to heavy load situations on engine side. - * To avoid this, please specify an appropriate {@link #asyncResponseTimeout(long)}. + * To avoid this, please specify an appropriate {@link #asyncResponseTimeout(long)}. * * @return the builder */ @@ -227,15 +222,14 @@ public interface ExternalTaskClientBuilder { /** * Bootstraps the Camunda client * - * @throws ExternalTaskClientException - *

    - *
  • if base url is null or string is empty - *
  • if hostname cannot be retrieved - *
  • if maximum amount of tasks is not greater than zero - *
  • if maximum asynchronous response timeout is not greater than zero - *
  • if lock duration is not greater than zero - *
* @return the builder + * @throws ExternalTaskClientException
    + *
  • if base url is null or string is empty + *
  • if hostname cannot be retrieved + *
  • if maximum amount of tasks is not greater than zero + *
  • if maximum asynchronous response timeout is not greater than zero + *
  • if lock duration is not greater than zero + *
*/ ExternalTaskClient build(); diff --git a/clients/java/client/src/main/java/org/camunda/bpm/client/UrlResolver.java b/clients/java/client/src/main/java/org/camunda/bpm/client/UrlResolver.java index d89d92636d3..a1e0147c78d 100644 --- a/clients/java/client/src/main/java/org/camunda/bpm/client/UrlResolver.java +++ b/clients/java/client/src/main/java/org/camunda/bpm/client/UrlResolver.java @@ -21,5 +21,5 @@ */ public interface UrlResolver { - String getBaseUrl(); + String getBaseUrl(); } diff --git a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/EngineClient.java b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/EngineClient.java index ea126444b36..04eed275e64 100644 --- a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/EngineClient.java +++ b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/EngineClient.java @@ -19,10 +19,9 @@ import java.util.Arrays; import java.util.List; import java.util.Map; - import org.camunda.bpm.client.UrlResolver; -import org.camunda.bpm.client.task.OrderingConfig; import org.camunda.bpm.client.task.ExternalTask; +import org.camunda.bpm.client.task.OrderingConfig; import org.camunda.bpm.client.task.impl.ExternalTaskImpl; import org.camunda.bpm.client.task.impl.dto.BpmnErrorRequestDto; import org.camunda.bpm.client.task.impl.dto.CompleteRequestDto; @@ -47,7 +46,8 @@ public class EngineClient { protected static final String ID_RESOURCE_PATH = EXTERNAL_TASK_RESOURCE_PATH + "/" + ID_PATH_PARAM; public static final String LOCK_RESOURCE_PATH = ID_RESOURCE_PATH + "/lock"; public static final String EXTEND_LOCK_RESOURCE_PATH = ID_RESOURCE_PATH + "/extendLock"; - public static final String SET_VARIABLES_RESOURCE_PATH = EXTERNAL_TASK__PROCESS_RESOURCE_PATH + "/" + ID_PATH_PARAM + "/variables"; + public static final String SET_VARIABLES_RESOURCE_PATH = + EXTERNAL_TASK__PROCESS_RESOURCE_PATH + "/" + ID_PATH_PARAM + "/variables"; public static final String UNLOCK_RESOURCE_PATH = ID_RESOURCE_PATH + "/unlock"; public static final String COMPLETE_RESOURCE_PATH = ID_RESOURCE_PATH + "/complete"; public static final String FAILURE_RESOURCE_PATH = ID_RESOURCE_PATH + "/failure"; @@ -66,61 +66,77 @@ public class EngineClient { protected RequestExecutor engineInteraction; protected TypedValues typedValues; - public EngineClient(String workerId, int maxTasks, Long asyncResponseTimeout, String baseUrl, RequestExecutor engineInteraction) { - this(workerId, maxTasks, asyncResponseTimeout, baseUrl, engineInteraction, true, OrderingConfig.empty()); - } - - public EngineClient(String workerId, int maxTasks, Long asyncResponseTimeout, String baseUrl, RequestExecutor engineInteraction, - boolean usePriority, OrderingConfig orderingConfig) { - this.workerId = workerId; - this.asyncResponseTimeout = asyncResponseTimeout; - this.maxTasks = maxTasks; - this.usePriority = usePriority; - this.engineInteraction = engineInteraction; - this.urlResolver = new PermanentUrlResolver(baseUrl); - this.orderingConfig = orderingConfig; - } - - - public EngineClient(String workerId, int maxTasks, Long asyncResponseTimeout, UrlResolver urlResolver, RequestExecutor engineInteraction) { - this(workerId, maxTasks, asyncResponseTimeout, urlResolver, engineInteraction, true, OrderingConfig.empty()); - } - - - public EngineClient(String workerId, int maxTasks, Long asyncResponseTimeout, UrlResolver urlResolver, RequestExecutor engineInteraction, - boolean usePriority, OrderingConfig orderingConfig) { - this.workerId = workerId; - this.asyncResponseTimeout = asyncResponseTimeout; - this.maxTasks = maxTasks; - this.usePriority = usePriority; - this.engineInteraction = engineInteraction; - this.urlResolver = urlResolver; - this.orderingConfig = orderingConfig; - } - - public List fetchAndLock(List topics) { - FetchAndLockRequestDto payload = new FetchAndLockRequestDto(workerId, maxTasks, asyncResponseTimeout, - topics, usePriority, orderingConfig); + public EngineClient(String workerId, + int maxTasks, + Long asyncResponseTimeout, + String baseUrl, + RequestExecutor engineInteraction) { + this(workerId, maxTasks, asyncResponseTimeout, baseUrl, engineInteraction, true, OrderingConfig.empty()); + } + + public EngineClient(String workerId, + int maxTasks, + Long asyncResponseTimeout, + String baseUrl, + RequestExecutor engineInteraction, + boolean usePriority, + OrderingConfig orderingConfig) { + this.workerId = workerId; + this.asyncResponseTimeout = asyncResponseTimeout; + this.maxTasks = maxTasks; + this.usePriority = usePriority; + this.engineInteraction = engineInteraction; + this.urlResolver = new PermanentUrlResolver(baseUrl); + this.orderingConfig = orderingConfig; + } + + public EngineClient(String workerId, + int maxTasks, + Long asyncResponseTimeout, + UrlResolver urlResolver, + RequestExecutor engineInteraction) { + this(workerId, maxTasks, asyncResponseTimeout, urlResolver, engineInteraction, true, OrderingConfig.empty()); + } + + public EngineClient(String workerId, + int maxTasks, + Long asyncResponseTimeout, + UrlResolver urlResolver, + RequestExecutor engineInteraction, + boolean usePriority, + OrderingConfig orderingConfig) { + this.workerId = workerId; + this.asyncResponseTimeout = asyncResponseTimeout; + this.maxTasks = maxTasks; + this.usePriority = usePriority; + this.engineInteraction = engineInteraction; + this.urlResolver = urlResolver; + this.orderingConfig = orderingConfig; + } + + public List fetchAndLock(List topics) { + FetchAndLockRequestDto payload = new FetchAndLockRequestDto(workerId, maxTasks, asyncResponseTimeout, topics, + usePriority, orderingConfig); String resourceUrl = getBaseUrl() + FETCH_AND_LOCK_RESOURCE_PATH; ExternalTask[] externalTasks = engineInteraction.postRequest(resourceUrl, payload, ExternalTaskImpl[].class); return Arrays.asList(externalTasks); } - public void lock(String taskId, long lockDuration) { + public void lock(String taskId, long lockDuration) { LockRequestDto payload = new LockRequestDto(workerId, lockDuration); String resourcePath = LOCK_RESOURCE_PATH.replace("{id}", taskId); String resourceUrl = getBaseUrl() + resourcePath; engineInteraction.postRequest(resourceUrl, payload, Void.class); } - public void unlock(String taskId) { + public void unlock(String taskId) { String resourcePath = UNLOCK_RESOURCE_PATH.replace("{id}", taskId); String resourceUrl = getBaseUrl() + resourcePath; engineInteraction.postRequest(resourceUrl, null, Void.class); } - public void complete(String taskId, Map variables, Map localVariables) { + public void complete(String taskId, Map variables, Map localVariables) { Map typedValueDtoMap = typedValues.serializeVariables(variables); Map localTypedValueDtoMap = typedValues.serializeVariables(localVariables); @@ -130,7 +146,7 @@ public void complete(String taskId, Map variables, Map variables) { + public void setVariables(String processId, Map variables) { Map typedValueDtoMap = typedValues.serializeVariables(variables); SetVariablesRequestDto payload = new SetVariablesRequestDto(workerId, typedValueDtoMap); String resourcePath = SET_VARIABLES_RESOURCE_PATH.replace("{id}", processId); @@ -138,18 +154,24 @@ public void setVariables(String processId, Map variables) { engineInteraction.postRequest(resourceUrl, payload, Void.class); } - - public void failure(String taskId, String errorMessage, String errorDetails, int retries, long retryTimeout, Map variables, Map localVariables) { + public void failure(String taskId, + String errorMessage, + String errorDetails, + int retries, + long retryTimeout, + Map variables, + Map localVariables) { Map typedValueDtoMap = typedValues.serializeVariables(variables); Map localTypedValueDtoMap = typedValues.serializeVariables(localVariables); - FailureRequestDto payload = new FailureRequestDto(workerId, errorMessage, errorDetails, retries, retryTimeout, typedValueDtoMap, localTypedValueDtoMap); + FailureRequestDto payload = new FailureRequestDto(workerId, errorMessage, errorDetails, retries, retryTimeout, + typedValueDtoMap, localTypedValueDtoMap); String resourcePath = FAILURE_RESOURCE_PATH.replace("{id}", taskId); String resourceUrl = getBaseUrl() + resourcePath; engineInteraction.postRequest(resourceUrl, payload, Void.class); } - public void bpmnError(String taskId, String errorCode, String errorMessage, Map variables) { + public void bpmnError(String taskId, String errorCode, String errorMessage, Map variables) { Map typeValueDtoMap = typedValues.serializeVariables(variables); BpmnErrorRequestDto payload = new BpmnErrorRequestDto(workerId, errorCode, errorMessage, typeValueDtoMap); String resourcePath = BPMN_ERROR_RESOURCE_PATH.replace("{id}", taskId); @@ -173,7 +195,7 @@ public byte[] getLocalBinaryVariable(String variableName, String executionId) { } public String getBaseUrl() { - return this.urlResolver.getBaseUrl(); + return urlResolver.getBaseUrl(); } public String getWorkerId() { diff --git a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImpl.java b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImpl.java index d8efa41c72f..6e8a80ea54d 100644 --- a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImpl.java +++ b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImpl.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; - import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; @@ -34,7 +33,6 @@ import java.util.ServiceLoader; import java.util.UUID; import java.util.function.Consumer; - import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.classic.HttpClients; import org.camunda.bpm.client.ExternalTaskClient; @@ -72,385 +70,385 @@ */ public class ExternalTaskClientBuilderImpl implements ExternalTaskClientBuilder { - protected static final ExternalTaskClientLogger LOG = ExternalTaskClientLogger.CLIENT_LOGGER; - - protected String workerId; - protected int maxTasks; - protected boolean usePriority; - protected OrderingConfig orderingConfig = OrderingConfig.empty(); - protected Long asyncResponseTimeout; - protected long lockDuration; - - protected String defaultSerializationFormat = Variables.SerializationDataFormats.JSON.getName(); - - protected String dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; - - protected ObjectMapper objectMapper; - protected ValueMappers valueMappers; - protected TypedValues typedValues; - protected EngineClient engineClient; - protected TopicSubscriptionManager topicSubscriptionManager; - protected HttpClientBuilder httpClientBuilder; - - protected List interceptors; - protected boolean isAutoFetchingEnabled; - protected BackoffStrategy backoffStrategy; - protected boolean isBackoffStrategyDisabled; - protected UrlResolver urlResolver; - - public ExternalTaskClientBuilderImpl() { - // default values - this.maxTasks = 10; - this.usePriority = true; - this.asyncResponseTimeout = null; - this.lockDuration = 20_000; - this.interceptors = new ArrayList<>(); - this.isAutoFetchingEnabled = true; - this.backoffStrategy = new ExponentialBackoffStrategy(); - this.isBackoffStrategyDisabled = false; - this.httpClientBuilder = HttpClients.custom().useSystemProperties(); - } - - public ExternalTaskClientBuilder baseUrl(String baseUrl) { - this.urlResolver = new PermanentUrlResolver(baseUrl); - return this; - } - - public ExternalTaskClientBuilder urlResolver(UrlResolver urlResolver) { - this.urlResolver = urlResolver; - return this; - } - - public ExternalTaskClientBuilder workerId(String workerId) { - this.workerId = workerId; - return this; - } - - public ExternalTaskClientBuilder addInterceptor(ClientRequestInterceptor interceptor) { - this.interceptors.add(interceptor); - return this; - } - - public ExternalTaskClientBuilder maxTasks(int maxTasks) { - this.maxTasks = maxTasks; - return this; - } - - public ExternalTaskClientBuilder usePriority(boolean usePriority) { - this.usePriority = usePriority; - return this; - } - - public ExternalTaskClientBuilder useCreateTime(boolean useCreateTime) { - if (useCreateTime) { - orderingConfig.configureField(CREATE_TIME); - orderingConfig.configureDirectionOnLastField(DESC); - } - return this; - } - - public ExternalTaskClientBuilder orderByCreateTime() { - orderingConfig.configureField(CREATE_TIME); - return this; - } - - public ExternalTaskClientBuilder asc() { - orderingConfig.configureDirectionOnLastField(ASC); - return this; - } - - public ExternalTaskClientBuilder desc() { - orderingConfig.configureDirectionOnLastField(DESC); - return this; - } - - public ExternalTaskClientBuilder asyncResponseTimeout(long asyncResponseTimeout) { - this.asyncResponseTimeout = asyncResponseTimeout; - return this; - } - - public ExternalTaskClientBuilder lockDuration(long lockDuration) { - this.lockDuration = lockDuration; - return this; - } - - public ExternalTaskClientBuilder disableAutoFetching() { - this.isAutoFetchingEnabled = false; - return this; - } - - public ExternalTaskClientBuilder backoffStrategy(BackoffStrategy backoffStrategy) { - this.backoffStrategy = backoffStrategy; - return this; - } + protected static final ExternalTaskClientLogger LOG = ExternalTaskClientLogger.CLIENT_LOGGER; + + protected String workerId; + protected int maxTasks; + protected boolean usePriority; + protected OrderingConfig orderingConfig = OrderingConfig.empty(); + protected Long asyncResponseTimeout; + protected long lockDuration; + + protected String defaultSerializationFormat = Variables.SerializationDataFormats.JSON.getName(); + + protected String dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; + + protected ObjectMapper objectMapper; + protected ValueMappers valueMappers; + protected TypedValues typedValues; + protected EngineClient engineClient; + protected TopicSubscriptionManager topicSubscriptionManager; + protected HttpClientBuilder httpClientBuilder; + + protected List interceptors; + protected boolean isAutoFetchingEnabled; + protected BackoffStrategy backoffStrategy; + protected boolean isBackoffStrategyDisabled; + protected UrlResolver urlResolver; + + public ExternalTaskClientBuilderImpl() { + // default values + this.maxTasks = 10; + this.usePriority = true; + this.asyncResponseTimeout = null; + this.lockDuration = 20_000; + this.interceptors = new ArrayList<>(); + this.isAutoFetchingEnabled = true; + this.backoffStrategy = new ExponentialBackoffStrategy(); + this.isBackoffStrategyDisabled = false; + this.httpClientBuilder = HttpClients.custom().useSystemProperties(); + } + + public ExternalTaskClientBuilder baseUrl(String baseUrl) { + this.urlResolver = new PermanentUrlResolver(baseUrl); + return this; + } + + public ExternalTaskClientBuilder urlResolver(UrlResolver urlResolver) { + this.urlResolver = urlResolver; + return this; + } + + public ExternalTaskClientBuilder workerId(String workerId) { + this.workerId = workerId; + return this; + } + + public ExternalTaskClientBuilder addInterceptor(ClientRequestInterceptor interceptor) { + this.interceptors.add(interceptor); + return this; + } + + public ExternalTaskClientBuilder maxTasks(int maxTasks) { + this.maxTasks = maxTasks; + return this; + } + + public ExternalTaskClientBuilder usePriority(boolean usePriority) { + this.usePriority = usePriority; + return this; + } + + public ExternalTaskClientBuilder useCreateTime(boolean useCreateTime) { + if (useCreateTime) { + orderingConfig.configureField(CREATE_TIME); + orderingConfig.configureDirectionOnLastField(DESC); + } + return this; + } + + public ExternalTaskClientBuilder orderByCreateTime() { + orderingConfig.configureField(CREATE_TIME); + return this; + } + + public ExternalTaskClientBuilder asc() { + orderingConfig.configureDirectionOnLastField(ASC); + return this; + } + + public ExternalTaskClientBuilder desc() { + orderingConfig.configureDirectionOnLastField(DESC); + return this; + } + + public ExternalTaskClientBuilder asyncResponseTimeout(long asyncResponseTimeout) { + this.asyncResponseTimeout = asyncResponseTimeout; + return this; + } + + public ExternalTaskClientBuilder lockDuration(long lockDuration) { + this.lockDuration = lockDuration; + return this; + } + + public ExternalTaskClientBuilder disableAutoFetching() { + this.isAutoFetchingEnabled = false; + return this; + } + + public ExternalTaskClientBuilder backoffStrategy(BackoffStrategy backoffStrategy) { + this.backoffStrategy = backoffStrategy; + return this; + } + + public ExternalTaskClientBuilder disableBackoffStrategy() { + this.isBackoffStrategyDisabled = true; + return this; + } + + public ExternalTaskClientBuilder defaultSerializationFormat(String defaultSerializationFormat) { + this.defaultSerializationFormat = defaultSerializationFormat; + return this; + } + + public ExternalTaskClientBuilder dateFormat(String dateFormat) { + this.dateFormat = dateFormat; + return this; + } + + public ExternalTaskClientBuilder customizeHttpClient(Consumer httpClientConsumer) { + httpClientConsumer.accept(httpClientBuilder); + return this; + } + + public ExternalTaskClient build() { + if (maxTasks <= 0) { + throw LOG.maxTasksNotGreaterThanZeroException(maxTasks); + } + + if (asyncResponseTimeout != null && asyncResponseTimeout <= 0) { + throw LOG.asyncResponseTimeoutNotGreaterThanZeroException(asyncResponseTimeout); + } + + if (lockDuration <= 0L) { + throw LOG.lockDurationIsNotGreaterThanZeroException(lockDuration); + } + + if (urlResolver == null || getBaseUrl() == null || getBaseUrl().isEmpty()) { + throw LOG.baseUrlNullException(); + } + + checkInterceptors(); + + orderingConfig.validateOrderingProperties(); + + initBaseUrl(); + initWorkerId(); + initObjectMapper(); + initEngineClient(); + initVariableMappers(); + initTopicSubscriptionManager(); - public ExternalTaskClientBuilder disableBackoffStrategy() { - this.isBackoffStrategyDisabled = true; - return this; - } + return new ExternalTaskClientImpl(topicSubscriptionManager); + } - public ExternalTaskClientBuilder defaultSerializationFormat(String defaultSerializationFormat) { - this.defaultSerializationFormat = defaultSerializationFormat; - return this; + protected void initBaseUrl() { + if (this.urlResolver instanceof PermanentUrlResolver) { + ((PermanentUrlResolver) this.urlResolver).setBaseUrl(sanitizeUrl(this.urlResolver.getBaseUrl())); } + } - public ExternalTaskClientBuilder dateFormat(String dateFormat) { - this.dateFormat = dateFormat; - return this; + protected String sanitizeUrl(String url) { + url = url.trim(); + if (url.endsWith("/")) { + url = url.replaceAll("/$", ""); + url = sanitizeUrl(url); } + return url; + } - public ExternalTaskClientBuilder customizeHttpClient(Consumer httpClientConsumer) { - httpClientConsumer.accept(httpClientBuilder); - return this; + protected void initWorkerId() { + if (workerId == null) { + String hostname = checkHostname(); + this.workerId = hostname + UUID.randomUUID(); } + } - public ExternalTaskClient build() { - if (maxTasks <= 0) { - throw LOG.maxTasksNotGreaterThanZeroException(maxTasks); - } + protected void checkInterceptors() { + interceptors.forEach(interceptor -> { + if (interceptor == null) { + throw LOG.interceptorNullException(); + } + }); + } - if (asyncResponseTimeout != null && asyncResponseTimeout <= 0) { - throw LOG.asyncResponseTimeoutNotGreaterThanZeroException(asyncResponseTimeout); - } + protected void initObjectMapper() { + objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNRESOLVED_OBJECT_IDS, false); + objectMapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, false); - if (lockDuration <= 0L) { - throw LOG.lockDurationIsNotGreaterThanZeroException(lockDuration); - } + SimpleDateFormat sdf = new SimpleDateFormat(dateFormat); + objectMapper.setDateFormat(sdf); + objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + } - if (urlResolver == null || getBaseUrl() == null || getBaseUrl().isEmpty()) { - throw LOG.baseUrlNullException(); - } + @SuppressWarnings({ "rawtypes", "unchecked" }) + protected void initVariableMappers() { + valueMappers = new DefaultValueMappers(defaultSerializationFormat); - checkInterceptors(); + valueMappers.addMapper(new NullValueMapper()); + valueMappers.addMapper(new BooleanValueMapper()); + valueMappers.addMapper(new StringValueMapper()); + valueMappers.addMapper(new DateValueMapper(dateFormat)); + valueMappers.addMapper(new ByteArrayValueMapper()); - orderingConfig.validateOrderingProperties(); + // number mappers + valueMappers.addMapper(new IntegerValueMapper()); + valueMappers.addMapper(new LongValueMapper()); + valueMappers.addMapper(new ShortValueMapper()); + valueMappers.addMapper(new DoubleValueMapper()); - initBaseUrl(); - initWorkerId(); - initObjectMapper(); - initEngineClient(); - initVariableMappers(); - initTopicSubscriptionManager(); + // object + Map dataFormats = lookupDataFormats(); + dataFormats.forEach((key, format) -> { + valueMappers.addMapper(new ObjectValueMapper(key, format)); + }); - return new ExternalTaskClientImpl(topicSubscriptionManager); - } + // json/xml + valueMappers.addMapper(new JsonValueMapper()); + valueMappers.addMapper(new XmlValueMapper()); - protected void initBaseUrl() { - if (this.urlResolver instanceof PermanentUrlResolver) { - ((PermanentUrlResolver) this.urlResolver).setBaseUrl(sanitizeUrl(this.urlResolver.getBaseUrl())); - } - } + // file + valueMappers.addMapper(new FileValueMapper(engineClient)); - protected String sanitizeUrl(String url) { - url = url.trim(); - if (url.endsWith("/")) { - url = url.replaceAll("/$", ""); - url = sanitizeUrl(url); - } - return url; - } + typedValues = new TypedValues(valueMappers); + engineClient.setTypedValues(typedValues); + } - protected void initWorkerId() { - if (workerId == null) { - String hostname = checkHostname(); - this.workerId = hostname + UUID.randomUUID(); - } - } + protected void initEngineClient() { + RequestInterceptorHandler requestInterceptorHandler = new RequestInterceptorHandler(interceptors); + httpClientBuilder.addRequestInterceptorLast(requestInterceptorHandler); + RequestExecutor requestExecutor = new RequestExecutor(httpClientBuilder.build(), objectMapper); - protected void checkInterceptors() { - interceptors.forEach(interceptor -> { - if (interceptor == null) { - throw LOG.interceptorNullException(); - } - }); - } + engineClient = new EngineClient(workerId, maxTasks, asyncResponseTimeout, urlResolver, requestExecutor, usePriority, + orderingConfig); + } - protected void initObjectMapper() { - objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNRESOLVED_OBJECT_IDS, false); - objectMapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, false); + protected void initTopicSubscriptionManager() { + topicSubscriptionManager = new TopicSubscriptionManager(engineClient, typedValues, lockDuration); + topicSubscriptionManager.setBackoffStrategy(getBackoffStrategy()); - SimpleDateFormat sdf = new SimpleDateFormat(dateFormat); - objectMapper.setDateFormat(sdf); - objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + if (isBackoffStrategyDisabled) { + topicSubscriptionManager.disableBackoffStrategy(); } - @SuppressWarnings({"rawtypes", "unchecked"}) - protected void initVariableMappers() { - valueMappers = new DefaultValueMappers(defaultSerializationFormat); - - valueMappers.addMapper(new NullValueMapper()); - valueMappers.addMapper(new BooleanValueMapper()); - valueMappers.addMapper(new StringValueMapper()); - valueMappers.addMapper(new DateValueMapper(dateFormat)); - valueMappers.addMapper(new ByteArrayValueMapper()); - - // number mappers - valueMappers.addMapper(new IntegerValueMapper()); - valueMappers.addMapper(new LongValueMapper()); - valueMappers.addMapper(new ShortValueMapper()); - valueMappers.addMapper(new DoubleValueMapper()); - - // object - Map dataFormats = lookupDataFormats(); - dataFormats.forEach((key, format) -> { - valueMappers.addMapper(new ObjectValueMapper(key, format)); - }); - - // json/xml - valueMappers.addMapper(new JsonValueMapper()); - valueMappers.addMapper(new XmlValueMapper()); - - // file - valueMappers.addMapper(new FileValueMapper(engineClient)); - - typedValues = new TypedValues(valueMappers); - engineClient.setTypedValues(typedValues); + if (isAutoFetchingEnabled()) { + topicSubscriptionManager.start(); } + } - protected void initEngineClient() { - RequestInterceptorHandler requestInterceptorHandler = new RequestInterceptorHandler(interceptors); - httpClientBuilder.addRequestInterceptorLast(requestInterceptorHandler); - RequestExecutor requestExecutor = new RequestExecutor(httpClientBuilder.build(), objectMapper); + protected Map lookupDataFormats() { + Map dataFormats = new HashMap<>(); - engineClient = new EngineClient(workerId, maxTasks, asyncResponseTimeout, urlResolver, requestExecutor, - usePriority, orderingConfig); - } + lookupCustomDataFormats(dataFormats); + applyConfigurators(dataFormats); - protected void initTopicSubscriptionManager() { - topicSubscriptionManager = new TopicSubscriptionManager(engineClient, typedValues, lockDuration); - topicSubscriptionManager.setBackoffStrategy(getBackoffStrategy()); + return dataFormats; + } - if (isBackoffStrategyDisabled) { - topicSubscriptionManager.disableBackoffStrategy(); - } + protected void lookupCustomDataFormats(Map dataFormats) { + // use java.util.ServiceLoader to load custom DataFormatProvider instances on the classpath + ServiceLoader providerLoader = ServiceLoader.load(DataFormatProvider.class); - if (isAutoFetchingEnabled()) { - topicSubscriptionManager.start(); - } + for (DataFormatProvider provider : providerLoader) { + LOG.logDataFormatProvider(provider); + lookupProvider(dataFormats, provider); } + } - protected Map lookupDataFormats() { - Map dataFormats = new HashMap<>(); + protected void lookupProvider(Map dataFormats, DataFormatProvider provider) { - lookupCustomDataFormats(dataFormats); - applyConfigurators(dataFormats); + String dataFormatName = provider.getDataFormatName(); - return dataFormats; + if (!dataFormats.containsKey(dataFormatName)) { + DataFormat dataFormatInstance = provider.createInstance(); + dataFormats.put(dataFormatName, dataFormatInstance); + LOG.logDataFormat(dataFormatInstance); + } else { + throw LOG.multipleProvidersForDataformat(dataFormatName); } + } - protected void lookupCustomDataFormats(Map dataFormats) { - // use java.util.ServiceLoader to load custom DataFormatProvider instances on the classpath - ServiceLoader providerLoader = ServiceLoader.load(DataFormatProvider.class); + @SuppressWarnings("rawtypes") + protected void applyConfigurators(Map dataFormats) { + ServiceLoader configuratorLoader = ServiceLoader.load(DataFormatConfigurator.class); - for (DataFormatProvider provider : providerLoader) { - LOG.logDataFormatProvider(provider); - lookupProvider(dataFormats, provider); - } + for (DataFormatConfigurator configurator : configuratorLoader) { + LOG.logDataFormatConfigurator(configurator); + applyConfigurator(dataFormats, configurator); } + } - protected void lookupProvider(Map dataFormats, DataFormatProvider provider) { - - String dataFormatName = provider.getDataFormatName(); - - if (!dataFormats.containsKey(dataFormatName)) { - DataFormat dataFormatInstance = provider.createInstance(); - dataFormats.put(dataFormatName, dataFormatInstance); - LOG.logDataFormat(dataFormatInstance); - } else { - throw LOG.multipleProvidersForDataformat(dataFormatName); - } + @SuppressWarnings({ "rawtypes", "unchecked" }) + protected void applyConfigurator(Map dataFormats, DataFormatConfigurator configurator) { + for (DataFormat dataFormat : dataFormats.values()) { + if (configurator.getDataFormatClass().isAssignableFrom(dataFormat.getClass())) { + configurator.configure(dataFormat); + } } + } - @SuppressWarnings("rawtypes") - protected void applyConfigurators(Map dataFormats) { - ServiceLoader configuratorLoader = ServiceLoader.load(DataFormatConfigurator.class); - - for (DataFormatConfigurator configurator : configuratorLoader) { - LOG.logDataFormatConfigurator(configurator); - applyConfigurator(dataFormats, configurator); - } + public String checkHostname() { + String hostname; + try { + hostname = getHostname(); + } catch (UnknownHostException e) { + throw LOG.cannotGetHostnameException(e); } - @SuppressWarnings({"rawtypes", "unchecked"}) - protected void applyConfigurator(Map dataFormats, DataFormatConfigurator configurator) { - for (DataFormat dataFormat : dataFormats.values()) { - if (configurator.getDataFormatClass().isAssignableFrom(dataFormat.getClass())) { - configurator.configure(dataFormat); - } - } - } - - public String checkHostname() { - String hostname; - try { - hostname = getHostname(); - } catch (UnknownHostException e) { - throw LOG.cannotGetHostnameException(e); - } - - return hostname; - } + return hostname; + } - public String getHostname() throws UnknownHostException { - return InetAddress.getLocalHost().getHostName(); - } + public String getHostname() throws UnknownHostException { + return InetAddress.getLocalHost().getHostName(); + } - public String getBaseUrl() { - return urlResolver.getBaseUrl(); - } + public String getBaseUrl() { + return urlResolver.getBaseUrl(); + } - protected String getWorkerId() { - return workerId; - } + protected String getWorkerId() { + return workerId; + } - protected List getInterceptors() { - return interceptors; - } + protected List getInterceptors() { + return interceptors; + } - protected int getMaxTasks() { - return maxTasks; - } + protected int getMaxTasks() { + return maxTasks; + } - protected Long getAsyncResponseTimeout() { - return asyncResponseTimeout; - } + protected Long getAsyncResponseTimeout() { + return asyncResponseTimeout; + } - protected long getLockDuration() { - return lockDuration; - } + protected long getLockDuration() { + return lockDuration; + } - protected boolean isAutoFetchingEnabled() { - return isAutoFetchingEnabled; - } + protected boolean isAutoFetchingEnabled() { + return isAutoFetchingEnabled; + } - protected BackoffStrategy getBackoffStrategy() { - return backoffStrategy; - } + protected BackoffStrategy getBackoffStrategy() { + return backoffStrategy; + } - public String getDefaultSerializationFormat() { - return defaultSerializationFormat; - } + public String getDefaultSerializationFormat() { + return defaultSerializationFormat; + } - public String getDateFormat() { - return dateFormat; - } + public String getDateFormat() { + return dateFormat; + } - public ObjectMapper getObjectMapper() { - return objectMapper; - } + public ObjectMapper getObjectMapper() { + return objectMapper; + } - public ValueMappers getValueMappers() { - return valueMappers; - } + public ValueMappers getValueMappers() { + return valueMappers; + } - public TypedValues getTypedValues() { - return typedValues; - } + public TypedValues getTypedValues() { + return typedValues; + } - public EngineClient getEngineClient() { - return engineClient; - } + public EngineClient getEngineClient() { + return engineClient; + } } diff --git a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/PermanentUrlResolver.java b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/PermanentUrlResolver.java index 1628893eb56..adb3eac1639 100644 --- a/clients/java/client/src/main/java/org/camunda/bpm/client/impl/PermanentUrlResolver.java +++ b/clients/java/client/src/main/java/org/camunda/bpm/client/impl/PermanentUrlResolver.java @@ -23,19 +23,17 @@ */ public class PermanentUrlResolver implements UrlResolver { + protected String baseUrl; - protected String baseUrl; + public PermanentUrlResolver(String baseUrl) { + this.setBaseUrl(baseUrl); + } - public PermanentUrlResolver(String baseUrl) { - this.setBaseUrl(baseUrl); - } + public void setBaseUrl(String baseUrl) { + this.baseUrl = baseUrl; + } - public void setBaseUrl(String baseUrl) { - assert baseUrl != null && !baseUrl.isEmpty() : "camunda address can not be null or empty"; - this.baseUrl = baseUrl; - } - - public String getBaseUrl() { - return this.baseUrl; - } + public String getBaseUrl() { + return this.baseUrl; + } } diff --git a/clients/java/client/src/test/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImplTest.java b/clients/java/client/src/test/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImplTest.java index d2773b1764d..ea7fc9a494d 100644 --- a/clients/java/client/src/test/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImplTest.java +++ b/clients/java/client/src/test/java/org/camunda/bpm/client/impl/ExternalTaskClientBuilderImplTest.java @@ -16,6 +16,10 @@ */ package org.camunda.bpm.client.impl; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.core5.util.Timeout; @@ -25,136 +29,66 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; -import java.util.HashSet; -import java.util.List; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - public class ExternalTaskClientBuilderImplTest { - @Test - public void testCustomizeHttpClientExposesInternalHttpClientBuilder() { - // given - var clientBuilder = new ExternalTaskClientBuilderImpl(); - var requestConfigArgumentCaptor = ArgumentCaptor.forClass(RequestConfig.class); - var httpClientBuilderSpy = spy(HttpClientBuilder.class); - var httpClientBuilderField = ReflectUtil.getField("httpClientBuilder", clientBuilder); - ReflectUtil.setField(httpClientBuilderField, clientBuilder, httpClientBuilderSpy); - - ExternalTaskClient client = null; - try { - // when - client = clientBuilder.baseUrl("localhost") - .customizeHttpClient(httpClientBuilder -> httpClientBuilder.setDefaultRequestConfig(RequestConfig.custom() - .setResponseTimeout(Timeout.ofSeconds(5)) - .setConnectionRequestTimeout(Timeout.ofSeconds(6)) - .build())) - .build(); - - // then - verify(httpClientBuilderSpy).build(); - verify(httpClientBuilderSpy).setDefaultRequestConfig(requestConfigArgumentCaptor.capture()); - - var requestConfig = requestConfigArgumentCaptor.getValue(); - assertThat(requestConfig.getResponseTimeout().toSeconds()).isEqualTo(5); - assertThat(requestConfig.getConnectionRequestTimeout().toSeconds()).isEqualTo(6); - } finally { - if (client != null) { - client.stop(); - } - } + @Test + public void testCustomizeHttpClientExposesInternalHttpClientBuilder() { + // given + var clientBuilder = new ExternalTaskClientBuilderImpl(); + var requestConfigArgumentCaptor = ArgumentCaptor.forClass(RequestConfig.class); + var httpClientBuilderSpy = spy(HttpClientBuilder.class); + var httpClientBuilderField = ReflectUtil.getField("httpClientBuilder", clientBuilder); + ReflectUtil.setField(httpClientBuilderField, clientBuilder, httpClientBuilderSpy); + + ExternalTaskClient client = null; + try { + // when + client = clientBuilder.baseUrl("localhost") + .customizeHttpClient(httpClientBuilder -> httpClientBuilder.setDefaultRequestConfig(RequestConfig.custom() + .setResponseTimeout(Timeout.ofSeconds(5)) + .setConnectionRequestTimeout(Timeout.ofSeconds(6)) + .build())) + .build(); + + // then + verify(httpClientBuilderSpy).build(); + verify(httpClientBuilderSpy).setDefaultRequestConfig(requestConfigArgumentCaptor.capture()); + + var requestConfig = requestConfigArgumentCaptor.getValue(); + assertThat(requestConfig.getResponseTimeout().toSeconds()).isEqualTo(5); + assertThat(requestConfig.getConnectionRequestTimeout().toSeconds()).isEqualTo(6); + } finally { + if (client != null) { + client.stop(); + } } + } + @Test + public void testCustomBaseUrlResolver() { + // given + var expectedBaseUrl = "expectedBaseUrl"; + TestUrlResolver testUrlResolver = new TestUrlResolver(expectedBaseUrl); - @Test - public void testLoadBalanceHttpClient() { + // when + var clientBuilder = new ExternalTaskClientBuilderImpl(); + clientBuilder.urlResolver(testUrlResolver); + clientBuilder.build(); - var address = List.of("server1", "server2", "server3"); - var tryAddress = new HashSet<>(); + // then + assertThat(spy(clientBuilder).engineClient.getBaseUrl()).isEqualTo(expectedBaseUrl); + } + static class TestUrlResolver implements UrlResolver { + final String baseUrl; - RoundAddressResolver roundAddressResolver = new RoundAddressResolver(address); - ExternalTaskClientBuilderImpl2 clientBuilder = new ExternalTaskClientBuilderImpl2().urlResolver(roundAddressResolver); - clientBuilder.build(); - - for (int i = 0; i < address.size(); i++) { - tryAddress.add(clientBuilder.getEngineClient().getBaseUrl()); - } - assertThat(new HashSet<>(address).equals(tryAddress)).isEqualTo(true); + public TestUrlResolver(final String baseURl) { + this.baseUrl = baseURl; } - - //Just used for get EngineClient - class ExternalTaskClientBuilderImpl2 extends ExternalTaskClientBuilderImpl { - - @Override - public EngineClient getEngineClient() { - return super.getEngineClient(); - } - - @Override - public ExternalTaskClientBuilderImpl2 urlResolver(UrlResolver urlResolver) { - this.urlResolver = urlResolver; - return this; - } - - @Override - public ExternalTaskClient build() { - if (maxTasks <= 0) { - throw LOG.maxTasksNotGreaterThanZeroException(maxTasks); - } - - if (asyncResponseTimeout != null && asyncResponseTimeout <= 0) { - throw LOG.asyncResponseTimeoutNotGreaterThanZeroException(asyncResponseTimeout); - } - - if (lockDuration <= 0L) { - throw LOG.lockDurationIsNotGreaterThanZeroException(lockDuration); - } - - if (urlResolver == null || getBaseUrl() == null || getBaseUrl().isEmpty()) { - throw LOG.baseUrlNullException(); - } - - checkInterceptors(); - - orderingConfig.validateOrderingProperties(); - - initBaseUrl(); - initWorkerId(); - initObjectMapper(); - initEngineClient(); - initVariableMappers(); - initTopicSubscriptionManager(); - - return new ExternalTaskClientImpl(topicSubscriptionManager); - } - } - - - /** - * Round robin load balancing used for test testLoadBalanceHttpClient - */ - class RoundAddressResolver implements UrlResolver { - - public RoundAddressResolver(List urls) { - this.urls = urls; - i = 0; - max = urls.size(); - } - - List urls; - int i; - int max; - - public String getBaseUrl() { - - String address = urls.get(i); - if (i++ >= max) i = 0; - return address; - } + public String getBaseUrl() { + return baseUrl; } + } }