Skip to content

Commit

Permalink
Merge pull request #45853 from mcruzdev/config-kafka-streams
Browse files Browse the repository at this point in the history
Convert kafka-streams extension to use @ConfigMapping
  • Loading branch information
gsmet authored Jan 24, 2025
2 parents 091d578 + 22fd585 commit 4a53e63
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 182 deletions.
3 changes: 0 additions & 3 deletions extensions/kafka-streams/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-AlegacyConfigRoot=true</arg>
</compilerArgs>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package io.quarkus.kafka.streams.deployment;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;

@ConfigRoot(name = "kafka-streams", phase = ConfigPhase.BUILD_TIME)
public class KafkaStreamsBuildTimeConfig {
@ConfigMapping(prefix = "quarkus.kafka-streams")
@ConfigRoot(phase = ConfigPhase.BUILD_TIME)
public interface KafkaStreamsBuildTimeConfig {

/**
* Whether a health check is published in case the smallrye-health extension is present (defaults to true).
*/
@ConfigItem(name = "health.enabled", defaultValue = "true")
public boolean healthEnabled;
@WithName("health.enabled")
@WithDefault("true")
boolean healthEnabled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,11 @@ void addHealthChecks(KafkaStreamsBuildTimeConfig buildTimeConfig, BuildProducer<
healthChecks.produce(
new HealthBuildItem(
"io.quarkus.kafka.streams.runtime.health.KafkaStreamsTopicsHealthCheck",
buildTimeConfig.healthEnabled));
buildTimeConfig.healthEnabled()));
healthChecks.produce(
new HealthBuildItem(
"io.quarkus.kafka.streams.runtime.health.KafkaStreamsStateHealthCheck",
buildTimeConfig.healthEnabled));
buildTimeConfig.healthEnabled()));
}

@BuildStep
Expand Down
3 changes: 0 additions & 3 deletions extensions/kafka-streams/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-AlegacyConfigRoot=true</arg>
</compilerArgs>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream

Properties buildTimeProperties = kafkaStreamsSupport.getProperties();

String bootstrapServersConfig = asString(runtimeConfig.bootstrapServers);
String bootstrapServersConfig = asString(runtimeConfig.bootstrapServers());
if (DEFAULT_KAFKA_BROKER.equalsIgnoreCase(bootstrapServersConfig)) {
// Try to see if kafka.bootstrap.servers is set, if so, use that value, if not, keep localhost:9092
bootstrapServersConfig = ConfigProvider.getConfig().getOptionalValue("kafka.bootstrap.servers", String.class)
Expand All @@ -109,7 +109,7 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream

this.executorService = executorService;

this.topicsTimeout = runtimeConfig.topicsTimeout;
this.topicsTimeout = runtimeConfig.topicsTimeout();
this.trimmedTopics = isTopicsCheckEnabled() ? runtimeConfig.getTrimmedTopics() : Collections.emptyList();
this.streamsConfig = new StreamsConfig(kafkaStreamsProperties);
this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(),
Expand Down Expand Up @@ -217,93 +217,95 @@ private static Properties getStreamsProperties(Properties properties,

// add runtime options
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, runtimeConfig.applicationId);
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, runtimeConfig.applicationId());

// app id
if (runtimeConfig.applicationServer.isPresent()) {
streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, runtimeConfig.applicationServer.get());
if (runtimeConfig.applicationServer().isPresent()) {
streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, runtimeConfig.applicationServer().get());
}

