diff --git a/pulsar-rpc-contrib/src/main/resources/pulsar-container.properties b/pulsar-rpc-contrib/src/main/resources/pulsar-container.properties index 316b729..fb8f38a 100644 --- a/pulsar-rpc-contrib/src/main/resources/pulsar-container.properties +++ b/pulsar-rpc-contrib/src/main/resources/pulsar-container.properties @@ -1 +1,15 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + pulsar.version=${pulsar.version} \ No newline at end of file diff --git a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcClientTest.java b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcClientTest.java index 6f6a3de..f8de3ef 100644 --- a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcClientTest.java +++ b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcClientTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.rpc.contrib.base.PulsarRpcBase; import org.apache.pulsar.rpc.contrib.client.PulsarRpcClient; import org.apache.pulsar.rpc.contrib.client.RequestCallBack; import org.awaitility.Awaitility; diff --git a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcServerTest.java b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcServerTest.java index 982a6a8..e970f57 100644 --- a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcServerTest.java +++ b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcServerTest.java @@ -19,6 +19,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.rpc.contrib.base.PulsarRpcBase; import org.apache.pulsar.rpc.contrib.server.PulsarRpcServer; import org.apache.pulsar.rpc.contrib.server.PulsarRpcServerBuilder; import org.testng.annotations.AfterMethod; diff --git a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SimpleRpcCallTest.java b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SimpleRpcCallTest.java index 6ff0454..8acffe1 100644 --- a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SimpleRpcCallTest.java +++ b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SimpleRpcCallTest.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.rpc.contrib.base.PulsarRpcBase; import org.apache.pulsar.rpc.contrib.client.PulsarRpcClient; import org.apache.pulsar.rpc.contrib.client.PulsarRpcClientBuilder; import org.apache.pulsar.rpc.contrib.client.RequestCallBack; @@ -128,7 +129,8 @@ public void testRpcCallWithCallBack() throws Exception { requestProducerConfigMap.put("producerName", "requestProducer"); requestProducerConfigMap.put("messageRoutingMode", MessageRoutingMode.RoundRobinPartition); - AtomicInteger counter = new AtomicInteger(); + Map resultMap = new ConcurrentHashMap<>(); + final int ackNums = 2; RequestCallBack callBack = new RequestCallBack<>() { @Override @@ -150,7 +152,9 @@ public void onReplySuccess(String correlationId, String subscription, TestReply value, CompletableFuture replyFuture) { log.info(" CorrelationId[{}] Subscription[{}] Receive reply message success. Value: {}", correlationId, subscription, value); - counter.incrementAndGet(); + if (resultMap.get(correlationId).getAndIncrement() == ackNums - 1) { + rpcClient.removeRequest(correlationId); + } replyFuture.complete(value); } @@ -208,9 +212,14 @@ public void onReplyMessageAckFailed(String correlationId, Consumer co String correlationId = correlationIdSupplier.get(); TestRequest message = new TestRequest(asynchronousMessage + i); requestMessageConfigMap.put(TypedMessageBuilder.CONF_EVENT_TIME, System.currentTimeMillis()); + resultMap.put(correlationId, new AtomicInteger()); rpcClient.requestAsync(correlationId, message, requestMessageConfigMap); } - Awaitility.await().atMost(20, TimeUnit.SECONDS).until(() -> counter.get() == messageNum * 3); + Awaitility.await().atMost(20, TimeUnit.SECONDS).until(() -> { + AtomicInteger success = new AtomicInteger(); + resultMap.forEach((__, count) -> success.getAndAdd(count.get())); + return success.get() == messageNum * ackNums; + }); rpcServer1.close(); rpcServer2.close(); rpcServer3.close(); diff --git a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcBase.java b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/base/PulsarRpcBase.java similarity index 96% rename from pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcBase.java rename to pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/base/PulsarRpcBase.java index e3d8758..b70fcca 100644 --- a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcBase.java +++ b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/base/PulsarRpcBase.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.pulsar.rpc.contrib; +package org.apache.pulsar.rpc.contrib.base; import static java.util.UUID.randomUUID; import java.time.Duration; @@ -31,8 +31,8 @@ public abstract class PulsarRpcBase { // protected final String topicPrefix = "public/default/"; protected String requestTopic; protected String replyTopic; - Pattern requestTopicPattern; - Pattern replyTopicPattern; + protected Pattern requestTopicPattern; + protected Pattern replyTopicPattern; protected String requestSubBase; protected String replySubBase; protected Duration replyTimeout = Duration.ofSeconds(3); diff --git a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SingletonPulsarContainer.java b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/base/SingletonPulsarContainer.java similarity index 76% rename from pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SingletonPulsarContainer.java rename to pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/base/SingletonPulsarContainer.java index 6537118..be34654 100644 --- a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SingletonPulsarContainer.java +++ b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/base/SingletonPulsarContainer.java @@ -1,4 +1,17 @@ -package org.apache.pulsar.rpc.contrib; +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.rpc.contrib.base; import java.io.IOException; import java.time.Duration;