Skip to content

Commit

Permalink
feat(quarkus): use a separate camunda thread pool (camunda#1571)
Browse files Browse the repository at this point in the history
* Use a ThreadPoolExecutor instead of the Quarkus core thread pool
  Executor for the Camunda JobExecutor.
* Refactor the JobExecutor configuration properties to generic ones that
  are always up-to-date with the process engine changes.

Related to CAM-13811
  • Loading branch information
koevskinikola authored Aug 19, 2021
1 parent 6f03067 commit bdf683b
Show file tree
Hide file tree
Showing 20 changed files with 216 additions and 179 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ faces-config.NavData
*.lock.db
**/overlays
**/node_modules
camunda-h2-dbs

*.DS_Store
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@
*/
public class PropertyHelper {

private final static ContainerIntegrationLogger LOG = ProcessEngineLogger.CONTAINER_INTEGRATION_LOGGER;
protected final static ContainerIntegrationLogger LOG = ProcessEngineLogger.CONTAINER_INTEGRATION_LOGGER;

public static final String KEBAB_CASE = "-";
public static final String SNAKE_CASE = "_";
public static final String CAMEL_CASE = "";

/**
* Regex for Ant-style property placeholders
Expand Down Expand Up @@ -99,18 +103,38 @@ public static void applyProperty(Object configuration, String key, String string
/**
* Sets an objects fields via reflection from String values.
* Depending on the field's type the respective values are converted to int or boolean.
* This method allows to specify a property naming strategy, i.e., if a property is written in
* <code>camelCase</code>, <code>kebab-case</code>, or <code>snake_case</code>.
*
* @param configuration
* @param properties
* @param namingStrategy can be either {@link PropertyHelper#KEBAB_CASE}, {@link PropertyHelper#SNAKE_CASE}, or {@link PropertyHelper#CAMEL_CASE}.
* @throws ProcessEngineException if a property is supplied that matches no field or
* if the field's type is not String, nor int, nor boolean.
*/
public static void applyProperties(Object configuration, Map<String, String> properties) {
public static void applyProperties(Object configuration, Map<String, String> properties, String namingStrategy) {
for (Map.Entry<String, String> property : properties.entrySet()) {
applyProperty(configuration, property.getKey(), property.getValue());
String key = property.getKey();
if (!CAMEL_CASE.equals(namingStrategy)) {
key = convertToCamelCase(key, namingStrategy);
}
applyProperty(configuration, key, property.getValue());
}
}

/**
* Sets an objects fields via reflection from String values.
* Depending on the field's type the respective values are converted to int or boolean.
*
* @param configuration
* @param properties
* @throws ProcessEngineException if a property is supplied that matches no field or
* if the field's type is not String, nor int, nor boolean.
*/
public static void applyProperties(Object configuration, Map<String, String> properties) {
applyProperties(configuration, properties, CAMEL_CASE);
}


/**
* Replaces Ant-style property references if the corresponding keys exist in the provided {@link Properties}.
Expand All @@ -133,4 +157,13 @@ public static String resolveProperty(Properties props, String original) {
return found ? buffer.toString() : original;
}

protected static String convertToCamelCase(String value, String token) {
while(value.contains(token)) {
value = value
.replaceFirst(token + "[a-z]",
String.valueOf(Character.toUpperCase(value.charAt(value.indexOf(token) + 1))));
}
return value;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Map;
import java.util.Properties;

import junit.framework.TestCase;
import org.camunda.bpm.engine.ProcessEngineConfiguration;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.cfg.StandaloneProcessEngineConfiguration;
Expand Down Expand Up @@ -48,6 +47,18 @@ public class PropertyHelperTest {
protected static final String WAIT_INCREASE_FACTOR = "waitIncreaseFactor";
protected static final String BACKOFF_TIME_IN_MILLIS = "backoffTimeInMillis";

// kebab case properties
protected static final String KC_JDBC_URL_PROP = "jdbc-url";
protected static final String KC_DB_IDENTITY_USED_PROP = "db-identity-used";
protected static final String KC_WAIT_INCREASE_FACTOR = "wait-increase-factor";
protected static final String KC_BACKOFF_TIME_IN_MILLIS = "backoff-time-in-millis";

// snake case properties
protected static final String SC_JDBC_URL_PROP = "jdbc_url";
protected static final String SC_DB_IDENTITY_USED_PROP = "db_identity_used";
protected static final String SC_WAIT_INCREASE_FACTOR = "wait_increase_factor";
protected static final String SC_BACKOFF_TIME_IN_MILLIS = "backoff_time_in_millis";


/**
* Assert that String, int and boolean properties can be set.
Expand Down Expand Up @@ -166,4 +177,54 @@ public void testResolvePropertyNoTemplate() {
String result = PropertyHelper.resolveProperty(source, "camunda.test.someKey");
Assert.assertEquals("camunda.test.someKey", result);
}

@Test
public void shouldResolveKebabCaseProperties() {
// given
ProcessEngineConfigurationImpl engineConfiguration = new StandaloneProcessEngineConfiguration();
JobExecutor jobExecutor = new DefaultJobExecutor();

Map<String, String> configProperties = new HashMap<>();
configProperties.put(KC_JDBC_URL_PROP, "someUrl");
configProperties.put(KC_DB_IDENTITY_USED_PROP, "true");

Map<String, String> executorProperties = new HashMap<>();
executorProperties.put(KC_BACKOFF_TIME_IN_MILLIS, Integer.toString(Integer.MAX_VALUE));
executorProperties.put(KC_WAIT_INCREASE_FACTOR, Float.toString(Float.MAX_VALUE));

// when
PropertyHelper.applyProperties(engineConfiguration, configProperties, PropertyHelper.KEBAB_CASE);
PropertyHelper.applyProperties(jobExecutor, executorProperties, PropertyHelper.KEBAB_CASE);

// then
Assert.assertEquals(Integer.MAX_VALUE, jobExecutor.getBackoffTimeInMillis());
Assert.assertEquals(Float.MAX_VALUE, jobExecutor.getWaitIncreaseFactor(), 0.0001d);
Assert.assertEquals(true, engineConfiguration.isDbIdentityUsed());
Assert.assertEquals("someUrl", engineConfiguration.getJdbcUrl());
}

@Test
public void shouldResolveSnakeCaseProperties() {
// given
ProcessEngineConfigurationImpl engineConfiguration = new StandaloneProcessEngineConfiguration();
JobExecutor jobExecutor = new DefaultJobExecutor();

Map<String, String> configProperties = new HashMap<>();
configProperties.put(SC_JDBC_URL_PROP, "someUrl");
configProperties.put(SC_DB_IDENTITY_USED_PROP, "true");

Map<String, String> executorProperties = new HashMap<>();
executorProperties.put(SC_BACKOFF_TIME_IN_MILLIS, Integer.toString(Integer.MAX_VALUE));
executorProperties.put(SC_WAIT_INCREASE_FACTOR, Float.toString(Float.MAX_VALUE));

// when
PropertyHelper.applyProperties(engineConfiguration, configProperties, PropertyHelper.SNAKE_CASE);
PropertyHelper.applyProperties(jobExecutor, executorProperties, PropertyHelper.SNAKE_CASE);

// then
Assert.assertEquals(Integer.MAX_VALUE, jobExecutor.getBackoffTimeInMillis());
Assert.assertEquals(Float.MAX_VALUE, jobExecutor.getWaitIncreaseFactor(), 0.0001d);
Assert.assertEquals(true, engineConfiguration.isDbIdentityUsed());
Assert.assertEquals("someUrl", engineConfiguration.getJdbcUrl());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import org.camunda.bpm.engine.cdi.impl.context.RequestScopedAssociation;
import org.camunda.bpm.engine.cdi.jsf.TaskForm;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.quarkus.engine.extension.impl.CamundaEngineConfig;
import org.camunda.bpm.quarkus.engine.extension.CamundaEngineConfig;
import org.camunda.bpm.quarkus.engine.extension.impl.CamundaEngineRecorder;
import org.camunda.bpm.quarkus.engine.extension.impl.InjectableBusinessProcessContext;
import org.camunda.bpm.quarkus.engine.extension.impl.QuarkusProcessEngineConfiguration;
import org.camunda.bpm.quarkus.engine.extension.QuarkusProcessEngineConfiguration;
import org.jboss.jandex.DotName;

import javax.enterprise.context.Dependent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import javax.inject.Inject;

import io.quarkus.test.QuarkusUnitTest;
import org.camunda.bpm.quarkus.engine.extension.impl.CamundaEngineConfig;
import org.camunda.bpm.quarkus.engine.extension.CamundaEngineConfig;
import org.camunda.bpm.quarkus.engine.test.helper.ProcessEngineAwareExtension;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
Expand Down Expand Up @@ -52,13 +52,13 @@ public void shouldLoadJobAcquisitionProperties() {
// given a custom application.properties file

// then
assertThat(config.jobExecutor.maxJobsPerAcquisition).isEqualTo(5);
assertThat(config.jobExecutor.lockTimeInMillis).isEqualTo(500000);
assertThat(config.jobExecutor.waitTimeInMillis).isEqualTo(7000);
assertThat(config.jobExecutor.maxWait).isEqualTo(65000);
assertThat(config.jobExecutor.backoffTimeInMillis).isEqualTo(5);
assertThat(config.jobExecutor.maxBackoff).isEqualTo(5);
assertThat(config.jobExecutor.backoffDecreaseThreshold).isEqualTo(120);
assertThat(config.jobExecutor.waitIncreaseFactor).isEqualTo(3);
assertThat(config.jobExecutor.genericConfig.get("max-jobs-per-acquisition")).isEqualTo("5");
assertThat(config.jobExecutor.genericConfig.get("lock-time-in-millis")).isEqualTo("500000");
assertThat(config.jobExecutor.genericConfig.get("wait-time-in-millis")).isEqualTo("7000");
assertThat(config.jobExecutor.genericConfig.get("max-wait")).isEqualTo("65000");
assertThat(config.jobExecutor.genericConfig.get("backoff-time-in-millis")).isEqualTo("5");
assertThat(config.jobExecutor.genericConfig.get("max-backoff")).isEqualTo("5");
assertThat(config.jobExecutor.genericConfig.get("backoff-decrease-threshold")).isEqualTo("120");
assertThat(config.jobExecutor.genericConfig.get("wait-increase-factor")).isEqualTo("3");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import io.quarkus.test.QuarkusUnitTest;
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.quarkus.engine.extension.impl.QuarkusProcessEngineConfiguration;
import org.camunda.bpm.quarkus.engine.extension.QuarkusProcessEngineConfiguration;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.camunda.bpm.engine.impl.test.TestHelper;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.quarkus.engine.extension.QuarkusProcessEngineConfiguration;
import org.camunda.bpm.quarkus.engine.extension.impl.ManagedJobExecutor;
import org.camunda.bpm.quarkus.engine.extension.impl.QuarkusProcessEngineConfiguration;
import org.camunda.bpm.quarkus.engine.test.helper.ProcessEngineAwareExtension;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.jboss.shrinkwrap.api.ShrinkWrap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.impl.test.TestHelper;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.quarkus.engine.extension.impl.QuarkusProcessEngineConfiguration;
import org.camunda.bpm.quarkus.engine.extension.QuarkusProcessEngineConfiguration;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.db.DbIdGenerator;
import org.camunda.bpm.engine.task.Task;
import org.camunda.bpm.quarkus.engine.extension.impl.QuarkusProcessEngineConfiguration;
import org.camunda.bpm.quarkus.engine.extension.QuarkusProcessEngineConfiguration;
import org.camunda.bpm.quarkus.engine.test.helper.ProcessEngineAwareExtension;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ChooseDatasourceConfigurationTest {
@RegisterExtension
static QuarkusUnitTest unitTest = new ProcessEngineAwareExtension()
.withConfigurationResource("persistence/multiple-datasources-application.properties")
.overrideConfigKey("quarkus.camunda.bpm.datasource", "secondary")
.overrideConfigKey("quarkus.camunda.datasource", "secondary")
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ChooseDefaultDatasourceConfigurationTest {
@RegisterExtension
static QuarkusUnitTest unitTest = new ProcessEngineAwareExtension()
.withConfigurationResource("persistence/multiple-datasources-application.properties")
.overrideConfigKey("quarkus.camunda.bpm.datasource", "<default>")
.overrideConfigKey("quarkus.camunda.datasource", "<default>")
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ChooseNotExistingDatasourceConfigurationTest {
@RegisterExtension
static QuarkusUnitTest unitTest = new ProcessEngineAwareExtension()
.withConfigurationResource("persistence/multiple-datasources-application.properties")
.overrideConfigKey("quarkus.camunda.bpm.datasource", "quaternary")
.overrideConfigKey("quarkus.camunda.datasource", "quaternary")
.assertException(throwable -> assertThat(throwable)
.hasMessage("No datasource named 'quaternary' exists")
.isInstanceOf(IllegalArgumentException.class))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
quarkus.camunda.bpm.job-executor.thread-pool.max-pool-size=12
quarkus.camunda.bpm.job-executor.thread-pool.queue-size=5
quarkus.camunda.job-executor.thread-pool.max-pool-size=12
quarkus.camunda.job-executor.thread-pool.queue-size=5

quarkus.camunda.bpm.job-executor.max-jobs-per-acquisition=5
quarkus.camunda.bpm.job-executor.lock-time-in-millis=500000
quarkus.camunda.bpm.job-executor.wait-time-in-millis=7000
quarkus.camunda.bpm.job-executor.max-wait=65000
quarkus.camunda.bpm.job-executor.backoff-time-in-millis=5
quarkus.camunda.bpm.job-executor.max-backoff=5
quarkus.camunda.bpm.job-executor.backoff-decrease-threshold=120
quarkus.camunda.bpm.job-executor.wait-increase-factor=3
quarkus.camunda.job-executor.max-jobs-per-acquisition=5
quarkus.camunda.job-executor.lock-time-in-millis=500000
quarkus.camunda.job-executor.wait-time-in-millis=7000
quarkus.camunda.job-executor.max-wait=65000
quarkus.camunda.job-executor.backoff-time-in-millis=5
quarkus.camunda.job-executor.max-backoff=5
quarkus.camunda.job-executor.backoff-decrease-threshold=120
quarkus.camunda.job-executor.wait-increase-factor=3

quarkus.datasource.jdbc.url=jdbc:h2:mem:camunda;MVCC=TRUE;TRACE_LEVEL_FILE=0;DB_CLOSE_ON_EXIT=FALSE
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.camunda.bpm.engine.TaskService;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.test.TestHelper;
import org.camunda.bpm.quarkus.engine.extension.impl.QuarkusProcessEngineConfiguration;
import org.camunda.bpm.quarkus.engine.extension.QuarkusProcessEngineConfiguration;
import org.junit.After;
import org.junit.Before;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.camunda.bpm.quarkus.engine.extension.impl;
package org.camunda.bpm.quarkus.engine.extension;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;

import java.util.Optional;

@ConfigRoot(phase = ConfigPhase.RUN_TIME, name = "camunda.bpm")
@ConfigRoot(phase = ConfigPhase.RUN_TIME, name = "camunda")
public class CamundaEngineConfig {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.camunda.bpm.quarkus.engine.extension;

import java.util.Map;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;

@ConfigGroup
public class CamundaJobExecutorConfig {

/**
* The Camunda JobExecutor configuration properties. For more details, see
* {@link https://docs.camunda.org/manual/latest/reference/deployment-descriptors/tags/job-executor/#job-acquisition-configuration-properties}
*/
@ConfigItem(name = ConfigItem.PARENT)
public Map<String, String> genericConfig;

/**
* The Camunda JobExecutor thread pool config. This thread pool is responsible for running
* Camunda jobs.
*/
@ConfigItem
public ThreadPoolConfig threadPool;

@ConfigGroup
public static class ThreadPoolConfig {
/**
* Sets the maximum number of threads that can be present in the Quarkus-managed
* thread pool for the Camunda JobExecutor. The default value is 10.
*/
@ConfigItem(defaultValue = "10")
public int maxPoolSize;

/**
* Sets the size of the Quarkus-managed JobExecutor queue. The default value is 3.
*/
@ConfigItem(defaultValue = "3")
public int queueSize;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.camunda.bpm.quarkus.engine.extension.impl;
package org.camunda.bpm.quarkus.engine.extension;

import org.camunda.bpm.engine.cdi.CdiJtaProcessEngineConfiguration;
import org.camunda.bpm.engine.impl.interceptor.CommandContextInterceptor;
Expand Down
Loading

0 comments on commit bdf683b

Please sign in to comment.