diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 7767beaa9aa1e..effeab90beb95 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -589,6 +589,9 @@ jobs:
- name: Transaction
group: TRANSACTION
+ - name: Metrics
+ group: METRICS
+
steps:
- name: checkout
uses: actions/checkout@v4
diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh
index f20a7ad0793e1..2d82fce08878d 100755
--- a/build/run_integration_group.sh
+++ b/build/run_integration_group.sh
@@ -181,6 +181,10 @@ test_group_transaction() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests
}
+test_group_metrics() {
+ mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-metrics.xml -DintegrationTests
+}
+
test_group_tiered_filesystem() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=tiered-filesystem-storage.xml -DintegrationTests
}
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index e3941c54a74b1..7c95811faf7de 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -333,6 +333,13 @@ The Apache Software License, Version 2.0
- io.prometheus-simpleclient_tracer_common-0.16.0.jar
- io.prometheus-simpleclient_tracer_otel-0.16.0.jar
- io.prometheus-simpleclient_tracer_otel_agent-0.16.0.jar
+ * Prometheus exporter
+ - io.prometheus-prometheus-metrics-config-1.1.0.jar
+ - io.prometheus-prometheus-metrics-exporter-common-1.1.0.jar
+ - io.prometheus-prometheus-metrics-exporter-httpserver-1.1.0.jar
+ - io.prometheus-prometheus-metrics-exposition-formats-1.1.0.jar
+ - io.prometheus-prometheus-metrics-model-1.1.0.jar
+ - io.prometheus-prometheus-metrics-shaded-protobuf-1.1.0.jar
* Jakarta Bean Validation API
- jakarta.validation-jakarta.validation-api-2.0.2.jar
- javax.validation-validation-api-1.1.0.Final.jar
@@ -503,6 +510,27 @@ The Apache Software License, Version 2.0
* RoaringBitmap
- org.roaringbitmap-RoaringBitmap-0.9.44.jar
- org.roaringbitmap-shims-0.9.44.jar
+ * OpenTelemetry
+ - io.opentelemetry-opentelemetry-api-1.34.1.jar
+ - io.opentelemetry-opentelemetry-api-events-1.34.1-alpha.jar
+ - io.opentelemetry-opentelemetry-context-1.34.1.jar
+ - io.opentelemetry-opentelemetry-exporter-common-1.34.1.jar
+ - io.opentelemetry-opentelemetry-exporter-otlp-1.34.1.jar
+ - io.opentelemetry-opentelemetry-exporter-otlp-common-1.34.1.jar
+ - io.opentelemetry-opentelemetry-exporter-prometheus-1.34.1-alpha.jar
+ - io.opentelemetry-opentelemetry-exporter-sender-okhttp-1.34.1.jar
+ - io.opentelemetry-opentelemetry-extension-incubator-1.34.1-alpha.jar
+ - io.opentelemetry-opentelemetry-sdk-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-common-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-spi-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-logs-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-metrics-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-trace-1.34.1.jar
+ - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-1.32.1.jar
+ - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-semconv-1.32.1-alpha.jar
+ - io.opentelemetry.instrumentation-opentelemetry-resources-1.32.1-alpha.jar
+ - io.opentelemetry.semconv-opentelemetry-semconv-1.23.1-alpha.jar
BSD 3-clause "New" or "Revised" License
* Google auth library
diff --git a/pom.xml b/pom.xml
index 4dfeb30821a3f..52a638ac09f3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -249,6 +249,10 @@ flexible messaging model and an intuitive client API.
3.4.3
1.5.2-3
2.0.6
+ 1.34.1
+ 1.34.1-alpha
+ 1.32.1-alpha
+ 1.23.1-alpha
1.18.3
@@ -1446,6 +1450,31 @@ flexible messaging model and an intuitive client API.
${restassured.version}
test
+
+
+ io.opentelemetry
+ opentelemetry-bom
+ ${opentelemetry.version}
+ pom
+ import
+
+
+ io.opentelemetry
+ opentelemetry-bom-alpha
+ ${opentelemetry.alpha.version}
+ pom
+ import
+
+
+ io.opentelemetry.instrumentation
+ opentelemetry-resources
+ ${opentelemetry.instrumentation.version}
+
+
+ io.opentelemetry.semconv
+ opentelemetry-semconv
+ ${opentelemetry.semconv.version}
+
@@ -2266,6 +2295,7 @@ flexible messaging model and an intuitive client API.
pulsar-broker-auth-sasl
pulsar-client-auth-sasl
pulsar-config-validation
+ pulsar-opentelemetry
structured-event-log
@@ -2330,6 +2360,7 @@ flexible messaging model and an intuitive client API.
pulsar-broker-auth-sasl
pulsar-client-auth-sasl
pulsar-config-validation
+ pulsar-opentelemetry
pulsar-transaction
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index f6ad76a083bc3..a27384c989000 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -242,6 +242,7 @@ void testAuthentication() throws Exception {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
+ proxyConfig.setClusterName(configClusterName);
// proxy connect to broker
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index d73dba288a3c6..8e942c78d5b40 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -82,10 +82,28 @@
awaitility
test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
org.gaul
modernizer-maven-plugin
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
similarity index 100%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
rename to pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index c39de184b05cc..18da38b43dc25 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -143,6 +143,12 @@
${project.version}
+
+ ${project.groupId}
+ pulsar-opentelemetry
+ ${project.version}
+
+
${project.groupId}
pulsar-io-batch-discovery-triggerers
@@ -209,6 +215,14 @@
test
+
+ ${project.groupId}
+ pulsar-broker-common
+ ${project.version}
+ test-jar
+ test
+
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 054411c49f6ef..3701f354b62b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -108,6 +108,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
+import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
@@ -248,6 +249,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final Timer brokerClientSharedTimer;
private MetricsGenerator metricsGenerator;
+ private PulsarBrokerOpenTelemetry openTelemetry;
private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
@@ -461,6 +463,9 @@ public CompletableFuture closeAsync() {
}
resetMetricsServlet();
+ if (openTelemetry != null) {
+ openTelemetry.close();
+ }
if (this.compactionServiceFactory != null) {
try {
@@ -897,6 +902,7 @@ public void start() throws PulsarServerException {
}
this.metricsGenerator = new MetricsGenerator(this);
+ this.openTelemetry = new PulsarBrokerOpenTelemetry(config);
// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
new file mode 100644
index 0000000000000..4b76b993001c2
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.metrics.Meter;
+import java.io.Closeable;
+import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
+
+public class PulsarBrokerOpenTelemetry implements Closeable {
+
+ public static final String SERVICE_NAME = "pulsar-broker";
+ private final OpenTelemetryService openTelemetryService;
+
+ @Getter
+ private final Meter meter;
+
+ public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
+ openTelemetryService = OpenTelemetryService.builder()
+ .clusterName(config.getClusterName())
+ .serviceName(SERVICE_NAME)
+ .serviceVersion(PulsarVersion.getVersion())
+ .build();
+ meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
+ }
+
+ @Override
+ public void close() {
+ openTelemetryService.close();
+ }
+}
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index cd89bacbf9ede..bb93eeb98d7e1 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -46,6 +46,12 @@
${project.version}
+
+ ${project.groupId}
+ pulsar-opentelemetry
+ ${project.version}
+
+
${project.groupId}
pulsar-client-original
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
new file mode 100644
index 0000000000000..be7c15dfd85e0
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import io.opentelemetry.api.metrics.Meter;
+import java.io.Closeable;
+import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
+
+public class PulsarWorkerOpenTelemetry implements Closeable {
+
+ public static final String SERVICE_NAME = "pulsar-function-worker";
+ private final OpenTelemetryService openTelemetryService;
+
+ @Getter
+ private final Meter meter;
+
+ public PulsarWorkerOpenTelemetry(WorkerConfig workerConfig) {
+ openTelemetryService = OpenTelemetryService.builder()
+ .clusterName(workerConfig.getPulsarFunctionsCluster())
+ .serviceName(SERVICE_NAME)
+ .serviceVersion(PulsarVersion.getVersion())
+ .build();
+ meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.function_worker");
+ }
+
+ @Override
+ public void close() {
+ openTelemetryService.close();
+ }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 16cf778e07290..9f7d1996e0bb5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -108,6 +108,7 @@ public interface PulsarClientCreator {
private PulsarAdmin brokerAdmin;
private PulsarAdmin functionAdmin;
private MetricsGenerator metricsGenerator;
+ private PulsarWorkerOpenTelemetry openTelemetry;
@VisibleForTesting
private URI dlogUri;
private LeaderService leaderService;
@@ -188,6 +189,7 @@ public void init(WorkerConfig workerConfig,
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater"));
this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig);
+ this.openTelemetry = new PulsarWorkerOpenTelemetry(workerConfig);
this.workerConfig = workerConfig;
this.dlogUri = dlogUri;
this.workerStatsManager = new WorkerStatsManager(workerConfig, runAsStandalone);
@@ -659,6 +661,10 @@ public void stop() {
if (null != stateStoreProvider) {
stateStoreProvider.close();
}
+
+ if (null != openTelemetry) {
+ openTelemetry.close();
+ }
}
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
index 022ebd6ba48b8..c78c68f8923d8 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
@@ -60,6 +60,8 @@
@Slf4j
public class FunctionAssignmentTailerTest {
+ private static final String CLUSTER_NAME = "test-cluster";
+
@Test(timeOut = 10000)
public void testErrorNotifier() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
@@ -71,6 +73,7 @@ public void testErrorNotifier() throws Exception {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
+ workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
@@ -183,6 +186,7 @@ public void testProcessingAssignments() throws Exception {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
+ workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
@@ -307,6 +311,7 @@ public void testTriggerReadToTheEndAndExit() throws Exception {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
+ workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
diff --git a/pulsar-opentelemetry/pom.xml b/pulsar-opentelemetry/pom.xml
new file mode 100644
index 0000000000000..82a9658cc9d31
--- /dev/null
+++ b/pulsar-opentelemetry/pom.xml
@@ -0,0 +1,135 @@
+
+
+
+ 4.0.0
+
+ org.apache.pulsar
+ pulsar
+ 3.3.0-SNAPSHOT
+
+
+ pulsar-opentelemetry
+ OpenTelemetry Integration
+
+
+
+
+ io.opentelemetry
+ opentelemetry-exporter-otlp
+
+
+ io.opentelemetry
+ opentelemetry-exporter-prometheus
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+
+
+ io.opentelemetry
+ opentelemetry-sdk-extension-autoconfigure
+
+
+ io.opentelemetry.instrumentation
+ opentelemetry-resources
+
+
+ io.opentelemetry.semconv
+ opentelemetry-semconv
+
+
+
+ com.google.guava
+ guava
+
+
+
+ org.apache.commons
+ commons-lang3
+
+
+
+
+ org.apache.pulsar
+ pulsar-broker-common
+ ${project.version}
+ test-jar
+ test
+
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ test
+
+
+
+
+
+
+ org.gaul
+ modernizer-maven-plugin
+
+ true
+ 8
+
+
+
+ modernizer
+ verify
+
+ modernizer
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ checkstyle
+ verify
+
+ check
+
+
+
+
+
+
+
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
new file mode 100644
index 0000000000000..bdb002cb359ff
--- /dev/null
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+import io.opentelemetry.api.common.AttributeKey;
+
+/**
+ * Common OpenTelemetry attributes to be used by Pulsar components.
+ */
+public interface OpenTelemetryAttributes {
+ /**
+ * The name of the Pulsar cluster. This attribute is automatically added to all signals by
+ * {@link OpenTelemetryService}.
+ */
+ AttributeKey PULSAR_CLUSTER = AttributeKey.stringKey("pulsar.cluster");
+}
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
new file mode 100644
index 0000000000000..5ead1ff265c83
--- /dev/null
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
+import io.opentelemetry.sdk.resources.Resource;
+import io.opentelemetry.semconv.ResourceAttributes;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import lombok.Builder;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Provides a common OpenTelemetry service for Pulsar components to use. Responsible for instantiating the OpenTelemetry
+ * SDK with a set of override properties. Once initialized, furnishes access to OpenTelemetry.
+ */
+public class OpenTelemetryService implements Closeable {
+
+ static final String OTEL_SDK_DISABLED_KEY = "otel.sdk.disabled";
+ static final int MAX_CARDINALITY_LIMIT = 10000;
+
+ private final OpenTelemetrySdk openTelemetrySdk;
+
+ /**
+ * Instantiates the OpenTelemetry SDK. All attributes are overridden by system properties or environment
+ * variables.
+ *
+ * @param clusterName
+ * The name of the Pulsar cluster. Cannot be null or blank.
+ * @param serviceName
+ * The name of the service. Optional.
+ * @param serviceVersion
+ * The version of the service. Optional.
+ * @param sdkBuilderConsumer
+ * Allows customizing the SDK builder; for testing purposes only.
+ */
+ @Builder
+ public OpenTelemetryService(String clusterName,
+ String serviceName,
+ String serviceVersion,
+ @VisibleForTesting Consumer sdkBuilderConsumer) {
+ checkArgument(StringUtils.isNotBlank(clusterName), "Cluster name cannot be empty");
+ var sdkBuilder = AutoConfiguredOpenTelemetrySdk.builder();
+
+ sdkBuilder.addPropertiesSupplier(() -> Map.of(
+ OTEL_SDK_DISABLED_KEY, "true",
+ // Cardinality limit includes the overflow attribute set, so we need to add 1.
+ "otel.experimental.metrics.cardinality.limit", Integer.toString(MAX_CARDINALITY_LIMIT + 1)
+ ));
+
+ sdkBuilder.addResourceCustomizer(
+ (resource, __) -> {
+ var resourceBuilder = Resource.builder();
+ // Do not override attributes if already set (via system properties or environment variables).
+ if (resource.getAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER) == null) {
+ resourceBuilder.put(OpenTelemetryAttributes.PULSAR_CLUSTER, clusterName);
+ }
+ if (StringUtils.isNotBlank(serviceName)
+ && Objects.equals(Resource.getDefault().getAttribute(ResourceAttributes.SERVICE_NAME),
+ resource.getAttribute(ResourceAttributes.SERVICE_NAME))) {
+ resourceBuilder.put(ResourceAttributes.SERVICE_NAME, serviceName);
+ }
+ if (StringUtils.isNotBlank(serviceVersion)
+ && resource.getAttribute(ResourceAttributes.SERVICE_VERSION) == null) {
+ resourceBuilder.put(ResourceAttributes.SERVICE_VERSION, serviceVersion);
+ }
+ return resource.merge(resourceBuilder.build());
+ });
+
+ if (sdkBuilderConsumer != null) {
+ sdkBuilderConsumer.accept(sdkBuilder);
+ }
+
+ openTelemetrySdk = sdkBuilder.build().getOpenTelemetrySdk();
+ }
+
+ public OpenTelemetry getOpenTelemetry() {
+ return openTelemetrySdk;
+ }
+
+ @Override
+ public void close() {
+ openTelemetrySdk.close();
+ }
+}
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java
new file mode 100644
index 0000000000000..9a7426aa0471d
--- /dev/null
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides a wrapper layer for the OpenTelemetry API to be used in Pulsar.
+ * @since 3.3.0
+ */
+package org.apache.pulsar.opentelemetry;
\ No newline at end of file
diff --git a/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
new file mode 100644
index 0000000000000..e5c893794a069
--- /dev/null
+++ b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounterBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.instrumentation.resources.JarServiceNameDetector;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
+import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
+import io.opentelemetry.sdk.metrics.export.MetricReader;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import io.opentelemetry.semconv.ResourceAttributes;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import lombok.Cleanup;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
+import org.assertj.core.api.AbstractCharSequenceAssert;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryServiceTest {
+
+ private OpenTelemetryService openTelemetryService;
+ private InMemoryMetricReader reader;
+ private Meter meter;
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ reader = InMemoryMetricReader.create();
+ openTelemetryService = OpenTelemetryService.builder().
+ sdkBuilderConsumer(getSdkBuilderConsumer(reader,
+ Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"))).
+ clusterName("openTelemetryServiceTestCluster").
+ build();
+ meter = openTelemetryService.getOpenTelemetry().getMeter("openTelemetryServiceTestInstrument");
+ }
+
+ @AfterMethod
+ public void teardown() throws Exception {
+ openTelemetryService.close();
+ reader.close();
+ }
+
+ // Customizes the SDK builder to include the MetricReader and extra properties for testing purposes.
+ private static Consumer getSdkBuilderConsumer(MetricReader extraReader,
+ Map extraProperties) {
+ return autoConfigurationCustomizer -> {
+ if (extraReader != null) {
+ autoConfigurationCustomizer.addMeterProviderCustomizer(
+ (sdkMeterProviderBuilder, __) -> sdkMeterProviderBuilder.registerMetricReader(extraReader));
+ }
+ autoConfigurationCustomizer.addPropertiesSupplier(() -> extraProperties);
+ };
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testClusterNameCannotBeNull() {
+ @Cleanup
+ var ots = OpenTelemetryService.builder().build();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testClusterNameCannotBeEmpty() {
+ @Cleanup
+ var ots = OpenTelemetryService.builder().clusterName(StringUtils.EMPTY).build();
+ }
+
+ @Test
+ public void testResourceAttributesAreSet() throws Exception {
+ @Cleanup
+ var reader = InMemoryMetricReader.create();
+
+ @Cleanup
+ var ots = OpenTelemetryService.builder().
+ sdkBuilderConsumer(getSdkBuilderConsumer(reader,
+ Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false",
+ "otel.java.disabled.resource.providers", JarServiceNameDetector.class.getName()))).
+ clusterName("testServiceNameAndVersion").
+ serviceName("openTelemetryServiceTestService").
+ serviceVersion("1.0.0").
+ build();
+
+ assertThat(reader.collectAllMetrics())
+ .allSatisfy(metric -> assertThat(metric)
+ .hasResourceSatisfying(resource -> resource
+ .hasAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER, "testServiceNameAndVersion")
+ .hasAttribute(ResourceAttributes.SERVICE_NAME, "openTelemetryServiceTestService")
+ .hasAttribute(ResourceAttributes.SERVICE_VERSION, "1.0.0")
+ .hasAttribute(satisfies(ResourceAttributes.HOST_NAME, AbstractCharSequenceAssert::isNotBlank))));
+ }
+
+ @Test
+ public void testIsInstrumentationNameSetOnMeter() {
+ var meter = openTelemetryService.getOpenTelemetry().getMeter("testInstrumentationScope");
+ meter.counterBuilder("dummyCounter").build().add(1);
+ assertThat(reader.collectAllMetrics())
+ .anySatisfy(metricData -> assertThat(metricData)
+ .hasInstrumentationScope(InstrumentationScopeInfo.create("testInstrumentationScope")));
+ }
+
+ @Test
+ public void testMetricCardinalityIsSet() {
+ var prometheusExporterPort = 9464;
+ @Cleanup
+ var ots = OpenTelemetryService.builder().
+ sdkBuilderConsumer(getSdkBuilderConsumer(null,
+ Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false",
+ "otel.metrics.exporter", "prometheus",
+ "otel.exporter.prometheus.port", Integer.toString(prometheusExporterPort)))).
+ clusterName("openTelemetryServiceCardinalityTestCluster").
+ build();
+ var meter = ots.getOpenTelemetry().getMeter("openTelemetryMetricCardinalityTest");
+ var counter = meter.counterBuilder("dummyCounter").build();
+ for (int i = 0; i < OpenTelemetryService.MAX_CARDINALITY_LIMIT + 100; i++) {
+ counter.add(1, Attributes.of(AttributeKey.stringKey("attribute"), "value" + i));
+ }
+
+ Awaitility.waitAtMost(30, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
+ var client = new PrometheusMetricsClient("localhost", prometheusExporterPort);
+ var allMetrics = client.getMetrics();
+ var actualMetrics = allMetrics.findByNameAndLabels("dummyCounter_total");
+ var overflowMetric = allMetrics.findByNameAndLabels("dummyCounter_total", "otel_metric_overflow", "true");
+ return actualMetrics.size() == OpenTelemetryService.MAX_CARDINALITY_LIMIT + 1 && overflowMetric.size() == 1;
+ });
+ }
+
+ @Test
+ public void testLongCounter() {
+ var longCounter = meter.counterBuilder("dummyLongCounter").build();
+ var attributes = Attributes.of(AttributeKey.stringKey("dummyAttr"), "dummyValue");
+ longCounter.add(1, attributes);
+ longCounter.add(2, attributes);
+
+ assertThat(reader.collectAllMetrics())
+ .anySatisfy(metric -> assertThat(metric)
+ .hasName("dummyLongCounter")
+ .hasLongSumSatisfying(sum -> sum
+ .hasPointsSatisfying(point -> point
+ .hasAttributes(attributes)
+ .hasValue(3))));
+ }
+
+ @Test
+ public void testServiceIsDisabledByDefault() throws Exception {
+ @Cleanup
+ var metricReader = InMemoryMetricReader.create();
+
+ @Cleanup
+ var ots = OpenTelemetryService.builder().
+ sdkBuilderConsumer(getSdkBuilderConsumer(metricReader, Map.of())).
+ clusterName("openTelemetryServiceTestCluster").
+ build();
+ var meter = ots.getOpenTelemetry().getMeter("openTelemetryServiceTestInstrument");
+
+ var builders = List.of(
+ meter.counterBuilder("dummyCounterA"),
+ meter.counterBuilder("dummyCounterB").setDescription("desc"),
+ meter.counterBuilder("dummyCounterC").setDescription("desc").setUnit("unit"),
+ meter.counterBuilder("dummyCounterD").setUnit("unit")
+ );
+
+ var callback = new AtomicBoolean();
+ // Validate that no matter how the counters are being built, they are all backed by the same underlying object.
+ // This ensures we conserve memory when the SDK is disabled.
+ assertThat(builders.stream().map(LongCounterBuilder::build).distinct()).hasSize(1);
+ assertThat(builders.stream().map(LongCounterBuilder::buildObserver).distinct()).hasSize(1);
+ assertThat(builders.stream().map(b -> b.buildWithCallback(__ -> callback.set(true))).distinct()).hasSize(1);
+
+ // Validate that no metrics are being emitted at all.
+ assertThat(metricReader.collectAllMetrics()).isEmpty();
+
+ // Validate that the callback has not being called.
+ assertThat(callback).isFalse();
+ }
+}
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 8fb1313f9ce32..55dfd11e40e93 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -49,6 +49,12 @@
${project.version}
+
+ ${project.groupId}
+ pulsar-opentelemetry
+ ${project.version}
+
+
${project.groupId}
pulsar-docs-tools
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 719c7c2cbdade..61b00871cecdb 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -73,6 +73,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.proxy.extensions.ProxyExtensions;
+import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry;
import org.apache.pulsar.proxy.stats.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,6 +149,8 @@ public class ProxyService implements Closeable {
private PrometheusMetricsServlet metricsServlet;
private List pendingMetricsProviders;
+ @Getter
+ private PulsarProxyOpenTelemetry openTelemetry;
@Getter
private final ConnectionController connectionController;
@@ -284,6 +287,7 @@ public void start() throws Exception {
}
createMetricsServlet();
+ openTelemetry = new PulsarProxyOpenTelemetry(proxyConfig);
// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
@@ -399,6 +403,9 @@ public void close() throws IOException {
proxyAdditionalServlets = null;
}
+ if (openTelemetry != null) {
+ openTelemetry.close();
+ }
resetMetricsServlet();
if (localMetadataStore != null) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
new file mode 100644
index 0000000000000..14bbc649466bb
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.stats;
+
+import io.opentelemetry.api.metrics.Meter;
+import java.io.Closeable;
+import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
+import org.apache.pulsar.proxy.server.ProxyConfiguration;
+
+public class PulsarProxyOpenTelemetry implements Closeable {
+
+ public static final String SERVICE_NAME = "pulsar-proxy";
+ private final OpenTelemetryService openTelemetryService;
+
+ @Getter
+ private final Meter meter;
+
+ public PulsarProxyOpenTelemetry(ProxyConfiguration config) {
+ openTelemetryService = OpenTelemetryService.builder()
+ .clusterName(config.getClusterName())
+ .serviceName(SERVICE_NAME)
+ .serviceVersion(PulsarVersion.getVersion())
+ .build();
+ meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.proxy");
+ }
+
+ @Override
+ public void close() {
+ openTelemetryService.close();
+ }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index fde7c938d0a62..f9ace716ecd06 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -140,6 +140,7 @@ protected void setup() throws Exception {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
index bc2029861f415..92c644b470dcd 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
@@ -101,6 +101,7 @@ protected void setup() throws Exception {
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(String.format("keyStoreType:%s,keyStorePath:%s,keyStorePassword:%s",
KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW));
+ proxyConfig.setClusterName(configClusterName);
resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index d83de9652cfde..ef58648e35a25 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -85,6 +85,7 @@ protected void setup() throws Exception {
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsEnabledWithBroker(true);
proxyConfig.setHttpMaxRequestHeaderSize(20000);
+ proxyConfig.setClusterName(configClusterName);
// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index 9f8efa1ec7935..f61a73bbf9177 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -78,6 +78,7 @@ protected void setup() throws Exception {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
// enable full parsing feature
proxyConfig.setProxyLogLevel(Optional.of(2));
+ proxyConfig.setClusterName(configClusterName);
// this is for nar package test
// addServletNar();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 1c93cb20c70df..4083c984d9874 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -137,6 +137,7 @@ protected void setup() throws Exception {
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index fec0673ff9b56..662b8305c0e26 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -58,6 +58,7 @@
public class ProxyAuthenticationTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticationTest.class);
+ private static final String CLUSTER_NAME = "test";
public static class BasicAuthenticationData implements AuthenticationDataProvider {
private final String authParam;
@@ -178,7 +179,7 @@ protected void setup() throws Exception {
providers.add(BasicAuthenticationProvider.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("test");
+ conf.setClusterName(CLUSTER_NAME);
Set proxyRoles = new HashSet<>();
proxyRoles.add("proxy");
conf.setProxyRoles(proxyRoles);
@@ -222,6 +223,7 @@ void testAuthentication() throws Exception {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(CLUSTER_NAME);
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index a070d1e84d339..78ab9bd0d9581 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -59,6 +59,7 @@ protected void setup() throws Exception {
proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 5704ba55fed86..413774daf2cd1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -60,6 +60,7 @@ protected void setup() throws Exception {
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setHaProxyProtocolEnabled(true);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 477fe597f2661..5e969ca26e4fd 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -46,6 +46,7 @@
public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyForwardAuthDataTest.class);
+ private static final String CLUSTER_NAME = "test";
@BeforeMethod
@Override
@@ -64,7 +65,7 @@ protected void setup() throws Exception {
providers.add(BasicAuthenticationProvider.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("test");
+ conf.setClusterName(CLUSTER_NAME);
Set proxyRoles = new HashSet();
proxyRoles.add("proxy");
conf.setProxyRoles(proxyRoles);
@@ -109,6 +110,7 @@ public void testForwardAuthData() throws Exception {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+ proxyConfig.setClusterName(CLUSTER_NAME);
Set providers = new HashSet<>();
providers.add(BasicAuthenticationProvider.class.getName());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
index 5ee03395b80c8..5671c527f68f9 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
@@ -77,6 +77,7 @@ protected void setup() throws Exception {
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
proxyConfig.setTlsRequireTrustedClientCertOnConnect(false);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
index 1f21281a6f6ab..99fb8c03a819f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
@@ -77,6 +77,7 @@ protected void setup() throws Exception {
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
// config for authentication and authorization.
proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
index d7935755ce040..1dcebda7935d7 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
@@ -74,6 +74,7 @@ protected void setup() throws Exception {
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 167c3b196465a..a9017404d0e9f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -65,6 +65,7 @@ protected void setup() throws Exception {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+ proxyConfig.setClusterName(configClusterName);
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
index 08066f2e5bf53..fae44c00ada42 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
@@ -66,6 +66,7 @@ protected void setup() throws Exception {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
proxyConfig.setTlsAllowInsecureConnection(false);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 0d93185f5e899..3f58250e6d68a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -71,6 +71,7 @@ protected void setup() throws Exception {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
//enable full parsing feature
proxyConfig.setProxyLogLevel(Optional.ofNullable(2));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index 6beed27cb6622..d06cf4201ff6f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -52,6 +52,7 @@
@Slf4j
public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+ private static final String CLUSTER_NAME = "proxy-authorization";
private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
private ProxyService proxyService;
@@ -84,7 +85,7 @@ protected void doInitConf() throws Exception {
properties.setProperty("tokenAllowedClockSkewSeconds", "2");
conf.setProperties(properties);
- conf.setClusterName("proxy-authorization");
+ conf.setClusterName(CLUSTER_NAME);
conf.setNumExecutorThreadPoolSize(5);
conf.setAuthenticationRefreshCheckSeconds(1);
@@ -116,6 +117,7 @@ protected void setup() throws Exception {
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setClusterName(CLUSTER_NAME);
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 137ea82951519..a1ffc13ee9350 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -49,6 +49,7 @@
public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyRolesEnforcementTest.class);
+ private static final String CLUSTER_NAME = "test";
public static class BasicAuthenticationData implements AuthenticationDataProvider {
private final String authParam;
@@ -154,7 +155,7 @@ protected void setup() throws Exception {
providers.add(BasicAuthenticationProvider.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("test");
+ conf.setClusterName(CLUSTER_NAME);
Set proxyRoles = new HashSet<>();
proxyRoles.add("proxy");
conf.setProxyRoles(proxyRoles);
@@ -209,6 +210,7 @@ public void testIncorrectRoles() throws Exception {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(CLUSTER_NAME);
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
index 0c9fa5c7ac322..3e598a57277a2 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
@@ -21,7 +21,7 @@
import java.util.Optional;
import org.testng.annotations.BeforeClass;
-public class ProxyServiceStarterDisableZeroCopyTest extends ProxyServiceStarterTest{
+public class ProxyServiceStarterDisableZeroCopyTest extends ProxyServiceStarterTest {
@Override
@BeforeClass
@@ -35,6 +35,7 @@ protected void setup() throws Exception {
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
serviceStarter.getConfig().setProxyZeroCopyModeEnabled(false);
+ serviceStarter.getConfig().setClusterName(configClusterName);
serviceStarter.start();
serviceUrl = serviceStarter.getProxyService().getServiceUrl();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 71b1087ee64b2..f263286125353 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -61,6 +61,7 @@ protected void setup() throws Exception {
serviceStarter.getConfig().setServicePort(Optional.of(0));
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
+ serviceStarter.getConfig().setClusterName(configClusterName);
serviceStarter.start();
serviceUrl = serviceStarter.getProxyService().getServiceUrl();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index b21162577a25e..61718bbac3ab0 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -68,6 +68,7 @@ protected void setup() throws Exception {
serviceStarter.getConfig().setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
serviceStarter.getConfig().setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
+ serviceStarter.getConfig().setClusterName(configClusterName);
serviceStarter.start();
serviceUrl = serviceStarter.getProxyService().getServiceUrlTls();
webPort = serviceStarter.getServer().getListenPortHTTP().get();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 155fbf616b0d5..2866c6c26907c 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -72,6 +72,7 @@ protected void setup() throws Exception {
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
// enable full parsing feature
proxyConfig.setProxyLogLevel(Optional.of(2));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
index 79ea7c5d6a31c..6e66008c15aef 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
@@ -79,6 +79,7 @@ protected void setup() throws Exception {
proxyConfig.setServicePort(Optional.ofNullable(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(configClusterName);
startProxyService();
// use the same port for subsequent restarts
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index ac08052aaf153..9bc12dcc6fcb2 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -106,6 +106,7 @@ protected void initializeProxyConfig() {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
}
@Override
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index a1b27abece4d1..4e300d39741c3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -61,6 +61,7 @@ protected void setup() throws Exception {
proxyConfig.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
index ec5cace8a06df..16f610d6d0a3a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
@@ -73,6 +73,7 @@ protected void setup() throws Exception {
" \"issuerUrl\":\"" + server.getIssuer() + "\"," +
" \"audience\": \"an-audience\"," +
" \"privateKey\":\"file://" + tempFile.getAbsolutePath() + "\"}");
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index e0dcefe2714be..cf9ad5831ec0a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -57,6 +57,7 @@
public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyWithAuthorizationNegTest.class);
+ private static final String CLUSTER_NAME = "proxy-authorization-neg";
private final String TLS_PROXY_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cacert.pem";
private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cert.pem";
@@ -104,7 +105,7 @@ protected void setup() throws Exception {
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("proxy-authorization-neg");
+ conf.setClusterName(CLUSTER_NAME);
conf.setNumExecutorThreadPoolSize(5);
super.init();
@@ -121,6 +122,7 @@ protected void setup() throws Exception {
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsEnabledWithBroker(true);
+ proxyConfig.setClusterName(CLUSTER_NAME);
// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 4e4c3c550cfd6..bc96c7ea51041 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -64,6 +64,7 @@
public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyWithAuthorizationTest.class);
+ private static final String CLUSTER_NAME = "proxy-authorization";
private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
private final String CLIENT_TOKEN = AuthTokenUtils.createToken(SECRET_KEY, "Client", Optional.empty());
@@ -189,7 +190,7 @@ protected void doInitConf() throws Exception {
properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
conf.setProperties(properties);
- conf.setClusterName("proxy-authorization");
+ conf.setClusterName(CLUSTER_NAME);
conf.setNumExecutorThreadPoolSize(5);
}
@@ -206,6 +207,7 @@ protected void setup() throws Exception {
proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
proxyConfig.setBrokerWebServiceURLTLS(pulsar.getWebServiceAddressTls());
proxyConfig.setAdvertisedAddress(null);
+ proxyConfig.setClusterName(CLUSTER_NAME);
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
@@ -432,6 +434,7 @@ public void tlsCiphersAndProtocols(Set tlsCiphers, Set tlsProtoc
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
proxyConfig.setAdvertisedAddress(null);
+ proxyConfig.setClusterName(CLUSTER_NAME);
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
index f997532b2734c..d3c05fec721b0 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -110,6 +110,7 @@ private ProxyConfiguration initializeProxyConfig() {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
return proxyConfig;
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 14be7dadc4147..5fb3e04682421 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -67,6 +67,7 @@
public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyWithJwtAuthorizationTest.class);
+ private static final String CLUSTER_NAME = "proxy-authorization";
private final String ADMIN_ROLE = "admin";
private final String PROXY_ROLE = "proxy";
@@ -104,7 +105,7 @@ protected void setup() throws Exception {
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("proxy-authorization");
+ conf.setClusterName(CLUSTER_NAME);
conf.setNumExecutorThreadPoolSize(5);
super.init();
@@ -119,6 +120,7 @@ protected void setup() throws Exception {
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setClusterName(CLUSTER_NAME);
// enable auth&auth and use JWT at proxy
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index e09194bb21dfc..9d9490e74b5ad 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -54,9 +54,11 @@
public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyWithoutServiceDiscoveryTest.class);
+ private static final String CLUSTER_NAME = "without-service-discovery";
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
@BeforeMethod
@Override
protected void setup() throws Exception {
@@ -89,7 +91,7 @@ protected void setup() throws Exception {
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("without-service-discovery");
+ conf.setClusterName(CLUSTER_NAME);
conf.setNumExecutorThreadPoolSize(5);
super.init();
@@ -106,6 +108,7 @@ protected void setup() throws Exception {
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsEnabledWithBroker(true);
+ proxyConfig.setClusterName(CLUSTER_NAME);
// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index a44e2a85efa61..57522186c8f16 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -80,6 +80,7 @@ protected void setup() throws Exception {
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsEnabledWithBroker(true);
+ proxyConfig.setClusterName(configClusterName);
// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index d239815ae81e8..fe8b1f45385e4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -75,6 +75,7 @@ protected void setup() throws Exception {
proxyConfig.setStatusFilePath(STATUS_FILE_PATH);
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index df36c35a19113..5582931851bae 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -55,6 +55,13 @@
${project.version}
test
+
+ org.apache.pulsar
+ pulsar-broker-common
+ ${project.version}
+ test-jar
+ test
+
org.apache.pulsar
pulsar-common
@@ -73,6 +80,12 @@
${project.version}
test
+
+ org.apache.pulsar
+ pulsar-proxy
+ ${project.version}
+ test
+
org.apache.pulsar
managed-ledger
@@ -169,7 +182,6 @@
test
-
com.rabbitmq
amqp-client
@@ -189,6 +201,12 @@
test
+
+ io.rest-assured
+ rest-assured
+ test
+
+
org.testcontainers
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
new file mode 100644
index 0000000000000..2b115ca6b95bf
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+import java.time.Duration;
+import org.apache.http.HttpStatus;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.MountableFile;
+
+public class OpenTelemetryCollectorContainer extends ChaosContainer {
+
+ private static final String IMAGE_NAME = "otel/opentelemetry-collector-contrib:latest";
+ private static final String NAME = "otel-collector";
+
+ public static final int PROMETHEUS_EXPORTER_PORT = 8889;
+ private static final int OTLP_RECEIVER_PORT = 4317;
+ private static final int ZPAGES_PORT = 55679;
+
+ public OpenTelemetryCollectorContainer(String clusterName) {
+ super(clusterName, IMAGE_NAME);
+ }
+
+ @Override
+ protected void configure() {
+ super.configure();
+
+ this.withCopyFileToContainer(
+ MountableFile.forClasspathResource("containers/otel-collector-config.yaml", 0644),
+ "/etc/otel-collector-config.yaml")
+ .withCommand("--config=/etc/otel-collector-config.yaml")
+ .withExposedPorts(OTLP_RECEIVER_PORT, PROMETHEUS_EXPORTER_PORT, ZPAGES_PORT)
+ .waitingFor(new HttpWaitStrategy()
+ .forPath("/debug/servicez")
+ .forPort(ZPAGES_PORT)
+ .forStatusCode(HttpStatus.SC_OK)
+ .withStartupTimeout(Duration.ofSeconds(300)));
+ }
+
+ @Override
+ public String getContainerName() {
+ return clusterName + "-" + NAME;
+ }
+
+ public String getOtlpEndpoint() {
+ return String.format("http://%s:%d", NAME, OTLP_RECEIVER_PORT);
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 56d64ce5b2c8e..77cdc1bfd28a9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -26,6 +26,7 @@
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
@@ -70,6 +71,7 @@ public abstract class PulsarContainer> exte
public static final boolean PULSAR_CONTAINERS_LEAVE_RUNNING =
Boolean.parseBoolean(System.getenv("PULSAR_CONTAINERS_LEAVE_RUNNING"));
+ @Getter
protected final String hostname;
private final String serviceName;
private final String serviceEntryPoint;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
new file mode 100644
index 0000000000000..38afc1f127d18
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.metrics;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
+import org.apache.pulsar.functions.worker.PulsarWorkerOpenTelemetry;
+import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry;
+import org.apache.pulsar.tests.integration.containers.ChaosContainer;
+import org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+public class OpenTelemetrySanityTest {
+
+ // Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector.
+ @Test(timeOut = 360_000)
+ public void testOpenTelemetryMetricsOtlpExport() throws Exception {
+ var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID();
+ var openTelemetryCollectorContainer = new OpenTelemetryCollectorContainer(clusterName);
+
+ var exporter = "otlp";
+ var otlpEndpointProp =
+ Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", openTelemetryCollectorContainer.getOtlpEndpoint());
+
+ var brokerCollectorProps = getOpenTelemetryProps(exporter, otlpEndpointProp);
+ var proxyCollectorProps = getOpenTelemetryProps(exporter, otlpEndpointProp);
+ var functionWorkerCollectorProps = getOpenTelemetryProps(exporter, otlpEndpointProp);
+
+ var spec = PulsarClusterSpec.builder()
+ .clusterName(clusterName)
+ .brokerEnvs(brokerCollectorProps)
+ .proxyEnvs(proxyCollectorProps)
+ .externalService("otel-collector", openTelemetryCollectorContainer)
+ .functionWorkerEnvs(functionWorkerCollectorProps)
+ .build();
+ @Cleanup("stop")
+ var pulsarCluster = PulsarCluster.forSpec(spec);
+ pulsarCluster.start();
+ pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1);
+
+ // TODO: Validate cluster name and service version are present once
+ // https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved.
+ var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK.
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ var metrics = getMetricsFromPrometheus(
+ openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
+ return !metrics.findByNameAndLabels(metricName, "job", PulsarBrokerOpenTelemetry.SERVICE_NAME).isEmpty();
+ });
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ var metrics = getMetricsFromPrometheus(
+ openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
+ return !metrics.findByNameAndLabels(metricName, "job", PulsarProxyOpenTelemetry.SERVICE_NAME).isEmpty();
+ });
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ var metrics = getMetricsFromPrometheus(
+ openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
+ return !metrics.findByNameAndLabels(metricName, "job", PulsarWorkerOpenTelemetry.SERVICE_NAME).isEmpty();
+ });
+ }
+
+ /*
+ * Validate that the OpenTelemetry metrics can be exported to a local Prometheus endpoint running in the same
+ * process space as the broker/proxy/function-worker.
+ * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter
+ */
+ @Test(timeOut = 360_000)
+ public void testOpenTelemetryMetricsPrometheusExport() throws Exception {
+ var prometheusExporterPort = 9464;
+ var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID();
+
+ var exporter = "prometheus";
+ var prometheusExporterPortProp =
+ Pair.of("OTEL_EXPORTER_PROMETHEUS_PORT", Integer.toString(prometheusExporterPort));
+
+ var brokerCollectorProps = getOpenTelemetryProps(exporter, prometheusExporterPortProp);
+ var proxyCollectorProps = getOpenTelemetryProps(exporter, prometheusExporterPortProp);
+ var functionWorkerCollectorProps = getOpenTelemetryProps(exporter, prometheusExporterPortProp);
+
+ var spec = PulsarClusterSpec.builder()
+ .clusterName(clusterName)
+ .brokerEnvs(brokerCollectorProps)
+ .brokerAdditionalPorts(List.of(prometheusExporterPort))
+ .proxyEnvs(proxyCollectorProps)
+ .proxyAdditionalPorts(List.of(prometheusExporterPort))
+ .functionWorkerEnvs(functionWorkerCollectorProps)
+ .functionWorkerAdditionalPorts(List.of(prometheusExporterPort))
+ .build();
+ @Cleanup("stop")
+ var pulsarCluster = PulsarCluster.forSpec(spec);
+ pulsarCluster.start();
+ pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1);
+
+ var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ var metrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort);
+ return !metrics.findByNameAndLabels(metricName,
+ Pair.of("pulsar_cluster", clusterName),
+ Pair.of("service_name", PulsarBrokerOpenTelemetry.SERVICE_NAME),
+ Pair.of("service_version", PulsarVersion.getVersion()),
+ Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty();
+ });
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ var metrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort);
+ return !metrics.findByNameAndLabels(metricName,
+ Pair.of("pulsar_cluster", clusterName),
+ Pair.of("service_name", PulsarProxyOpenTelemetry.SERVICE_NAME),
+ Pair.of("service_version", PulsarVersion.getVersion()),
+ Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty();
+ });
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
+ return !metrics.findByNameAndLabels(metricName,
+ Pair.of("pulsar_cluster", clusterName),
+ Pair.of("service_name", PulsarWorkerOpenTelemetry.SERVICE_NAME),
+ Pair.of("service_version", PulsarVersion.getVersion()),
+ Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty();
+ });
+ }
+
+ private static PrometheusMetricsClient.Metrics getMetricsFromPrometheus(ChaosContainer> container, int port) {
+ var client = new PrometheusMetricsClient(container.getHost(), container.getMappedPort(port));
+ return client.getMetrics();
+ }
+
+ private static Map getOpenTelemetryProps(String exporter, Pair ... extraProps) {
+ var defaultProps = Map.of(
+ "OTEL_SDK_DISABLED", "false",
+ "OTEL_METRIC_EXPORT_INTERVAL", "1000",
+ "OTEL_METRICS_EXPORTER", exporter
+ );
+ var props = new HashMap<>(defaultProps);
+ Arrays.stream(extraProps).forEach(p -> props.put(p.getKey(), p.getValue()));
+ return props;
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index bc9b1e267b9b2..5f893f67f74bb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -102,6 +102,8 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContai
private final ProxyContainer proxyContainer;
private Map> externalServices = Collections.emptyMap();
private Map> externalServiceEnvs;
+ private final Map functionWorkerEnvs;
+ private final List functionWorkerAdditionalPorts;
private final String metadataStoreUrl;
private final String configurationMetadataStoreUrl;
@@ -182,6 +184,9 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
if (spec.proxyMountFiles != null) {
spec.proxyMountFiles.forEach(this.proxyContainer::withFileSystemBind);
}
+ if (spec.proxyAdditionalPorts != null) {
+ spec.proxyAdditionalPorts.forEach(this.proxyContainer::addExposedPort);
+ }
// create bookies
bookieContainers.putAll(
@@ -268,6 +273,8 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
workerContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
});
+ functionWorkerEnvs = spec.functionWorkerEnvs;
+ functionWorkerAdditionalPorts = spec.functionWorkerAdditionalPorts;
}
public String getPlainTextServiceUrl() {
@@ -475,23 +482,25 @@ private void startFunctionWorkersWithProcessContainerFactory(String suffix, int
String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
- "functions-worker-process-" + suffix,
- numFunctionWorkers,
- (name) -> new WorkerContainer(clusterName, name)
- .withNetwork(network)
- .withNetworkAliases(name)
- // worker settings
- .withEnv("PF_workerId", name)
- .withEnv("PF_workerHostname", name)
- .withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
- .withEnv("PF_pulsarFunctionsCluster", clusterName)
- .withEnv("PF_pulsarServiceUrl", serviceUrl)
- .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
- // script
- .withEnv("clusterName", clusterName)
- .withEnv("zookeeperServers", ZKContainer.NAME)
- // bookkeeper tools
- .withEnv("zkServers", ZKContainer.NAME)
+ "functions-worker-process-" + suffix,
+ numFunctionWorkers,
+ (name) -> new WorkerContainer(clusterName, name)
+ .withNetwork(network)
+ .withNetworkAliases(name)
+ // worker settings
+ .withEnv("PF_workerId", name)
+ .withEnv("PF_workerHostname", name)
+ .withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
+ .withEnv("PF_pulsarFunctionsCluster", clusterName)
+ .withEnv("PF_pulsarServiceUrl", serviceUrl)
+ .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+ // script
+ .withEnv("clusterName", clusterName)
+ .withEnv("zookeeperServers", ZKContainer.NAME)
+ // bookkeeper tools
+ .withEnv("zkServers", ZKContainer.NAME)
+ .withEnv(functionWorkerEnvs)
+ .withExposedPorts(functionWorkerAdditionalPorts.toArray(new Integer[0]))
));
this.startWorkers();
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index b705b347cffab..8a991be49fad0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -147,6 +147,12 @@ public class PulsarClusterSpec {
*/
Map bookkeeperEnvs;
+ /**
+ * Specify envs for function workers.
+ */
+ @Singular
+ Map functionWorkerEnvs;
+
/**
* Specify mount files.
*/
@@ -170,6 +176,17 @@ public class PulsarClusterSpec {
*/
List bookieAdditionalPorts;
+ /**
+ * Additional ports to expose on proxy containers.
+ */
+ List proxyAdditionalPorts;
+
+ /**
+ * Additional ports to expose on function workers.
+ */
+ @Singular
+ List functionWorkerAdditionalPorts;
+
/**
* Enable TLS for connection.
*/
diff --git a/tests/integration/src/test/resources/containers/otel-collector-config.yaml b/tests/integration/src/test/resources/containers/otel-collector-config.yaml
new file mode 100644
index 0000000000000..bd332f0428307
--- /dev/null
+++ b/tests/integration/src/test/resources/containers/otel-collector-config.yaml
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+receivers:
+ otlp:
+ protocols:
+ grpc:
+
+exporters:
+ prometheus:
+ endpoint: "0.0.0.0:8889"
+
+processors:
+ batch:
+
+extensions:
+ health_check:
+ zpages:
+ endpoint: :55679
+
+service:
+ extensions: [zpages, health_check]
+ pipelines:
+ metrics:
+ receivers: [otlp]
+ processors: [batch]
+ exporters: [prometheus]
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar-metrics.xml b/tests/integration/src/test/resources/pulsar-metrics.xml
new file mode 100644
index 0000000000000..1c87f2bdf0d06
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-metrics.xml
@@ -0,0 +1,28 @@
+
+
+
+
+
+
+
+
+
diff --git a/tests/integration/src/test/resources/pulsar.xml b/tests/integration/src/test/resources/pulsar.xml
index bdc5f27cc78fb..aa9a59a6cda64 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -37,5 +37,6 @@
+