// schema registry
if (runtimeConfig.schemaRegistryUrl.isPresent()) {
streamsProperties.put(runtimeConfig.schemaRegistryKey, runtimeConfig.schemaRegistryUrl.get());
if (runtimeConfig.schemaRegistryUrl().isPresent()) {
streamsProperties.put(runtimeConfig.schemaRegistryKey(), runtimeConfig.schemaRegistryUrl().get());
}

// set the security protocol (in case we are doing PLAIN_TEXT)
setProperty(runtimeConfig.securityProtocol, streamsProperties, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
setProperty(runtimeConfig.securityProtocol(), streamsProperties, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);

// sasl
SaslConfig sc = runtimeConfig.sasl;
SaslConfig sc = runtimeConfig.sasl();
if (sc != null) {
setProperty(sc.mechanism, streamsProperties, SaslConfigs.SASL_MECHANISM);
setProperty(sc.mechanism(), streamsProperties, SaslConfigs.SASL_MECHANISM);

setProperty(sc.jaasConfig, streamsProperties, SaslConfigs.SASL_JAAS_CONFIG);
setProperty(sc.jaasConfig(), streamsProperties, SaslConfigs.SASL_JAAS_CONFIG);

setProperty(sc.clientCallbackHandlerClass, streamsProperties, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS);
setProperty(sc.clientCallbackHandlerClass(), streamsProperties, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS);

setProperty(sc.loginCallbackHandlerClass, streamsProperties, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS);
setProperty(sc.loginClass, streamsProperties, SaslConfigs.SASL_LOGIN_CLASS);
setProperty(sc.loginCallbackHandlerClass(), streamsProperties, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS);
setProperty(sc.loginClass(), streamsProperties, SaslConfigs.SASL_LOGIN_CLASS);

setProperty(sc.kerberosServiceName, streamsProperties, SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
setProperty(sc.kerberosKinitCmd, streamsProperties, SaslConfigs.SASL_KERBEROS_KINIT_CMD);
setProperty(sc.kerberosTicketRenewWindowFactor, streamsProperties,
setProperty(sc.kerberosServiceName(), streamsProperties, SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
setProperty(sc.kerberosKinitCmd(), streamsProperties, SaslConfigs.SASL_KERBEROS_KINIT_CMD);
setProperty(sc.kerberosTicketRenewWindowFactor(), streamsProperties,
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
setProperty(sc.kerberosTicketRenewJitter, streamsProperties, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER);
setProperty(sc.kerberosMinTimeBeforeRelogin, streamsProperties, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
setProperty(sc.kerberosTicketRenewJitter(), streamsProperties, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER);
setProperty(sc.kerberosMinTimeBeforeRelogin(), streamsProperties,
SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);

setProperty(sc.loginRefreshWindowFactor, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR);
setProperty(sc.loginRefreshWindowJitter, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER);
setProperty(sc.loginRefreshWindowFactor(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR);
setProperty(sc.loginRefreshWindowJitter(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER);

setProperty(sc.loginRefreshMinPeriod, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
setProperty(sc.loginRefreshMinPeriod(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
DurationToSecondsFunction.INSTANCE);
setProperty(sc.loginRefreshBuffer, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS,
setProperty(sc.loginRefreshBuffer(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS,
DurationToSecondsFunction.INSTANCE);
}

// ssl
SslConfig ssl = runtimeConfig.ssl;
SslConfig ssl = runtimeConfig.ssl();
if (ssl != null) {
setProperty(ssl.protocol, streamsProperties, SslConfigs.SSL_PROTOCOL_CONFIG);
setProperty(ssl.provider, streamsProperties, SslConfigs.SSL_PROVIDER_CONFIG);
setProperty(ssl.cipherSuites, streamsProperties, SslConfigs.SSL_CIPHER_SUITES_CONFIG);
setProperty(ssl.enabledProtocols, streamsProperties, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);

setTrustStoreConfig(ssl.truststore, streamsProperties);
setKeyStoreConfig(ssl.keystore, streamsProperties);
setKeyConfig(ssl.key, streamsProperties);

setProperty(ssl.keymanagerAlgorithm, streamsProperties, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
setProperty(ssl.trustmanagerAlgorithm, streamsProperties, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
Optional<String> eia = Optional.of(ssl.endpointIdentificationAlgorithm.orElse(""));
setProperty(ssl.protocol(), streamsProperties, SslConfigs.SSL_PROTOCOL_CONFIG);
setProperty(ssl.provider(), streamsProperties, SslConfigs.SSL_PROVIDER_CONFIG);
setProperty(ssl.cipherSuites(), streamsProperties, SslConfigs.SSL_CIPHER_SUITES_CONFIG);
setProperty(ssl.enabledProtocols(), streamsProperties, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);

setTrustStoreConfig(ssl.truststore(), streamsProperties);
setKeyStoreConfig(ssl.keystore(), streamsProperties);
setKeyConfig(ssl.key(), streamsProperties);

setProperty(ssl.keymanagerAlgorithm(), streamsProperties, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
setProperty(ssl.trustmanagerAlgorithm(), streamsProperties, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
Optional<String> eia = Optional.of(ssl.endpointIdentificationAlgorithm().orElse(""));
setProperty(eia, streamsProperties, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
setProperty(ssl.secureRandomImplementation, streamsProperties, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
setProperty(ssl.secureRandomImplementation(), streamsProperties,
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
}

return streamsProperties;
}

private static void setTrustStoreConfig(TrustStoreConfig tsc, Properties properties) {
if (tsc != null) {
setProperty(tsc.type, properties, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG);
setProperty(tsc.location, properties, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
setProperty(tsc.password, properties, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
setProperty(tsc.certificates, properties, SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);
setProperty(tsc.type(), properties, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG);
setProperty(tsc.location(), properties, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
setProperty(tsc.password(), properties, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
setProperty(tsc.certificates(), properties, SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);
}
}

private static void setKeyStoreConfig(KeyStoreConfig ksc, Properties properties) {
if (ksc != null) {
setProperty(ksc.type, properties, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG);
setProperty(ksc.location, properties, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
setProperty(ksc.password, properties, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
setProperty(ksc.key, properties, SslConfigs.SSL_KEYSTORE_KEY_CONFIG);
setProperty(ksc.certificateChain, properties, SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG);
setProperty(ksc.type(), properties, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG);
setProperty(ksc.location(), properties, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
setProperty(ksc.password(), properties, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
setProperty(ksc.key(), properties, SslConfigs.SSL_KEYSTORE_KEY_CONFIG);
setProperty(ksc.certificateChain(), properties, SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG);
}
}

private static void setKeyConfig(KeyConfig kc, Properties properties) {
if (kc != null) {
setProperty(kc.password, properties, SslConfigs.SSL_KEY_PASSWORD_CONFIG);
setProperty(kc.password(), properties, SslConfigs.SSL_KEY_PASSWORD_CONFIG);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,53 @@
import java.util.Optional;
import java.util.stream.Collectors;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;

@ConfigRoot(name = "kafka-streams", phase = ConfigPhase.RUN_TIME)
public class KafkaStreamsRuntimeConfig {
@ConfigMapping(prefix = "quarkus.kafka-streams")
@ConfigRoot(phase = ConfigPhase.RUN_TIME)
public interface KafkaStreamsRuntimeConfig {

/**
* Default Kafka bootstrap server.
*/
public static final String DEFAULT_KAFKA_BROKER = "localhost:9092";
String DEFAULT_KAFKA_BROKER = "localhost:9092";

/**
* A unique identifier for this Kafka Streams application.
* If not set, defaults to quarkus.application.name.
*/
@ConfigItem(defaultValue = "${quarkus.application.name}")
public String applicationId;
@WithDefault("${quarkus.application.name}")
String applicationId();

/**
* A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s).
* If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9092}.
*/
@ConfigItem(defaultValue = DEFAULT_KAFKA_BROKER)
public List<InetSocketAddress> bootstrapServers;
@WithDefault(DEFAULT_KAFKA_BROKER)
List<InetSocketAddress> bootstrapServers();

/**
* A unique identifier of this application instance, typically in the form host:port.
*/
@ConfigItem
public Optional<String> applicationServer;
Optional<String> applicationServer();

/**
* A comma-separated list of topic names.
* The pipeline will only be started once all these topics are present in the Kafka cluster
* and {@code ignore.topics} is set to false.
*/
@ConfigItem
public Optional<List<String>> topics;
Optional<List<String>> topics();

/**
* Timeout to wait for topic names to be returned from admin client.
* If set to 0 (or negative), {@code topics} check is ignored.
*/
@ConfigItem(defaultValue = "10S")
public Duration topicsTimeout;
@WithDefault("10S")
Duration topicsTimeout();

/**
* The schema registry key.
Expand All @@ -62,48 +63,33 @@ public class KafkaStreamsRuntimeConfig {
* For Apicurio Registry, use {@code apicurio.registry.url}.
* For Confluent schema registry, use {@code schema.registry.url}.
*/
@ConfigItem(defaultValue = "schema.registry.url")
public String schemaRegistryKey;
@WithDefault("schema.registry.url")
String schemaRegistryKey();

/**
* The schema registry URL.
*/
@ConfigItem
public Optional<String> schemaRegistryUrl;
Optional<String> schemaRegistryUrl();

/**
* The security protocol to use
* See https://docs.confluent.io/current/streams/developer-guide/security.html#security-example
*/
@ConfigItem(name = "security.protocol")
public Optional<String> securityProtocol;
@WithName("security.protocol")
Optional<String> securityProtocol();

/**
* The SASL JAAS config.
*/
public SaslConfig sasl;
SaslConfig sasl();

/**
* Kafka SSL config
*/
public SslConfig ssl;
SslConfig ssl();

@Override
public String toString() {
return "KafkaStreamsRuntimeConfig{" +
"applicationId='" + applicationId + '\'' +
", bootstrapServers=" + bootstrapServers +
", applicationServer=" + applicationServer +
", topics=" + topics +
", schemaRegistryKey='" + schemaRegistryKey + '\'' +
", schemaRegistryUrl=" + schemaRegistryUrl +
", sasl=" + sasl +
", ssl=" + ssl +
'}';
}

public List<String> getTrimmedTopics() {
return topics.orElseThrow(() -> new IllegalArgumentException("Missing list of topics"))
default List<String> getTrimmedTopics() {
return topics().orElseThrow(() -> new IllegalArgumentException("Missing list of topics"))
.stream().map(String::trim).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;

@ConfigGroup
public class KeyConfig {
public interface KeyConfig {
/**
* Password of the private key in the key store
*/
@ConfigItem
public Optional<String> password;
Optional<String> password();
}
Loading

0 comments on commit 4a53e63

Please sign in to comment.