diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java index ef05fa6adbb..e45f99799d3 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java @@ -26,6 +26,7 @@ import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryRouteRequest; +import apache.rocketmq.v2.RecallMessageRequest; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.Resource; import apache.rocketmq.v2.SendMessageRequest; @@ -128,6 +129,9 @@ public static PlainAccessResource parse(RemotingCommand request, String remoteAd final String topicV2 = request.getExtFields().get("b"); accessResource.addResourceAndPerm(topicV2, PlainAccessResource.isRetryTopic(topicV2) ? Permission.SUB : Permission.PUB); break; + case RequestCode.RECALL_MESSAGE: + accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB); + break; case RequestCode.CONSUMER_SEND_MSG_BACK: accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB); break; @@ -232,6 +236,9 @@ public static PlainAccessResource parse(GeneratedMessageV3 messageV3, Authentica } } accessResource.addResourceAndPerm(topic, Permission.PUB); + } else if (RecallMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) { + RecallMessageRequest request = (RecallMessageRequest) messageV3; + accessResource.addResourceAndPerm(request.getTopic(), Permission.PUB); } else if (ReceiveMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) { ReceiveMessageRequest request = (ReceiveMessageRequest) messageV3; accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessResourceTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessResourceTest.java index 8ff3d610486..bccd37e39ef 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessResourceTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessResourceTest.java @@ -19,10 +19,15 @@ import java.util.HashMap; import java.util.Map; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.Resource; +import com.google.protobuf.GeneratedMessageV3; +import org.apache.rocketmq.acl.common.AuthenticationHeader; import org.apache.rocketmq.acl.common.Permission; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; import org.junit.Assert; @@ -33,6 +38,8 @@ public class PlainAccessResourceTest { public static final String DEFAULT_PRODUCER_GROUP = "PID_acl"; public static final String DEFAULT_CONSUMER_GROUP = "GID_acl"; public static final String DEFAULT_REMOTE_ADDR = "192.128.1.1"; + public static final String AUTH_HEADER = + "Signature Credential=1234567890/test, SignedHeaders=host, Signature=1234567890"; @Test public void testParseSendNormal() { @@ -93,4 +100,34 @@ public void testParseSendRetryV2() { Assert.assertEquals(permMap, accessResource.getResourcePermMap()); } + + @Test + public void testParseRecallMessage() { + // remoting + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + requestHeader.setTopic(DEFAULT_TOPIC); + requestHeader.setProducerGroup(DEFAULT_PRODUCER_GROUP); + requestHeader.setRecallHandle("handle"); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, requestHeader); + request.makeCustomHeaderToNet(); + + PlainAccessResource accessResource = PlainAccessResource.parse(request, DEFAULT_REMOTE_ADDR); + Assert.assertTrue(Permission.PUB == accessResource.getResourcePermMap().get(DEFAULT_TOPIC)); + + // grpc + GeneratedMessageV3 grpcRequest = RecallMessageRequest.newBuilder() + .setTopic(Resource.newBuilder().setName(DEFAULT_TOPIC).build()) + .setRecallHandle("handle") + .build(); + accessResource = PlainAccessResource.parse(grpcRequest, mockAuthenticationHeader()); + Assert.assertTrue(Permission.PUB == accessResource.getResourcePermMap().get(DEFAULT_TOPIC)); + } + + private AuthenticationHeader mockAuthenticationHeader() { + return AuthenticationHeader.builder() + .remoteAddress(DEFAULT_REMOTE_ADDR) + .authorization(AUTH_HEADER) + .datetime("datetime") + .build(); + } } diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java index e69abdaf805..bf86892ea61 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java @@ -25,6 +25,7 @@ import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryRouteRequest; +import apache.rocketmq.v2.RecallMessageRequest; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.SendMessageRequest; import apache.rocketmq.v2.Subscription; @@ -101,6 +102,10 @@ public List build(Metadata metadata, GeneratedMessa } result = newPubContext(metadata, request.getMessages(0).getTopic()); } + if (message instanceof RecallMessageRequest) { + RecallMessageRequest request = (RecallMessageRequest) message; + result = newPubContext(metadata, request.getTopic()); + } if (message instanceof EndTransactionRequest) { EndTransactionRequest request = (EndTransactionRequest) message; result = newPubContext(metadata, request.getTopic()); @@ -207,6 +212,10 @@ public List build(ChannelHandlerContext context, Re result.add(DefaultAuthorizationContext.of(subject, topic, Action.PUB, sourceIp)); } break; + case RequestCode.RECALL_MESSAGE: + topic = Resource.ofTopic(fields.get(TOPIC)); + result.add(DefaultAuthorizationContext.of(subject, topic, Action.PUB, sourceIp)); + break; case RequestCode.END_TRANSACTION: if (StringUtils.isNotBlank(fields.get(TOPIC))) { topic = Resource.ofTopic(fields.get(TOPIC)); diff --git a/auth/src/test/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilderTest.java b/auth/src/test/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilderTest.java index 4ee73f3d797..c73e07d7529 100644 --- a/auth/src/test/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilderTest.java +++ b/auth/src/test/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilderTest.java @@ -28,6 +28,7 @@ import apache.rocketmq.v2.Publishing; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryRouteRequest; +import apache.rocketmq.v2.RecallMessageRequest; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.Resource; import apache.rocketmq.v2.SendMessageRequest; @@ -65,6 +66,7 @@ import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader; @@ -122,6 +124,19 @@ public void buildGrpc() { Assert.assertEquals(result.get(0).getChannelId(), "channel-id"); Assert.assertEquals(result.get(0).getRpcCode(), SendMessageRequest.getDescriptor().getFullName()); + request = RecallMessageRequest.newBuilder() + .setTopic(Resource.newBuilder().setName("topic").build()) + .setRecallHandle("handle") + .build(); + result = builder.build(metadata, request); + Assert.assertEquals(1, result.size()); + Assert.assertEquals(result.get(0).getSubject().getSubjectKey(), "User:rocketmq"); + Assert.assertEquals(result.get(0).getResource().getResourceKey(), "Topic:topic"); + Assert.assertTrue(result.get(0).getActions().containsAll(Arrays.asList(Action.PUB))); + Assert.assertEquals(result.get(0).getSourceIp(), "192.168.0.1"); + Assert.assertEquals(result.get(0).getChannelId(), "channel-id"); + Assert.assertEquals(result.get(0).getRpcCode(), RecallMessageRequest.getDescriptor().getFullName()); + request = EndTransactionRequest.newBuilder() .setTopic(Resource.newBuilder().setName("topic").build()) .build(); @@ -315,6 +330,22 @@ public void buildRemoting() { Assert.assertEquals("Group:group", result.get(0).getResource().getResourceKey()); Assert.assertTrue(result.get(0).getActions().containsAll(Arrays.asList(Action.SUB))); + RecallMessageRequestHeader recallMessageRequestHeader = new RecallMessageRequestHeader(); + recallMessageRequestHeader.setTopic("topic"); + recallMessageRequestHeader.setRecallHandle("handle"); + request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, recallMessageRequestHeader); + request.setVersion(441); + request.addExtField("AccessKey", "rocketmq"); + request.makeCustomHeaderToNet(); + result = builder.build(channelHandlerContext, request); + Assert.assertEquals(1, result.size()); + Assert.assertEquals("User:rocketmq", result.get(0).getSubject().getSubjectKey()); + Assert.assertEquals("Topic:topic", result.get(0).getResource().getResourceKey()); + Assert.assertTrue(result.get(0).getActions().containsAll(Arrays.asList(Action.PUB))); + Assert.assertEquals("192.168.0.1", result.get(0).getSourceIp()); + Assert.assertEquals("channel-id", result.get(0).getChannelId()); + Assert.assertEquals(RequestCode.RECALL_MESSAGE + "", result.get(0).getRpcCode()); + EndTransactionRequestHeader endTransactionRequestHeader = new EndTransactionRequestHeader(); endTransactionRequestHeader.setTopic("topic"); request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, endTransactionRequestHeader); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index e1edd2f5126..744aba19118 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -93,6 +93,7 @@ import org.apache.rocketmq.broker.processor.PullMessageProcessor; import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor; import org.apache.rocketmq.broker.processor.QueryMessageProcessor; +import org.apache.rocketmq.broker.processor.RecallMessageProcessor; import org.apache.rocketmq.broker.processor.ReplyMessageProcessor; import org.apache.rocketmq.broker.processor.SendMessageProcessor; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; @@ -210,6 +211,7 @@ public class BrokerController { protected final QueryAssignmentProcessor queryAssignmentProcessor; protected final ClientManageProcessor clientManageProcessor; protected final SendMessageProcessor sendMessageProcessor; + protected final RecallMessageProcessor recallMessageProcessor; protected final ReplyMessageProcessor replyMessageProcessor; protected final PullRequestHoldService pullRequestHoldService; protected final MessageArrivingListener messageArrivingListener; @@ -369,6 +371,7 @@ public BrokerController( this.ackMessageProcessor = new AckMessageProcessor(this); this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this); this.sendMessageProcessor = new SendMessageProcessor(this); + this.recallMessageProcessor = new RecallMessageProcessor(this); this.replyMessageProcessor = new ReplyMessageProcessor(this); this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor); this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); @@ -1096,10 +1099,12 @@ public void registerProcessor() { this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, recallMessageProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, recallMessageProcessor, this.sendMessageExecutor); /** * PullMessageProcessor */ @@ -2424,6 +2429,10 @@ public SendMessageProcessor getSendMessageProcessor() { return sendMessageProcessor; } + public RecallMessageProcessor getRecallMessageProcessor() { + return recallMessageProcessor; + } + public QueryAssignmentProcessor getQueryAssignmentProcessor() { return queryAssignmentProcessor; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java new file mode 100644 index 00000000000..7a652f43151 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.producer.RecallMessageHandle; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.timer.TimerMessageStore; + +import java.nio.charset.StandardCharsets; + +public class RecallMessageProcessor implements NettyRequestProcessor { + private static final String RECALL_MESSAGE_TAG = "_RECALL_TAG_"; + private final BrokerController brokerController; + + public RecallMessageProcessor(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws + RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(RecallMessageResponseHeader.class); + response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); + final RecallMessageRequestHeader requestHeader = + request.decodeCommandCustomHeader(RecallMessageRequestHeader.class); + + if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) { + response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); + response.setRemark("recall failed, broker service not available"); + return response; + } + + final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); + if (this.brokerController.getMessageStore().now() < startTimestamp) { + response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); + response.setRemark("recall failed, broker service not available"); + return response; + } + + if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) + && !this.brokerController.getBrokerConfig().isAllowRecallWhenBrokerNotWriteable()) { + response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); + response.setRemark("recall failed, broker service not available"); + return response; + } + + TopicConfig topicConfig = + this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); + if (null == topicConfig) { + response.setCode(ResponseCode.TOPIC_NOT_EXIST); + response.setRemark("recall failed, the topic[" + requestHeader.getTopic() + "] not exist"); + return response; + } + + RecallMessageHandle.HandleV1 handle; + try { + handle = (RecallMessageHandle.HandleV1) RecallMessageHandle.decodeHandle(requestHeader.getRecallHandle()); + } catch (DecoderException e) { + response.setCode(ResponseCode.ILLEGAL_OPERATION); + response.setRemark(e.getMessage()); + return response; + } + + if (!requestHeader.getTopic().equals(handle.getTopic())) { + response.setCode(ResponseCode.ILLEGAL_OPERATION); + response.setRemark("recall failed, topic not match"); + return response; + } + if (!brokerController.getBrokerConfig().getBrokerName().equals(handle.getBrokerName())) { + response.setCode(ResponseCode.ILLEGAL_OPERATION); + response.setRemark("recall failed, broker service not available"); + return response; + } + + long timestamp = NumberUtils.toLong(handle.getTimestampStr(), -1); + long timeLeft = timestamp - System.currentTimeMillis(); + if (timeLeft <= 0 + || timeLeft >= brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000L) { + response.setCode(ResponseCode.ILLEGAL_OPERATION); + response.setRemark("recall failed, timestamp invalid"); + return response; + } + + MessageExtBrokerInner msgInner = buildMessage(ctx, requestHeader, handle); + long beginTimeMillis = this.brokerController.getMessageStore().now(); + PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); + handlePutMessageResult(putMessageResult, request, response, msgInner, ctx, beginTimeMillis); + return response; + } + + public MessageExtBrokerInner buildMessage(ChannelHandlerContext ctx, RecallMessageRequestHeader requestHeader, + RecallMessageHandle.HandleV1 handle) { + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setTopic(handle.getTopic()); + msgInner.setBody("0".getBytes(StandardCharsets.UTF_8)); + msgInner.setTags(RECALL_MESSAGE_TAG); + msgInner.setTagsCode(RECALL_MESSAGE_TAG.hashCode()); + msgInner.setQueueId(0); + MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TIMER_DEL_UNIQKEY, + TimerMessageStore.buildDeleteKey(handle.getTopic(), handle.getMessageId())); + MessageAccessor.putProperty(msgInner, + MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, handle.getMessageId()); + MessageAccessor.putProperty(msgInner, + MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(handle.getTimestampStr())); + MessageAccessor.putProperty(msgInner, + MessageConst.PROPERTY_BORN_TIMESTAMP, String.valueOf(System.currentTimeMillis())); + MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TRACE_CONTEXT, ""); + MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_PRODUCER_GROUP, requestHeader.getProducerGroup()); + msgInner.setBornTimestamp(System.currentTimeMillis()); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + msgInner.setBornHost(ctx.channel().remoteAddress()); + msgInner.setStoreHost(this.brokerController.getStoreHost()); + return msgInner; + } + + public void handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand request, + RemotingCommand response, MessageExt message, ChannelHandlerContext ctx, long beginTimeMillis) { + if (null == putMessageResult) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("recall failed, execute error"); + return; + } + RecallMessageResponseHeader responseHeader = (RecallMessageResponseHeader) response.readCustomHeader(); + switch (putMessageResult.getPutMessageStatus()) { + case PUT_OK: + this.brokerController.getBrokerStatsManager().incTopicPutNums( + message.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); // system timer topic + this.brokerController.getBrokerStatsManager().incTopicPutSize( + message.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); + this.brokerController.getBrokerStatsManager().incBrokerPutNums( + message.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum()); + this.brokerController.getBrokerStatsManager().incTopicPutLatency( + message.getTopic(), 0, (int) (this.brokerController.getMessageStore().now() - beginTimeMillis)); + case FLUSH_DISK_TIMEOUT: + case FLUSH_SLAVE_TIMEOUT: + case SLAVE_NOT_AVAILABLE: + response.setCode(ResponseCode.SUCCESS); + responseHeader.setMsgId(MessageClientIDSetter.getUniqID(message)); + break; + default: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("recall failed, execute error"); + break; + } + } + + @Override + public boolean rejectRequest() { + return false; + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index db5b22888dc..669cd5e6771 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.producer.RecallMessageHandle; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.CleanupPolicyUtils; @@ -483,6 +484,7 @@ private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); responseHeader.setTransactionId(MessageClientIDSetter.getUniqID(msg)); + attachRecallHandle(request, msg, responseHeader); RemotingCommand rewriteResult = rewriteResponseForStaticTopic(responseHeader, mappingContext); if (rewriteResult != null) { @@ -647,6 +649,21 @@ private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, } } + public void attachRecallHandle(RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader) { + if (RequestCode.SEND_BATCH_MESSAGE == request.getCode() + || RequestCode.CONSUMER_SEND_MSG_BACK == request.getCode()) { + return; + } + String timestampStr = msg.getProperty(MessageConst.PROPERTY_TIMER_OUT_MS); + String realTopic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC); + if (timestampStr != null && realTopic != null && !realTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + timestampStr = String.valueOf(Long.parseLong(timestampStr) + 1); // consider of floor + String recallHandle = RecallMessageHandle.HandleV1.buildHandle(realTopic, + brokerController.getBrokerConfig().getBrokerName(), timestampStr, MessageClientIDSetter.getUniqID(msg)); + responseHeader.setRecallHandle(recallHandle); + } + } + private String diskUtil() { double physicRatio = 100; String storePath; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java new file mode 100644 index 00000000000..7bd260cc2c0 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.processor; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.topic.TopicConfigManager; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.producer.RecallMessageHandle; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class RecallMessageProcessorTest { + private static final String TOPIC = "topic"; + private static final String BROKER_NAME = "brokerName"; + + private RecallMessageProcessor recallMessageProcessor; + @Mock + private BrokerConfig brokerConfig; + @Mock + private BrokerController brokerController; + @Mock + private ChannelHandlerContext handlerContext; + @Mock + private MessageStoreConfig messageStoreConfig; + @Mock + private TopicConfigManager topicConfigManager; + @Mock + private MessageStore messageStore; + @Mock + private BrokerStatsManager brokerStatsManager; + @Mock + private Channel channel; + + @Before + public void init() throws IllegalAccessException, NoSuchFieldException { + when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); + when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + when(brokerController.getMessageStore()).thenReturn(messageStore); + when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); + when(brokerConfig.getBrokerName()).thenReturn(BROKER_NAME); + when(brokerController.getBrokerStatsManager()).thenReturn(brokerStatsManager); + when(handlerContext.channel()).thenReturn(channel); + recallMessageProcessor = new RecallMessageProcessor(brokerController); + } + + @Test + public void testBuildMessage() { + String timestampStr = String.valueOf(System.currentTimeMillis()); + String id = "id"; + RecallMessageHandle.HandleV1 handle = new RecallMessageHandle.HandleV1(TOPIC, "brokerName", timestampStr, id); + MessageExtBrokerInner msg = + recallMessageProcessor.buildMessage(handlerContext, new RecallMessageRequestHeader(), handle); + + Assert.assertEquals(TOPIC, msg.getTopic()); + Map properties = MessageDecoder.string2messageProperties(msg.getPropertiesString()); + Assert.assertEquals(timestampStr, properties.get(MessageConst.PROPERTY_TIMER_DELIVER_MS)); + Assert.assertEquals(id, properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); + Assert.assertEquals(TOPIC + "+" + id, properties.get(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)); + } + + @Test + public void testHandlePutMessageResult() { + MessageExt message = new MessageExt(); + MessageAccessor.putProperty(message, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "id"); + RemotingCommand response = RemotingCommand.createResponseCommand(RecallMessageResponseHeader.class); + recallMessageProcessor.handlePutMessageResult(null, null, response, message, handlerContext, 0L); + Assert.assertEquals(ResponseCode.SYSTEM_ERROR, response.getCode()); + + List okStatus = Arrays.asList(PutMessageStatus.PUT_OK, PutMessageStatus.FLUSH_DISK_TIMEOUT, + PutMessageStatus.FLUSH_SLAVE_TIMEOUT, PutMessageStatus.SLAVE_NOT_AVAILABLE); + + for (PutMessageStatus status : PutMessageStatus.values()) { + PutMessageResult putMessageResult = + new PutMessageResult(status, new AppendMessageResult(AppendMessageStatus.PUT_OK)); + recallMessageProcessor.handlePutMessageResult(putMessageResult, null, response, message, handlerContext, 0L); + if (okStatus.contains(status)) { + Assert.assertEquals(ResponseCode.SUCCESS, response.getCode()); + RecallMessageResponseHeader responseHeader = (RecallMessageResponseHeader) response.readCustomHeader(); + Assert.assertEquals("id", responseHeader.getMsgId()); + } else { + Assert.assertEquals(ResponseCode.SYSTEM_ERROR, response.getCode()); + } + } + } + + @Test + public void testProcessRequest_invalidStatus() throws RemotingCommandException { + RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME); + RemotingCommand response; + + // role slave + when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE); + response = recallMessageProcessor.processRequest(handlerContext, request); + Assert.assertEquals(ResponseCode.SLAVE_NOT_AVAILABLE, response.getCode()); + + // not reach startTimestamp + when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SYNC_MASTER); + when(messageStore.now()).thenReturn(0L); + when(brokerConfig.getStartAcceptSendRequestTimeStamp()).thenReturn(System.currentTimeMillis()); + response = recallMessageProcessor.processRequest(handlerContext, request); + Assert.assertEquals(ResponseCode.SERVICE_NOT_AVAILABLE, response.getCode()); + } + + @Test + public void testProcessRequest_notWriteable() throws RemotingCommandException { + when(brokerConfig.getBrokerPermission()).thenReturn(4); + when(brokerConfig.isAllowRecallWhenBrokerNotWriteable()).thenReturn(false); + RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME); + RemotingCommand response = recallMessageProcessor.processRequest(handlerContext, request); + Assert.assertEquals(ResponseCode.SERVICE_NOT_AVAILABLE, response.getCode()); + } + + @Test + public void testProcessRequest_topicNotFound_or_notMatch() throws RemotingCommandException { + when(brokerConfig.getBrokerPermission()).thenReturn(6); + RemotingCommand request; + RemotingCommand response; + + // not found + request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME); + response = recallMessageProcessor.processRequest(handlerContext, request); + Assert.assertEquals(ResponseCode.TOPIC_NOT_EXIST, response.getCode()); + + // not match + when(topicConfigManager.selectTopicConfig(TOPIC)).thenReturn(new TopicConfig(TOPIC)); + request = mockRequest(0, TOPIC, "anotherTopic", "id", BROKER_NAME); + response = recallMessageProcessor.processRequest(handlerContext, request); + Assert.assertEquals(ResponseCode.ILLEGAL_OPERATION, response.getCode()); + } + + @Test + public void testProcessRequest_brokerNameNotMatch() throws RemotingCommandException { + when(brokerConfig.getBrokerPermission()).thenReturn(6); + when(topicConfigManager.selectTopicConfig(TOPIC)).thenReturn(new TopicConfig(TOPIC)); + + RemotingCommand request = mockRequest(0, TOPIC, "anotherTopic", "id", BROKER_NAME + "_other"); + RemotingCommand response = recallMessageProcessor.processRequest(handlerContext, request); + Assert.assertEquals(ResponseCode.ILLEGAL_OPERATION, response.getCode()); + } + + @Test + public void testProcessRequest_timestampInvalid() throws RemotingCommandException { + when(brokerConfig.getBrokerPermission()).thenReturn(6); + when(topicConfigManager.selectTopicConfig(TOPIC)).thenReturn(new TopicConfig(TOPIC)); + RemotingCommand request; + RemotingCommand response; + + // past timestamp + request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME); + response = recallMessageProcessor.processRequest(handlerContext, request); + Assert.assertEquals(ResponseCode.ILLEGAL_OPERATION, response.getCode()); + + // timestamp overflow + when(messageStoreConfig.getTimerMaxDelaySec()).thenReturn(86400); + request = mockRequest(System.currentTimeMillis() + 86400 * 2 * 1000, TOPIC, TOPIC, "id", BROKER_NAME); + response = recallMessageProcessor.processRequest(handlerContext, request); + Assert.assertEquals(ResponseCode.ILLEGAL_OPERATION, response.getCode()); + } + + @Test + public void testProcessRequest_success() throws RemotingCommandException { + when(brokerConfig.getBrokerPermission()).thenReturn(6); + when(topicConfigManager.selectTopicConfig(TOPIC)).thenReturn(new TopicConfig(TOPIC)); + when(messageStoreConfig.getTimerMaxDelaySec()).thenReturn(86400); + when(messageStore.putMessage(any())).thenReturn( + new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + + String msgId = "msgId"; + RemotingCommand request = mockRequest(System.currentTimeMillis() + 90 * 1000, TOPIC, TOPIC, msgId, BROKER_NAME); + RemotingCommand response = recallMessageProcessor.processRequest(handlerContext, request); + RecallMessageResponseHeader responseHeader = (RecallMessageResponseHeader) response.readCustomHeader(); + Assert.assertEquals(ResponseCode.SUCCESS, response.getCode()); + Assert.assertEquals(msgId, responseHeader.getMsgId()); + verify(messageStore, times(1)).putMessage(any()); + } + + private RemotingCommand mockRequest(long timestamp, String requestTopic, String handleTopic, + String msgId, String brokerName) { + String handle = + RecallMessageHandle.HandleV1.buildHandle(handleTopic, brokerName, String.valueOf(timestamp), msgId); + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + requestHeader.setProducerGroup("group"); + requestHeader.setTopic(requestTopic); + requestHeader.setRecallHandle(handle); + requestHeader.setBrokerName(brokerName); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, requestHeader); + request.makeCustomHeaderToNet(); + return request; + } +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java index 442794dcd26..9da6a96ec99 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import org.apache.commons.codec.DecoderException; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.AbortProcessException; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; @@ -36,10 +37,13 @@ import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.producer.RecallMessageHandle; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -50,12 +54,14 @@ import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -69,6 +75,8 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; @RunWith(MockitoJUnitRunner.class) public class SendMessageProcessorTest { @@ -78,6 +86,8 @@ public class SendMessageProcessorTest { @Mock private Channel channel; @Spy + private BrokerConfig brokerConfig; + @Spy private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); @Mock @@ -98,6 +108,7 @@ public void init() { when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager); when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); when(brokerController.getPutMessageFutureExecutor()).thenReturn(Executors.newSingleThreadExecutor()); + when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); when(messageStore.now()).thenReturn(System.currentTimeMillis()); when(channel.remoteAddress()).thenReturn(new InetSocketAddress(1024)); when(handlerContext.channel()).thenReturn(channel); @@ -299,6 +310,60 @@ public void consumeMessageAfter(ConsumeMessageContext context) { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testAttachRecallHandle_skip() { + MessageExt message = new MessageExt(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_BATCH_MESSAGE, null); + sendMessageProcessor.attachRecallHandle(request, message, new SendMessageResponseHeader()); + + request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, null); + sendMessageProcessor.attachRecallHandle(request, message, new SendMessageResponseHeader()); + + request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, null); + sendMessageProcessor.attachRecallHandle(request, message, new SendMessageResponseHeader()); + + request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, null); + sendMessageProcessor.attachRecallHandle(request, message, new SendMessageResponseHeader()); + + verify(brokerConfig, times(0)).getBrokerName(); + } + + @Test + public void testAttachRecallHandle_doAttach() throws DecoderException { + int[] precisionSet = {100, 200, 500, 1000}; + SendMessageResponseHeader responseHeader = new SendMessageResponseHeader(); + String id = MessageClientIDSetter.createUniqID(); + long timestamp = System.currentTimeMillis(); + + for (int precisionMs : precisionSet) { + long deliverMs = floor(timestamp, precisionMs); + MessageExt message = new MessageExt(); + MessageAccessor.putProperty(message, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, id); + MessageAccessor.putProperty(message, MessageConst.PROPERTY_TIMER_OUT_MS, String.valueOf(deliverMs)); + MessageAccessor.putProperty(message, MessageConst.PROPERTY_REAL_TOPIC, topic); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, null); + sendMessageProcessor.attachRecallHandle(request, message, responseHeader); + Assert.assertNotNull(responseHeader.getRecallHandle()); + RecallMessageHandle.HandleV1 v1 = + (RecallMessageHandle.HandleV1) RecallMessageHandle.decodeHandle(responseHeader.getRecallHandle()); + Assert.assertEquals(id, v1.getMessageId()); + Assert.assertEquals(topic, v1.getTopic()); + Assert.assertEquals(deliverMs + 1, Long.parseLong(v1.getTimestampStr())); + Assert.assertEquals(deliverMs, floor(Long.valueOf(v1.getTimestampStr()), precisionMs)); + } + } + + private long floor(long deliverMs, int precisionMs) { + assert precisionMs > 0; + if (deliverMs % precisionMs == 0) { + deliverMs -= precisionMs; + } else { + deliverMs = deliverMs / precisionMs * precisionMs; + } + return deliverMs; + } + private RemotingCommand createSendTransactionMsgCommand(int requestCode) { SendMessageRequestHeader header = createSendMsgRequestHeader(); int sysFlag = header.getSysFlag(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 554b1efa524..2e088ac9da5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -204,6 +204,8 @@ import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.RemoveBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ResetMasterFlushOffsetHeader; import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader; @@ -853,6 +855,7 @@ protected SendResult processSendResponse( uniqMsgId, responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); sendResult.setTransactionId(responseHeader.getTransactionId()); + sendResult.setRecallHandle(responseHeader.getRecallHandle()); String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); if (regionId == null || regionId.isEmpty()) { regionId = MixAll.DEFAULT_TRACE_REGION_ID; @@ -3525,4 +3528,49 @@ public List listAcl(String addr, String subjectFilter, String resourceF } throw new MQBrokerException(response.getCode(), response.getRemark()); } + + public String recallMessage( + final String addr, + RecallMessageRequestHeader requestHeader, + final long timeoutMillis + ) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + RecallMessageResponseHeader responseHeader = + response.decodeCommandCustomHeader(RecallMessageResponseHeader.class); + return responseHeader.getMsgId(); + } + default: + break; + } + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); + } + + public void recallMessageAsync( + final String addr, + final RecallMessageRequestHeader requestHeader, + final long timeoutMillis, + final InvokeCallback invokeCallback + ) throws RemotingException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, requestHeader); + + this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + } + + @Override + public void operationSucceed(RemotingCommand response) { + invokeCallback.operationSucceed(response); + } + + @Override + public void operationFail(Throwable throwable) { + invokeCallback.operationFail(throwable); + } + }); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java index 0e2092b8a0f..6624b3100d8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java @@ -74,6 +74,8 @@ import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; @@ -624,6 +626,26 @@ public CompletableFuture notification(String brokerAddr, NotificationRe }); } + public CompletableFuture recallMessageAsync(String brokerAddr, + RecallMessageRequestHeader requestHeader, long timeoutMillis) { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, requestHeader); + return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { + CompletableFuture future = new CompletableFuture<>(); + if (ResponseCode.SUCCESS == response.getCode()) { + try { + RecallMessageResponseHeader responseHeader = + response.decodeCommandCustomHeader(RecallMessageResponseHeader.class); + future.complete(responseHeader.getMsgId()); + } catch (Throwable t) { + future.completeExceptionally(t); + } + } else { + future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr)); + } + return future; + }); + } + public CompletableFuture invoke(String brokerAddr, RemotingCommand request, long timeoutMillis) { return getRemotingClient().invoke(brokerAddr, request, timeoutMillis); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 3d4fdbec373..15264f0e503 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -81,6 +82,7 @@ import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageType; +import org.apache.rocketmq.common.producer.RecallMessageHandle; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.utils.CorrelationIdUtil; import org.apache.rocketmq.remoting.RPCHook; @@ -91,6 +93,7 @@ import org.apache.rocketmq.remoting.protocol.NamespaceUtil; import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -1549,6 +1552,40 @@ public void endTransaction( this.defaultMQProducer.getSendMsgTimeout()); } + public String recallMessage( + String topic, + String recallHandle) throws RemotingException, MQClientException, MQBrokerException, InterruptedException { + makeSureStateOK(); + Validators.checkTopic(topic); + if (NamespaceUtil.isRetryTopic(topic) || NamespaceUtil.isDLQTopic(topic)) { + throw new MQClientException("topic is not supported", null); + } + RecallMessageHandle.HandleV1 handleEntity; + try { + handleEntity = (RecallMessageHandle.HandleV1) RecallMessageHandle.decodeHandle(recallHandle); + } catch (Exception e) { + throw new MQClientException(e.getMessage(), null); + } + + tryToFindTopicPublishInfo(topic); + String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(handleEntity.getBrokerName()); + brokerAddr = StringUtils.isNotEmpty(brokerAddr) ? + // find another address to support multi proxy endpoints, + // may cause failure request in proxy-less mode when the broker is temporarily unavailable + brokerAddr : this.mQClientFactory.findBrokerAddrByTopic(topic); + if (StringUtils.isEmpty(brokerAddr)) { + log.warn("can't find broker service address. {}", handleEntity.getBrokerName()); + throw new MQClientException("The broker service address not found", null); + } + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); + requestHeader.setTopic(topic); + requestHeader.setRecallHandle(recallHandle); + requestHeader.setBrokerName(handleEntity.getBrokerName()); + return this.mQClientFactory.getMQClientAPIImpl().recallMessage(brokerAddr, + requestHeader, this.defaultMQProducer.getSendMsgTimeout()); + } + public void setCallbackExecutor(final ExecutorService callbackExecutor) { this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index a8bf7cee85f..e3f81ad9685 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.lock.ReadWriteCASLock; +import org.apache.rocketmq.client.trace.hook.DefaultRecallMessageTraceHook; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl; @@ -381,6 +382,8 @@ public void start() throws MQClientException { new SendMessageTraceHookImpl(traceDispatcher)); this.defaultMQProducerImpl.registerEndTransactionHook( new EndTransactionTraceHookImpl(traceDispatcher)); + this.defaultMQProducerImpl.getMqClientFactory().getMQClientAPIImpl().getRemotingClient() + .registerRPCHook(new DefaultRecallMessageTraceHook(traceDispatcher)); } catch (Throwable e) { logger.error("system mqtrace hook init failed ,maybe can't send msg trace data"); } @@ -1128,6 +1131,12 @@ public void send(Collection msgs, MessageQueue mq, this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback, timeout); } + @Override + public String recallMessage(String topic, String recallHandle) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.recallMessage(withNamespace(topic), recallHandle); + } + /** * Sets an Executor to be used for executing callback methods. * diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 8bd30e98d7b..4286fdd7f96 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -113,6 +113,9 @@ void send(final Collection msgs, final MessageQueue mq, final SendCallb final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; + String recallMessage(String topic, String recallHandle) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; + //for rpc Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java index dd7ea1cdc5f..d160eb4eae9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java @@ -29,6 +29,7 @@ public class SendResult { private String regionId; private boolean traceOn = true; private byte[] rawRespBody; + private String recallHandle; public SendResult() { } @@ -126,10 +127,18 @@ public void setOffsetMsgId(String offsetMsgId) { this.offsetMsgId = offsetMsgId; } + public String getRecallHandle() { + return recallHandle; + } + + public void setRecallHandle(String recallHandle) { + this.recallHandle = recallHandle; + } + @Override public String toString() { return "SendResult [sendStatus=" + sendStatus + ", msgId=" + msgId + ", offsetMsgId=" + offsetMsgId + ", messageQueue=" + messageQueue - + ", queueOffset=" + queueOffset + "]"; + + ", queueOffset=" + queueOffset + ", recallHandle=" + recallHandle + "]"; } public void setRawRespBody(byte[] body) { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java index 57e9b6410db..1e66aa0498d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java @@ -132,6 +132,19 @@ public static List decoderFromTraceDataString(String traceData) { endTransactionContext.setTraceBeans(new ArrayList<>(1)); endTransactionContext.getTraceBeans().add(bean); resList.add(endTransactionContext); + } else if (line[0].equals(TraceType.Recall.name())) { + TraceContext recallContext = new TraceContext(); + recallContext.setTraceType(TraceType.Recall); + recallContext.setTimeStamp(Long.parseLong(line[1])); + recallContext.setRegionId(line[2]); + recallContext.setGroupName(line[3]); + TraceBean bean = new TraceBean(); + bean.setTopic(line[4]); + bean.setMsgId(line[5]); + recallContext.setSuccess(Boolean.parseBoolean(line[6])); + recallContext.setTraceBeans(new ArrayList<>(1)); + recallContext.getTraceBeans().add(bean); + resList.add(recallContext); } } return resList; @@ -217,6 +230,17 @@ public static TraceTransferBean encoderFromContextBean(TraceContext ctx) { .append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR); } break; + case Recall: { + TraceBean bean = ctx.getTraceBeans().get(0); + sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR) + .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR) + .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR) + .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR) + .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR) + .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR) + .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);// + } + break; default: } transferBean.setTransData(sb.toString()); diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java index 8870ddcbdb3..4c0e7d8ab26 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java @@ -18,6 +18,7 @@ public enum TraceType { Pub, + Recall, SubBefore, SubAfter, EndTransaction, diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/DefaultRecallMessageTraceHook.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/DefaultRecallMessageTraceHook.java new file mode 100644 index 00000000000..c490a7b3599 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/DefaultRecallMessageTraceHook.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.trace.hook; + +import org.apache.rocketmq.client.trace.TraceBean; +import org.apache.rocketmq.client.trace.TraceContext; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.TraceType; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.producer.RecallMessageHandle; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.NamespaceUtil; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; + +import java.util.ArrayList; + +public class DefaultRecallMessageTraceHook implements RPCHook { + + private static final String RECALL_TRACE_ENABLE_KEY = "com.rocketmq.recall.default.trace.enable"; + private boolean enableDefaultTrace = Boolean.parseBoolean(System.getProperty(RECALL_TRACE_ENABLE_KEY, "false")); + private TraceDispatcher traceDispatcher; + + public DefaultRecallMessageTraceHook(TraceDispatcher traceDispatcher) { + this.traceDispatcher = traceDispatcher; + } + + @Override + public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + } + + @Override + public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { + if (request.getCode() != RequestCode.RECALL_MESSAGE + || !enableDefaultTrace + || null == response.getExtFields() + || null == response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION) + || null == traceDispatcher) { + return; + } + + try { + String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); + RecallMessageRequestHeader requestHeader = + request.decodeCommandCustomHeader(RecallMessageRequestHeader.class); + String topic = NamespaceUtil.withoutNamespace(requestHeader.getTopic()); + String group = NamespaceUtil.withoutNamespace(requestHeader.getProducerGroup()); + String recallHandle = requestHeader.getRecallHandle(); + RecallMessageHandle.HandleV1 handleV1 = + (RecallMessageHandle.HandleV1) RecallMessageHandle.decodeHandle(recallHandle); + + TraceBean traceBean = new TraceBean(); + traceBean.setTopic(topic); + traceBean.setMsgId(handleV1.getMessageId()); + + TraceContext traceContext = new TraceContext(); + traceContext.setRegionId(regionId); + traceContext.setTraceBeans(new ArrayList<>(1)); + traceContext.setTraceType(TraceType.Recall); + traceContext.setGroupName(group); + traceContext.getTraceBeans().add(traceBean); + traceContext.setSuccess(ResponseCode.SUCCESS == response.getCode()); + + traceDispatcher.append(traceContext); + } catch (Exception e) { + } + } +} diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 81dc5883fb8..c76d0c734a0 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -118,6 +119,8 @@ import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader; @@ -142,6 +145,7 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.assertj.core.api.Assertions; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -170,6 +174,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -2028,6 +2033,77 @@ public void assertListAcl() throws RemotingException, InterruptedException, MQBr assertEquals(1, actual.get(0).getPolicies().size()); } + @Test + public void testRecallMessage() throws RemotingException, InterruptedException, MQBrokerException { + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + requestHeader.setProducerGroup(group); + requestHeader.setTopic(topic); + requestHeader.setRecallHandle("handle"); + requestHeader.setBrokerName(brokerName); + + // success + mockInvokeSync(); + String msgId = MessageClientIDSetter.createUniqID(); + RecallMessageResponseHeader responseHeader = new RecallMessageResponseHeader(); + responseHeader.setMsgId(msgId); + setResponseHeader(responseHeader); + String result = mqClientAPI.recallMessage(defaultBrokerAddr, requestHeader, defaultTimeout); + assertEquals(msgId, result); + + // error + when(response.getCode()).thenReturn(ResponseCode.SYSTEM_ERROR); + when(response.getRemark()).thenReturn("error"); + MQBrokerException e = assertThrows(MQBrokerException.class, () -> { + mqClientAPI.recallMessage(defaultBrokerAddr, requestHeader, defaultTimeout); + }); + assertEquals(ResponseCode.SYSTEM_ERROR, e.getResponseCode()); + assertEquals("error", e.getErrorMessage()); + assertEquals(defaultBrokerAddr, e.getBrokerAddr()); + } + + @Test + public void testRecallMessageAsync() throws RemotingException, InterruptedException { + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + requestHeader.setProducerGroup(group); + requestHeader.setTopic(topic); + requestHeader.setRecallHandle("handle"); + requestHeader.setBrokerName(brokerName); + String msgId = "msgId"; + doAnswer((Answer) mock -> { + InvokeCallback callback = mock.getArgument(3); + RemotingCommand request = mock.getArgument(1); + RemotingCommand response = RemotingCommand.createResponseCommand(RecallMessageResponseHeader.class); + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + RecallMessageResponseHeader responseHeader = (RecallMessageResponseHeader) response.readCustomHeader(); + responseHeader.setMsgId(msgId); + ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); + responseFuture.setResponseCommand(response); + callback.operationSucceed(responseFuture.getResponseCommand()); + return null; + }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); + + final CountDownLatch done = new CountDownLatch(1); + mqClientAPI.recallMessageAsync(defaultBrokerAddr, requestHeader, + defaultTimeout, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + } + + @Override + public void operationSucceed(RemotingCommand response) { + RecallMessageResponseHeader responseHeader = (RecallMessageResponseHeader) response.readCustomHeader(); + Assert.assertEquals(msgId, responseHeader.getMsgId()); + done.countDown(); + } + + @Override + public void operationFail(Throwable throwable) { + } + }); + done.await(); + } + private Properties createProperties() { Properties result = new Properties(); result.put("key", "value"); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java index 6f692dff950..e2a29c9a21f 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl.mqclient; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -29,8 +30,11 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -108,4 +112,47 @@ public void testUpdateConsumerOffsetAsync_Fail() throws InterruptedException { assertEquals(customEx.getErrorMessage(), "QueueId is null, topic is testTopic"); } } + + @Test + public void testRecallMessageAsync_success() { + String msgId = "msgId"; + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + requestHeader.setProducerGroup("group"); + requestHeader.setTopic("topic"); + requestHeader.setRecallHandle("handle"); + requestHeader.setBrokerName("brokerName"); + + RemotingCommand response = RemotingCommand.createResponseCommand(RecallMessageResponseHeader.class); + response.setCode(ResponseCode.SUCCESS); + RecallMessageResponseHeader responseHeader = (RecallMessageResponseHeader) response.readCustomHeader(); + responseHeader.setMsgId(msgId); + response.makeCustomHeaderToNet(); + CompletableFuture remotingFuture = new CompletableFuture<>(); + remotingFuture.complete(response); + doReturn(remotingFuture).when(remotingClientMock).invoke(anyString(), any(RemotingCommand.class), anyLong()); + + String resultId = + mqClientAPIExt.recallMessageAsync("brokerAddr", requestHeader, 3000L).join(); + Assert.assertEquals(msgId, resultId); + } + + @Test + public void testRecallMessageAsync_fail() { + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + requestHeader.setProducerGroup("group"); + requestHeader.setTopic("topic"); + requestHeader.setRecallHandle("handle"); + requestHeader.setBrokerName("brokerName"); + + CompletableFuture remotingFuture = new CompletableFuture<>(); + remotingFuture.complete(RemotingCommand.createResponseCommand(ResponseCode.SERVICE_NOT_AVAILABLE, "")); + doReturn(remotingFuture).when(remotingClientMock).invoke(anyString(), any(RemotingCommand.class), anyLong()); + + CompletionException exception = Assert.assertThrows(CompletionException.class, () -> { + mqClientAPIExt.recallMessageAsync("brokerAddr", requestHeader, 3000L).join(); + }); + Assert.assertTrue(exception.getCause() instanceof MQBrokerException); + MQBrokerException cause = (MQBrokerException) exception.getCause(); + Assert.assertEquals(ResponseCode.SERVICE_NOT_AVAILABLE, cause.getResponseCode()); + } } \ No newline at end of file diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/selector/DefaultMQProducerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/selector/DefaultMQProducerImplTest.java index a17fe43f461..77a83af19c0 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/selector/DefaultMQProducerImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/selector/DefaultMQProducerImplTest.java @@ -33,11 +33,13 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.producer.RecallMessageHandle; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader; import org.junit.Before; @@ -61,9 +63,11 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThrows; import static org.mockito.AdditionalMatchers.or; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -86,10 +90,15 @@ public class DefaultMQProducerImplTest { @Mock private MQClientInstance mQClientFactory; + @Mock + private MQClientAPIImpl mQClientAPIImpl; + private DefaultMQProducerImpl defaultMQProducerImpl; private final long defaultTimeout = 30000L; + private final String defaultBrokerName = "broker-0"; + private final String defaultBrokerAddr = "127.0.0.1:10911"; private final String defaultTopic = "testTopic"; @@ -104,7 +113,6 @@ public void init() throws Exception { when(clientConfig.queueWithNamespace(any())).thenReturn(messageQueue); when(mQClientFactory.getClientConfig()).thenReturn(clientConfig); when(mQClientFactory.getTopicRouteTable()).thenReturn(mock(ConcurrentMap.class)); - MQClientAPIImpl mQClientAPIImpl = mock(MQClientAPIImpl.class); when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mQClientAPIImpl); when(mQClientFactory.findBrokerAddressInPublish(or(isNull(), anyString()))).thenReturn(defaultBrokerAddr); when(message.getTopic()).thenReturn(defaultTopic); @@ -313,6 +321,39 @@ public void assertCheckListener() { assertNull(defaultMQProducerImpl.checkListener()); } + @Test + public void testRecallMessage_invalid() { + assertThrows(MQClientException.class, () -> { + defaultMQProducerImpl.recallMessage(MixAll.REPLY_TOPIC_POSTFIX + defaultTopic, "handle"); + }); + assertThrows(MQClientException.class, () -> { + defaultMQProducerImpl.recallMessage(MixAll.DLQ_GROUP_TOPIC_PREFIX + defaultTopic, "handle"); + }); + assertThrows(MQClientException.class, () -> { + defaultMQProducerImpl.recallMessage(defaultTopic, "handle"); + }); + } + + @Test + public void testRecallMessage_addressNotFound() { + String handle = RecallMessageHandle.HandleV1.buildHandle(defaultTopic, defaultBrokerName, "1", "id"); + when(mQClientFactory.findBrokerAddressInPublish(defaultBrokerName)).thenReturn(null); + MQClientException e = assertThrows(MQClientException.class, () -> { + defaultMQProducerImpl.recallMessage(defaultTopic, handle); + }); + assertEquals("The broker service address not found", e.getErrorMessage()); + } + + @Test + public void testRecallMessage_success() + throws RemotingException, MQClientException, MQBrokerException, InterruptedException { + String handle = RecallMessageHandle.HandleV1.buildHandle(defaultTopic, defaultBrokerName, "1", "id"); + when(mQClientFactory.findBrokerAddressInPublish(defaultBrokerName)).thenReturn(defaultBrokerAddr); + when(mQClientAPIImpl.recallMessage(any(), any(), anyLong())).thenReturn("id"); + String result = defaultMQProducerImpl.recallMessage(defaultTopic, handle); + assertEquals("id", result); + } + private void setMQClientFactory() throws IllegalAccessException, NoSuchFieldException { setField(defaultMQProducerImpl, "mQClientFactory", mQClientFactory); } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index c0b557dfa11..bac2e2c7e40 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -444,6 +444,8 @@ public class BrokerConfig extends BrokerIdentity { */ private String configManagerVersion = ConfigManagerVersion.V1.getVersion(); + private boolean allowRecallWhenBrokerNotWriteable = true; + public String getConfigBlackList() { return configBlackList; } @@ -1932,4 +1934,12 @@ public String getConfigManagerVersion() { public void setConfigManagerVersion(String configManagerVersion) { this.configManagerVersion = configManagerVersion; } + + public boolean isAllowRecallWhenBrokerNotWriteable() { + return allowRecallWhenBrokerNotWriteable; + } + + public void setAllowRecallWhenBrokerNotWriteable(boolean allowRecallWhenBrokerNotWriteable) { + this.allowRecallWhenBrokerNotWriteable = allowRecallWhenBrokerNotWriteable; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/producer/RecallMessageHandle.java b/common/src/main/java/org/apache/rocketmq/common/producer/RecallMessageHandle.java new file mode 100644 index 00000000000..b00b15bd863 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/producer/RecallMessageHandle.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.producer; + +import org.apache.commons.codec.DecoderException; +import org.apache.commons.lang3.StringUtils; + +import java.util.Base64; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * handle to recall a message, only support delay message for now + * v1 pattern like this: + * version topic brokerName timestamp messageId + * use Base64 to encode it + */ +public class RecallMessageHandle { + private static final String SEPARATOR = " "; + private static final String VERSION_1 = "v1"; + + public static class HandleV1 extends RecallMessageHandle { + private String version; + private String topic; + private String brokerName; + private String timestampStr; + private String messageId; // id of unique key + + public HandleV1(String topic, String brokerName, String timestamp, String messageId) { + this.version = VERSION_1; + this.topic = topic; + this.brokerName = brokerName; + this.timestampStr = timestamp; + this.messageId = messageId; + } + + // no param check + public static String buildHandle(String topic, String brokerName, String timestampStr, String messageId) { + String rawString = String.join(SEPARATOR, VERSION_1, topic, brokerName, timestampStr, messageId); + return Base64.getUrlEncoder().encodeToString(rawString.getBytes(UTF_8)); + } + + public String getTopic() { + return topic; + } + + public String getBrokerName() { + return brokerName; + } + + public String getTimestampStr() { + return timestampStr; + } + + public String getMessageId() { + return messageId; + } + + public String getVersion() { + return version; + } + } + + public static RecallMessageHandle decodeHandle(String handle) throws DecoderException { + if (StringUtils.isEmpty(handle)) { + throw new DecoderException("recall handle is invalid"); + } + String rawString; + try { + rawString = + new String(Base64.getUrlDecoder().decode(handle.getBytes(UTF_8)), UTF_8); + } catch (IllegalArgumentException e) { + throw new DecoderException("recall handle is invalid"); + } + String[] items = rawString.split(SEPARATOR); + if (!VERSION_1.equals(items[0]) || items.length < 5) { + throw new DecoderException("recall handle is invalid"); + } + return new HandleV1(items[1], items[2], items[3], items[4]); + } +} diff --git a/common/src/test/java/org/apache/rocketmq/common/producer/RecallMessageHandleTest.java b/common/src/test/java/org/apache/rocketmq/common/producer/RecallMessageHandleTest.java new file mode 100644 index 00000000000..56608227693 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/producer/RecallMessageHandleTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.producer; + +import org.apache.commons.codec.DecoderException; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +public class RecallMessageHandleTest { + @Test + public void testHandleInvalid() { + Assert.assertThrows(DecoderException.class, () -> { + RecallMessageHandle.decodeHandle(""); + }); + Assert.assertThrows(DecoderException.class, () -> { + RecallMessageHandle.decodeHandle(null); + }); + + Assert.assertThrows(DecoderException.class, () -> { + String invalidHandle = Base64.getUrlEncoder().encodeToString("v1 a b c".getBytes(StandardCharsets.UTF_8)); + RecallMessageHandle.decodeHandle(invalidHandle); + }); + Assert.assertThrows(DecoderException.class, () -> { + String invalidHandle = Base64.getUrlEncoder().encodeToString("v2 a b c d".getBytes(StandardCharsets.UTF_8)); + RecallMessageHandle.decodeHandle(invalidHandle); + }); + Assert.assertThrows(DecoderException.class, () -> { + String invalidHandle = "v1 a b c d"; + RecallMessageHandle.decodeHandle(invalidHandle); + }); + } + + @Test + public void testEncodeAndDecodeV1() throws DecoderException { + String topic = "topic"; + String brokerName = "broker-0"; + String timestampStr = String.valueOf(System.currentTimeMillis()); + String messageId = MessageClientIDSetter.createUniqID(); + String handle = RecallMessageHandle.HandleV1.buildHandle(topic, brokerName, timestampStr, messageId); + RecallMessageHandle handleEntity = RecallMessageHandle.decodeHandle(handle); + Assert.assertTrue(handleEntity instanceof RecallMessageHandle.HandleV1); + RecallMessageHandle.HandleV1 handleV1 = (RecallMessageHandle.HandleV1) handleEntity; + Assert.assertEquals(handleV1.getVersion(), "v1"); + Assert.assertEquals(handleV1.getTopic(), topic); + Assert.assertEquals(handleV1.getBrokerName(), brokerName); + Assert.assertEquals(handleV1.getTimestampStr(), timestampStr); + Assert.assertEquals(handleV1.getMessageId(), messageId); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/RequestMapping.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/RequestMapping.java index 866124d747c..f5edc03ba4a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/RequestMapping.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/RequestMapping.java @@ -25,6 +25,7 @@ import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryRouteRequest; +import apache.rocketmq.v2.RecallMessageRequest; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.SendMessageRequest; import java.util.HashMap; @@ -38,6 +39,7 @@ public class RequestMapping { put(QueryRouteRequest.getDescriptor().getFullName(), RequestCode.GET_ROUTEINFO_BY_TOPIC); put(HeartbeatRequest.getDescriptor().getFullName(), RequestCode.HEART_BEAT); put(SendMessageRequest.getDescriptor().getFullName(), RequestCode.SEND_MESSAGE_V2); + put(RecallMessageRequest.getDescriptor().getFullName(), RequestCode.RECALL_MESSAGE); put(QueryAssignmentRequest.getDescriptor().getFullName(), RequestCode.GET_ROUTEINFO_BY_TOPIC); put(ReceiveMessageRequest.getDescriptor().getFullName(), RequestCode.PULL_MESSAGE); put(AckMessageRequest.getDescriptor().getFullName(), RequestCode.UPDATE_CONSUMER_OFFSET); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java index 091e9086ecc..3c6f120ee58 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java @@ -32,6 +32,8 @@ import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -51,6 +53,7 @@ import org.apache.rocketmq.proxy.grpc.v2.consumer.ChangeInvisibleDurationActivity; import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity; import org.apache.rocketmq.proxy.grpc.v2.producer.ForwardMessageToDLQActivity; +import org.apache.rocketmq.proxy.grpc.v2.producer.RecallMessageActivity; import org.apache.rocketmq.proxy.grpc.v2.producer.SendMessageActivity; import org.apache.rocketmq.proxy.grpc.v2.route.RouteActivity; import org.apache.rocketmq.proxy.grpc.v2.transaction.EndTransactionActivity; @@ -65,6 +68,7 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme protected AckMessageActivity ackMessageActivity; protected ChangeInvisibleDurationActivity changeInvisibleDurationActivity; protected SendMessageActivity sendMessageActivity; + protected RecallMessageActivity recallMessageActivity; protected ForwardMessageToDLQActivity forwardMessageToDLQActivity; protected EndTransactionActivity endTransactionActivity; protected RouteActivity routeActivity; @@ -82,6 +86,7 @@ protected void init(MessagingProcessor messagingProcessor) { this.ackMessageActivity = new AckMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.sendMessageActivity = new SendMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + this.recallMessageActivity = new RecallMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.forwardMessageToDLQActivity = new ForwardMessageToDLQActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.endTransactionActivity = new EndTransactionActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.routeActivity = new RouteActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); @@ -145,6 +150,12 @@ public CompletableFuture changeInvisibleDuratio return this.changeInvisibleDurationActivity.changeInvisibleDuration(ctx, request); } + @Override + public CompletableFuture recallMessage(ProxyContext ctx, + RecallMessageRequest request) { + return this.recallMessageActivity.recallMessage(ctx, request); + } + @Override public ContextStreamObserver telemetry(StreamObserver responseObserver) { return this.clientActivity.telemetry(responseObserver); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java index 4f029dec336..c470eda55ca 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java @@ -35,6 +35,8 @@ import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -371,6 +373,25 @@ public void changeInvisibleDuration(ChangeInvisibleDurationRequest request, } } + @Override + public void recallMessage(RecallMessageRequest request, StreamObserver responseObserver) { + Function statusResponseCreator = + status -> RecallMessageResponse.newBuilder().setStatus(status).build(); + ProxyContext context = createContext(); + try { + this.addExecutor(this.producerThreadPoolExecutor, // reuse producer thread pool + context, + request, + () -> grpcMessingActivity.recallMessage(context, request) + .whenComplete((response, throwable) -> + writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)), + responseObserver, + statusResponseCreator); + } catch (Throwable t) { + writeResponse(context, request, null, responseObserver, t, statusResponseCreator); + } + } + @Override public StreamObserver telemetry(StreamObserver responseObserver) { Function statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java index 77bd3a88f9d..db15f25f6f7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java @@ -33,6 +33,8 @@ import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -69,5 +71,7 @@ CompletableFuture notifyClientTermination(Proxy CompletableFuture changeInvisibleDuration(ProxyContext ctx, ChangeInvisibleDurationRequest request); + CompletableFuture recallMessage(ProxyContext ctx, RecallMessageRequest request); + ContextStreamObserver telemetry(StreamObserver responseObserver); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/RecallMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/RecallMessageActivity.java new file mode 100644 index 00000000000..28ec97dca34 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/RecallMessageActivity.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.grpc.v2.producer; + +import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; +import apache.rocketmq.v2.Resource; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; +import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; +import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +public class RecallMessageActivity extends AbstractMessingActivity { + + public RecallMessageActivity(MessagingProcessor messagingProcessor, + GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { + super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + } + + public CompletableFuture recallMessage(ProxyContext ctx, + RecallMessageRequest request) { + CompletableFuture future = new CompletableFuture<>(); + + try { + Resource topic = request.getTopic(); + validateTopic(topic); + + future = this.messagingProcessor.recallMessage( + ctx, + topic.getName(), + request.getRecallHandle(), + Duration.ofSeconds(2).toMillis() + ).thenApply(result -> RecallMessageResponse.newBuilder() + .setMessageId(result) + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())) + .build()); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java index 8a3d315c68c..f7b8014bb99 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java @@ -341,6 +341,7 @@ protected SendMessageResponse convertToSendMessageResponse(ProxyContext ctx, Sen .setOffset(result.getQueueOffset()) .setMessageId(StringUtils.defaultString(result.getMsgId())) .setTransactionId(StringUtils.defaultString(result.getTransactionId())) + .setRecallHandle(StringUtils.defaultString(result.getRecallHandle())) .build(); break; default: diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 9c494d7a451..d0c0dd6e655 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -261,6 +261,12 @@ public CompletableFuture getMinOffset(ProxyContext ctx, MessageQueue messa return this.consumerProcessor.getMinOffset(ctx, messageQueue, timeoutMillis); } + @Override + public CompletableFuture recallMessage(ProxyContext ctx, String topic, + String recallHandle, long timeoutMillis) { + return this.producerProcessor.recallMessage(ctx, topic, recallHandle, timeoutMillis); + } + @Override public CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index 03d28262d73..fee0465e2bf 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -260,6 +260,13 @@ CompletableFuture getMinOffset( long timeoutMillis ); + CompletableFuture recallMessage( + ProxyContext ctx, + String topic, + String recallHandle, + long timeoutMillis + ); + CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java index 4f2d5280d37..43e16ddd2d7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import org.apache.commons.codec.DecoderException; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; @@ -32,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageId; +import org.apache.rocketmq.common.producer.RecallMessageHandle; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.FutureUtils; @@ -49,6 +51,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; public class ProducerProcessor extends AbstractProcessor { @@ -124,6 +127,33 @@ public CompletableFuture> sendMessage(ProxyContext ctx, QueueSe return FutureUtils.addExecutor(future, this.executor); } + public CompletableFuture recallMessage(ProxyContext ctx, String topic, + String recallHandle, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + try { + if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) { + TopicMessageType messageType = serviceManager.getMetadataService().getTopicMessageType(ctx, topic); + topicMessageTypeValidator.validate(messageType, TopicMessageType.DELAY); + } + + RecallMessageHandle.HandleV1 handleEntity; + try { + handleEntity = (RecallMessageHandle.HandleV1) RecallMessageHandle.decodeHandle(recallHandle); + } catch (DecoderException e) { + throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, e.getMessage()); + } + String brokerName = handleEntity.getBrokerName(); + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setRecallHandle(recallHandle); + requestHeader.setBrokerName(brokerName); + future = serviceManager.getMessageService().recallMessage(ctx, brokerName, requestHeader, timeoutMillis); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return FutureUtils.addExecutor(future, this.executor); + } + protected void fillTransactionData(ProxyContext ctx, String producerGroup, AddressableMessageQueue messageQueue, SendResult sendResult, List messageList) { try { MessageId id; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index 14c7c0db6fa..8c44305b42c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.proxy.remoting.activity.GetTopicRouteActivity; import org.apache.rocketmq.proxy.remoting.activity.PopMessageActivity; import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity; +import org.apache.rocketmq.proxy.remoting.activity.RecallMessageActivity; import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity; import org.apache.rocketmq.proxy.remoting.activity.TransactionActivity; import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; @@ -75,6 +76,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu protected final ClientManagerActivity clientManagerActivity; protected final ConsumerManagerActivity consumerManagerActivity; protected final SendMessageActivity sendMessageActivity; + protected final RecallMessageActivity recallMessageActivity; protected final TransactionActivity transactionActivity; protected final PullMessageActivity pullMessageActivity; protected final PopMessageActivity popMessageActivity; @@ -97,6 +99,7 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor, List getMinOffset(ProxyContext ctx, AddressableMessage ); } + @Override + public CompletableFuture recallMessage(ProxyContext ctx, String brokerName, + RecallMessageRequestHeader requestHeader, long timeoutMillis) { + return this.mqClientAPIFactory.getClient().recallMessageAsync( + this.resolveBrokerAddr(ctx, brokerName), + requestHeader, + timeoutMillis + ); + } + @Override public CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index a8088a95d0a..cb9b7a4ae00 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -71,6 +71,8 @@ import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; @@ -153,6 +155,7 @@ public CompletableFuture> sendMessage(ProxyContext ctx, Address sendResult.setQueueOffset(responseHeader.getQueueOffset()); sendResult.setTransactionId(responseHeader.getTransactionId()); sendResult.setOffsetMsgId(responseHeader.getMsgId()); + sendResult.setRecallHandle(responseHeader.getRecallHandle()); return Collections.singletonList(sendResult); }); } @@ -470,6 +473,32 @@ public CompletableFuture getMinOffset(ProxyContext ctx, AddressableMessage throw new NotImplementedException("getMinOffset is not implemented in LocalMessageService"); } + @Override + public CompletableFuture recallMessage(ProxyContext ctx, String brokerName, + RecallMessageRequestHeader requestHeader, long timeoutMillis) { + SimpleChannel channel = channelManager.createChannel(ctx); + ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); + RemotingCommand command = + LocalRemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, requestHeader, ctx.getLanguage()); + CompletableFuture future = new CompletableFuture<>(); + try { + RemotingCommand response = brokerController.getRecallMessageProcessor() + .processRequest(channelHandlerContext, command); + future.complete(response); + } catch (Exception e) { + log.error("Fail to process recallMessage command", e); + future.completeExceptionally(e); + } + return future.thenApply(r -> { + switch (r.getCode()) { + case ResponseCode.SUCCESS: + return ((RecallMessageResponseHeader) r.readCustomHeader()).getMsgId(); + default: + throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, r.getRemark()); + } + }); + } + @Override public CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java index 61accbc0412..80f5ae7217c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; @@ -155,6 +156,13 @@ CompletableFuture getMinOffset( long timeoutMillis ); + CompletableFuture recallMessage( + ProxyContext ctx, + String brokerName, + RecallMessageRequestHeader requestHeader, + long timeoutMillis + ); + CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/RecallMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/RecallMessageActivityTest.java new file mode 100644 index 00000000000..e42aeadbb6b --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/RecallMessageActivityTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.grpc.v2.producer; + +import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; +import apache.rocketmq.v2.Resource; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; + +public class RecallMessageActivityTest extends BaseActivityTest { + private RecallMessageActivity recallMessageActivity; + + @Before + public void before() throws Throwable { + super.before(); + this.recallMessageActivity = + new RecallMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + } + + @Test + public void testRecallMessage_success() { + when(this.messagingProcessor.recallMessage(any(), any(), any(), anyLong())) + .thenReturn(CompletableFuture.completedFuture("msgId")); + + RecallMessageResponse response = this.recallMessageActivity.recallMessage( + createContext(), + RecallMessageRequest.newBuilder() + .setRecallHandle("handle") + .setTopic(Resource.newBuilder().setResourceNamespace("ns").setName("topic")) + .build() + ).join(); + + assertEquals(Code.OK, response.getStatus().getCode()); + assertEquals("msgId", response.getMessageId()); + } + + @Test + public void testRecallMessage_fail() { + CompletableFuture exceptionFuture = new CompletableFuture(); + when(this.messagingProcessor.recallMessage(any(), any(), any(), anyLong())).thenReturn(exceptionFuture); + exceptionFuture.completeExceptionally( + new ProxyException(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, "info")); + + CompletionException exception = Assert.assertThrows(CompletionException.class, () -> { + this.recallMessageActivity.recallMessage( + createContext(), + RecallMessageRequest.newBuilder() + .setRecallHandle("handle") + .setTopic(Resource.newBuilder().setResourceNamespace("ns").setName("topic")) + .build() + ).join(); + }); + Assert.assertTrue(exception.getCause() instanceof ProxyException); + ProxyException cause = (ProxyException) exception.getCause(); + Assert.assertEquals(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, cause.getCode()); + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java index 3192d5c8dfb..6729ef0c4b3 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executors; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; @@ -34,20 +35,25 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.producer.RecallMessageHandle; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.transaction.TransactionData; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.assertj.core.util.Lists; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -205,6 +211,39 @@ public void testForwardMessageToDeadLetterQueue() throws Throwable { assertEquals(CONSUMER_GROUP, requestHeader.getGroup()); } + @Test + public void testRecallMessage_notDelayMessage() { + when(metadataService.getTopicMessageType(any(), any())).thenReturn(TopicMessageType.NORMAL); + CompletionException exception = Assert.assertThrows(CompletionException.class, () -> { + producerProcessor.recallMessage(createContext(), TOPIC, "handle", 3000).join(); + }); + assertTrue(exception.getCause() instanceof ProxyException); + ProxyException cause = (ProxyException) exception.getCause(); + assertEquals(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, cause.getCode()); + } + + @Test + public void testRecallMessage_invalidRecallHandle() { + when(metadataService.getTopicMessageType(any(), any())).thenReturn(TopicMessageType.DELAY); + CompletionException exception = Assert.assertThrows(CompletionException.class, () -> { + producerProcessor.recallMessage(createContext(), TOPIC, "handle", 3000).join(); + }); + assertTrue(exception.getCause() instanceof ProxyException); + ProxyException cause = (ProxyException) exception.getCause(); + assertEquals("recall handle is invalid", cause.getMessage()); + } + + @Test + public void testRecallMessage_success() { + when(metadataService.getTopicMessageType(any(), any())).thenReturn(TopicMessageType.DELAY); + when(this.messageService.recallMessage(any(), any(), any(), anyLong())) + .thenReturn(CompletableFuture.completedFuture("msgId")); + + String handle = RecallMessageHandle.HandleV1.buildHandle(TOPIC, "brokerName", "timestampStr", "whateverId"); + String msgId = producerProcessor.recallMessage(createContext(), TOPIC, handle, 3000).join(); + assertEquals("msgId", msgId); + } + private static String createOffsetMsgId(long commitLogOffset) { int msgIDLength = 4 + 4 + 8; ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/RecallMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/RecallMessageActivityTest.java new file mode 100644 index 00000000000..7d64923d774 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/RecallMessageActivityTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +import org.apache.rocketmq.proxy.config.InitConfigTest; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.service.channel.SimpleChannel; +import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; +import org.apache.rocketmq.proxy.service.metadata.MetadataService; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.concurrent.CompletableFuture; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RecallMessageActivityTest extends InitConfigTest { + private static final String TOPIC = "topic"; + private static final String GROUP = "group"; + private static final String BROKER_NAME = "brokerName"; + + private RecallMessageActivity recallMessageActivity; + @Mock + private MessagingProcessor messagingProcessor; + @Mock + private MetadataService metadataService; + + @Spy + private ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) { + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return null; + } + }; + + @Before + public void init() { + recallMessageActivity = new RecallMessageActivity(null, messagingProcessor); + when(messagingProcessor.getMetadataService()).thenReturn(metadataService); + } + + @Test + public void testRecallMessage_notDelayMessage() { + when(metadataService.getTopicMessageType(any(), eq(TOPIC))).thenReturn(TopicMessageType.NORMAL); + ProxyException exception = Assert.assertThrows(ProxyException.class, () -> { + recallMessageActivity.processRequest0(ctx, mockRequest(), null); + }); + Assert.assertEquals(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, exception.getCode()); + } + + @Test + public void testRecallMessage_success() throws Exception { + when(metadataService.getTopicMessageType(any(), eq(TOPIC))).thenReturn(TopicMessageType.DELAY); + RemotingCommand request = mockRequest(); + RemotingCommand expectResponse = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, ""); + when(messagingProcessor.request(any(), eq(BROKER_NAME), eq(request), anyLong())) + .thenReturn(CompletableFuture.completedFuture(expectResponse)); + RemotingCommand response = recallMessageActivity.processRequest0(ctx, request, null); + Assert.assertNull(response); + verify(ctx, times(1)).writeAndFlush(eq(expectResponse)); + } + + private RemotingCommand mockRequest() { + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + requestHeader.setProducerGroup(GROUP); + requestHeader.setTopic(TOPIC); + requestHeader.setRecallHandle("handle"); + requestHeader.setBrokerName(BROKER_NAME); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, requestHeader); + request.makeCustomHeaderToNet(); + return request; + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java index f7a656d7682..20ce2a16848 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java @@ -26,12 +26,14 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.processor.AckMessageProcessor; import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor; import org.apache.rocketmq.broker.processor.EndTransactionProcessor; import org.apache.rocketmq.broker.processor.PopMessageProcessor; +import org.apache.rocketmq.broker.processor.RecallMessageProcessor; import org.apache.rocketmq.broker.processor.SendMessageProcessor; import org.apache.rocketmq.client.consumer.AckResult; import org.apache.rocketmq.client.consumer.AckStatus; @@ -68,8 +70,11 @@ import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -94,6 +99,8 @@ public class LocalMessageServiceTest extends InitConfigTest { @Mock private AckMessageProcessor ackMessageProcessorMock; @Mock + private RecallMessageProcessor recallMessageProcessorMock; + @Mock private BrokerController brokerControllerMock; private ProxyContext proxyContext; @@ -122,6 +129,7 @@ public void setUp() throws Throwable { Mockito.when(brokerControllerMock.getChangeInvisibleTimeProcessor()).thenReturn(changeInvisibleTimeProcessorMock); Mockito.when(brokerControllerMock.getAckMessageProcessor()).thenReturn(ackMessageProcessorMock); Mockito.when(brokerControllerMock.getEndTransactionProcessor()).thenReturn(endTransactionProcessorMock); + Mockito.when(brokerControllerMock.getRecallMessageProcessor()).thenReturn(recallMessageProcessorMock); Mockito.when(brokerControllerMock.getBrokerConfig()).thenReturn(new BrokerConfig()); localMessageService = new LocalMessageService(brokerControllerMock, channelManager, null); proxyContext = ProxyContext.create().withVal(ContextVariable.REMOTE_ADDRESS, "0.0.0.1") @@ -424,6 +432,31 @@ public void testAckMessage() throws Exception { assertThat(ackResult.getStatus()).isEqualTo(AckStatus.OK); } + @Test + public void testRecallMessage_success() throws Exception { + RecallMessageResponseHeader responseHeader = new RecallMessageResponseHeader(); + responseHeader.setMsgId("msgId"); + RemotingCommand response = RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, responseHeader); + Mockito.when(recallMessageProcessorMock.processRequest(Mockito.any(SimpleChannelHandlerContext.class), + Mockito.any())).thenReturn(response); + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + String msgId = localMessageService.recallMessage(proxyContext, "brokerName", requestHeader, 1000L).join(); + assertThat(msgId).isEqualTo("msgId"); + } + + @Test + public void testRecallMessage_fail() throws Exception { + RecallMessageResponseHeader responseHeader = new RecallMessageResponseHeader(); + RemotingCommand response = RemotingCommand.createResponseCommandWithHeader(ResponseCode.SLAVE_NOT_AVAILABLE, responseHeader); + Mockito.when(recallMessageProcessorMock.processRequest(Mockito.any(SimpleChannelHandlerContext.class), + Mockito.any())).thenReturn(response); + RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader(); + CompletionException exception = Assert.assertThrows(CompletionException.class, () -> { + localMessageService.recallMessage(proxyContext, "brokerName", requestHeader, 1000L).join(); + }); + Assert.assertTrue(exception.getCause() instanceof ProxyException); + } + private MessageExt buildMessageExt(String topic, int queueId, long queueOffset) { MessageExt message1 = new MessageExt(); message1.setTopic(topic); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index cfc5cc22785..9e86422c482 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -220,6 +220,7 @@ public class RequestCode { public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354; public static final int LITE_PULL_MESSAGE = 361; + public static final int RECALL_MESSAGE = 370; public static final int QUERY_ASSIGNMENT = 400; public static final int SET_MESSAGE_REQUEST_MODE = 401; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/RecallMessageRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/RecallMessageRequestHeader.java new file mode 100644 index 00000000000..c29883682a0 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/RecallMessageRequestHeader.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.header; + +import com.google.common.base.MoreObjects; +import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.common.resource.ResourceType; +import org.apache.rocketmq.common.resource.RocketMQResource; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.annotation.CFNullable; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; + +@RocketMQAction(value = RequestCode.RECALL_MESSAGE, action = Action.PUB) +public class RecallMessageRequestHeader extends TopicRequestHeader { + @CFNullable + private String producerGroup; + + @CFNotNull + @RocketMQResource(ResourceType.TOPIC) + private String topic; + + @CFNotNull + private String recallHandle; + + @Override + public void checkFields() throws RemotingCommandException { + } + + public String getProducerGroup() { + return producerGroup; + } + + public void setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getRecallHandle() { + return recallHandle; + } + + public void setRecallHandle(String recallHandle) { + this.recallHandle = recallHandle; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("producerGroup", producerGroup) + .add("topic", topic) + .add("recallHandle", recallHandle) + .toString(); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/RecallMessageResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/RecallMessageResponseHeader.java new file mode 100644 index 00000000000..1833cfcd053 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/RecallMessageResponseHeader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class RecallMessageResponseHeader implements CommandCustomHeader { + @CFNotNull + private String msgId; + @Override + public void checkFields() throws RemotingCommandException { + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageResponseHeader.java index fe1e8533e54..7563b910331 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageResponseHeader.java @@ -36,6 +36,7 @@ public class SendMessageResponseHeader implements CommandCustomHeader, FastCodes private Long queueOffset; private String transactionId; private String batchUniqId; + private String recallHandle; @Override public void checkFields() throws RemotingCommandException { @@ -48,6 +49,7 @@ public void encode(ByteBuf out) { writeIfNotNull(out, "queueOffset", queueOffset); writeIfNotNull(out, "transactionId", transactionId); writeIfNotNull(out, "batchUniqId", batchUniqId); + writeIfNotNull(out, "recallHandle", recallHandle); } @Override @@ -76,6 +78,11 @@ public void decode(HashMap fields) throws RemotingCommandExcepti if (str != null) { this.batchUniqId = str; } + + str = fields.get("recallHandle"); + if (str != null) { + this.recallHandle = str; + } } public String getMsgId() { @@ -117,4 +124,12 @@ public String getBatchUniqId() { public void setBatchUniqId(String batchUniqId) { this.batchUniqId = batchUniqId; } + + public String getRecallHandle() { + return recallHandle; + } + + public void setRecallHandle(String recallHandle) { + this.recallHandle = recallHandle; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 2b14618eede..4287ce78ab0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -1599,7 +1599,8 @@ public void run() { if (null == uniqueKey) { LOGGER.warn("No uniqueKey for msg:{}", msgExt); } - if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 && tr.getDeleteList().contains(uniqueKey)) { + if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 + && tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) { //Normally, it cancels out with the +1 above addMetric(msgExt, -1); doRes = true; @@ -1909,4 +1910,9 @@ public void setFrequency(AtomicInteger frequency) { public TimerCheckpoint getTimerCheckpoint() { return timerCheckpoint; } + + // identify a message by topic + uk, like query operation + public static String buildDeleteKey(String realTopic, String uniqueKey) { + return realTopic + "+" + uniqueKey; + } } diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java index 52e58efde23..36853bb44fe 100644 --- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java @@ -359,7 +359,7 @@ public void testDeleteTimerMessage() throws Exception { MessageExtBrokerInner delMsg = buildMessage(delayMs, topic, false); transformTimerMessage(timerMessageStore,delMsg); - MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, uniqKey); + MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, TimerMessageStore.buildDeleteKey(topic, uniqKey)); delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties())); assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(delMsg).getPutMessageStatus()); @@ -374,6 +374,49 @@ public void testDeleteTimerMessage() throws Exception { assertNull(getOneMessage(topic, 0, 4, 500)); } + @Test + public void testDeleteTimerMessage_ukCollision() throws Exception { + String topic = "TimerTest_testDeleteTimerMessage"; + String collisionTopic = "TimerTest_testDeleteTimerMessage_collision"; + + TimerMessageStore timerMessageStore = createTimerMessageStore(null); + timerMessageStore.load(); + timerMessageStore.start(true); + + long curr = System.currentTimeMillis() / precisionMs * precisionMs; + long delayMs = curr + 1000; + + MessageExtBrokerInner inner = buildMessage(delayMs, topic, false); + transformTimerMessage(timerMessageStore, inner); + String firstUniqKey = MessageClientIDSetter.getUniqID(inner); + assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(inner).getPutMessageStatus()); + + inner = buildMessage(delayMs, topic, false); + transformTimerMessage(timerMessageStore, inner); + String secondUniqKey = MessageClientIDSetter.getUniqID(inner); + assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(inner).getPutMessageStatus()); + + MessageExtBrokerInner delMsg = buildMessage(delayMs, "whatever", false); + transformTimerMessage(timerMessageStore, delMsg); + MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, TimerMessageStore.buildDeleteKey(topic, firstUniqKey)); + delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties())); + assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(delMsg).getPutMessageStatus()); + + delMsg = buildMessage(delayMs, "whatever", false); + transformTimerMessage(timerMessageStore, delMsg); + MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, TimerMessageStore.buildDeleteKey(collisionTopic, secondUniqKey)); + delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties())); + assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(delMsg).getPutMessageStatus()); + + // The first one should have been deleted, the second one should not be deleted. + ByteBuffer msgBuff = getOneMessage(topic, 0, 0, 3000); + assertNotNull(msgBuff); + MessageExt msgExt = MessageDecoder.decode(msgBuff); + assertNotNull(msgExt); + assertNotEquals(firstUniqKey, MessageClientIDSetter.getUniqID(msgExt)); + assertEquals(secondUniqKey, MessageClientIDSetter.getUniqID(msgExt)); + } + @Test public void testPutDeleteTimerMessage() throws Exception { String topic = "TimerTest_testPutDeleteTimerMessage"; diff --git a/test/BUILD.bazel b/test/BUILD.bazel index e6703d69a01..80bd06539e8 100644 --- a/test/BUILD.bazel +++ b/test/BUILD.bazel @@ -117,6 +117,8 @@ GenTestRules( "src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT", "src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT", "src/test/java/org/apache/rocketmq/test/offset/OffsetNotFoundIT", + "src/test/java/org/apache/rocketmq/test/recall/RecallWithTraceIT", + "src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT", "src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT", "src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT", "src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT", diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java index 77f5f362125..b754466a916 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java @@ -93,6 +93,11 @@ public void testSimpleConsumerSendAndRecvDelayMessage() throws Exception { super.testSimpleConsumerSendAndRecvDelayMessage(); } + @Test + public void testSimpleConsumerSendAndRecallDelayMessage() throws Exception { + super.testSimpleConsumerSendAndRecallDelayMessage(); + } + @Test public void testSimpleConsumerSendAndRecvBigMessage() throws Exception { super.testSimpleConsumerSendAndRecvBigMessage(); diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java index 9d8f85b9981..534108c2805 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java @@ -41,6 +41,8 @@ import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.RecoverOrphanedTransactionCommand; @@ -393,6 +395,69 @@ public void testSimpleConsumerSendAndRecvDelayMessage() throws Exception { assertThat(Math.abs(recvTime.get() - sendTime - delayTime) < 2 * 1000).isTrue(); } + public void testSimpleConsumerSendAndRecallDelayMessage() throws Exception { + String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.DELAY); + String group = MQRandomUtils.getRandomConsumerGroup(); + long delayTime = TimeUnit.SECONDS.toMillis(5); + + // init consumer offset + this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get(); + receiveMessage(blockingStub, topic, group, 1); + + this.sendClientSettings(stub, buildProducerClientSettings(topic)).get(); + String messageId = createUniqID(); + SendMessageResponse sendResponse = blockingStub.sendMessage(SendMessageRequest.newBuilder() + .addMessages(Message.newBuilder() + .setTopic(Resource.newBuilder() + .setName(topic) + .build()) + .setSystemProperties(SystemProperties.newBuilder() + .setMessageId(messageId) + .setQueueId(0) + .setMessageType(MessageType.DELAY) + .setBodyEncoding(Encoding.GZIP) + .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis())) + .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234")) + .setDeliveryTimestamp(Timestamps.fromMillis(System.currentTimeMillis() + delayTime)) + .build()) + .setBody(ByteString.copyFromUtf8("hello")) + .build()) + .build()); + long sendTime = System.currentTimeMillis(); + assertSendMessage(sendResponse, messageId); + String recallHandle = sendResponse.getEntries(0).getRecallHandle(); + assertThat(recallHandle).isNotEmpty(); + + RecallMessageRequest recallRequest = RecallMessageRequest.newBuilder() + .setRecallHandle(recallHandle) + .setTopic(Resource.newBuilder().setResourceNamespace("").setName(topic).build()) + .build(); + RecallMessageResponse recallResponse = + blockingStub.withDeadlineAfter(2, TimeUnit.SECONDS).recallMessage(recallRequest); + assertThat(recallResponse.getStatus()).isEqualTo( + ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())); + assertThat(recallResponse.getMessageId()).isEqualTo(messageId); + + this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get(); + + AtomicLong recvTime = new AtomicLong(); + AtomicReference recvMessage = new AtomicReference<>(); + try { + await().atMost(java.time.Duration.ofSeconds(10)).until(() -> { + List messageList = getMessageFromReceiveMessageResponse(receiveMessage(blockingStub, topic, group)); + if (messageList.isEmpty()) { + return false; + } + recvTime.set(System.currentTimeMillis()); + recvMessage.set(messageList.get(0)); + return messageList.get(0).getSystemProperties().getMessageId().equals(messageId); + }); + } catch (Exception e) { + } + assertThat(recvTime.get()).isEqualTo(0L); + assertThat(recvMessage.get()).isNull(); + } + public void testSimpleConsumerSendAndRecvBigMessage() throws Exception { String topic = initTopicOnSampleTopicBroker(BROKER1_NAME); String group = MQRandomUtils.getRandomConsumerGroup(); @@ -427,6 +492,7 @@ public void testSimpleConsumerSendAndRecv() throws Exception { String messageId = createUniqID(); SendMessageResponse sendResponse = blockingStub.sendMessage(buildSendMessageRequest(topic, messageId)); assertSendMessage(sendResponse, messageId); + assertThat(sendResponse.getEntries(0).getRecallHandle()).isNullOrEmpty(); this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get(); diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java index 515c3f121dd..5dd06f53420 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java @@ -81,6 +81,11 @@ public void testSimpleConsumerSendAndRecvDelayMessage() throws Exception { super.testSimpleConsumerSendAndRecvDelayMessage(); } + @Test + public void testSimpleConsumerSendAndRecallDelayMessage() throws Exception { + super.testSimpleConsumerSendAndRecallDelayMessage(); + } + @Test public void testSimpleConsumerSendAndRecvBigMessage() throws Exception { super.testSimpleConsumerSendAndRecvBigMessage(); diff --git a/test/src/test/java/org/apache/rocketmq/test/recall/RecallWithTraceIT.java b/test/src/test/java/org/apache/rocketmq/test/recall/RecallWithTraceIT.java new file mode 100644 index 00000000000..d52c7002548 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/recall/RecallWithTraceIT.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.recall; + +import org.apache.rocketmq.client.consumer.PopResult; +import org.apache.rocketmq.client.consumer.PopStatus; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.trace.TraceContext; +import org.apache.rocketmq.client.trace.TraceDataEncoder; +import org.apache.rocketmq.client.trace.TraceType; +import org.apache.rocketmq.common.attribute.CQType; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.producer.RecallMessageHandle; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.base.IntegrationTestBase; +import org.apache.rocketmq.test.client.rmq.RMQPopConsumer; +import org.apache.rocketmq.test.factory.ConsumerFactory; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; +import org.apache.rocketmq.test.util.MQRandomUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.awaitility.Awaitility.await; + +public class RecallWithTraceIT extends BaseConf { + private static String topic; + private static String group; + private static DefaultMQProducer producer; + private static RMQPopConsumer popConsumer; + + @BeforeClass + public static void init() throws MQClientException { + System.setProperty("com.rocketmq.recall.default.trace.enable", Boolean.TRUE.toString()); + topic = MQRandomUtils.getRandomTopic(); + IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 1, CQType.SimpleCQ, TopicMessageType.NORMAL); + group = initConsumerGroup(); + producer = new DefaultMQProducer(group, true, topic); + producer.setNamesrvAddr(NAMESRV_ADDR); + producer.start(); + popConsumer = ConsumerFactory.getRMQPopConsumer(NAMESRV_ADDR, group, topic, "*", new RMQNormalListener()); + mqClients.add(popConsumer); + mqClients.add(producer); + } + + @AfterClass + public static void tearDown() { + shutdown(); + } + + @Test + public void testRecallTrace() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + String msgId = MessageClientIDSetter.createUniqID(); + String recallHandle = RecallMessageHandle.HandleV1.buildHandle(topic, BROKER1_NAME, + String.valueOf(System.currentTimeMillis() + 30000), msgId); + producer.recallMessage(topic, recallHandle); + + MessageQueue messageQueue = new MessageQueue(topic, BROKER1_NAME, 0); + String brokerAddress = brokerController1.getBrokerAddr(); + AtomicReference traceMessage = new AtomicReference(); + await() + .pollInterval(1, TimeUnit.SECONDS) + .atMost(15, TimeUnit.SECONDS) + .until(() -> { + PopResult popResult = popConsumer.pop(brokerAddress, messageQueue, 60 * 1000, -1); + boolean found = popResult.getPopStatus().equals(PopStatus.FOUND); + traceMessage.set(found ? popResult.getMsgFoundList().get(0) : null); + return found; + }); + + Assert.assertNotNull(traceMessage.get()); + TraceContext context = + TraceDataEncoder.decoderFromTraceDataString(new String(traceMessage.get().getBody())).get(0); + Assert.assertEquals(TraceType.Recall, context.getTraceType()); + Assert.assertEquals(group, context.getGroupName()); + Assert.assertTrue(context.isSuccess()); + Assert.assertEquals(msgId, context.getTraceBeans().get(0).getMsgId()); + } +} diff --git a/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java b/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java new file mode 100644 index 00000000000..2fb9e023712 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.recall; + +import org.apache.rocketmq.client.consumer.PopResult; +import org.apache.rocketmq.client.consumer.PopStatus; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.attribute.CQType; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.producer.RecallMessageHandle; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.base.IntegrationTestBase; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.client.rmq.RMQPopConsumer; +import org.apache.rocketmq.test.factory.ConsumerFactory; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; +import org.apache.rocketmq.test.util.MQRandomUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.awaitility.Awaitility.await; + +public class SendAndRecallDelayMessageIT extends BaseConf { + + private static String initTopic; + private static String consumerGroup; + private static RMQNormalProducer producer; + private static RMQPopConsumer popConsumer; + + @BeforeClass + public static void init() { + initTopic = initTopic(); + consumerGroup = initConsumerGroup(); + producer = getProducer(NAMESRV_ADDR, initTopic); + popConsumer = ConsumerFactory.getRMQPopConsumer(NAMESRV_ADDR, consumerGroup, initTopic, "*", new RMQNormalListener()); + mqClients.add(popConsumer); + } + + @AfterClass + public static void tearDown() { + shutdown(); + } + + @Test + public void testSendAndRecv() throws Exception { + int delaySecond = 1; + String topic = MQRandomUtils.getRandomTopic(); + IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 1, CQType.SimpleCQ, TopicMessageType.DELAY); + MessageQueue messageQueue = new MessageQueue(topic, BROKER1_NAME, 0); + String brokerAddress = brokerController1.getBrokerAddr(); + + List sendList = buildSendMessageList(topic, delaySecond); + List recvList = new ArrayList<>(); + + for (Message message : sendList) { + producer.getProducer().send(message); + } + + await() + .pollInterval(1, TimeUnit.SECONDS) + .atMost(delaySecond + 15, TimeUnit.SECONDS) + .until(() -> { + PopResult popResult = popConsumer.pop(brokerAddress, messageQueue, 60 * 1000, -1); + processPopResult(recvList, popResult); + return recvList.size() == sendList.size(); + }); + } + + @Test + public void testSendAndRecall() throws Exception { + int delaySecond = 5; + String topic = MQRandomUtils.getRandomTopic(); + IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 1, CQType.SimpleCQ, TopicMessageType.DELAY); + MessageQueue messageQueue = new MessageQueue(topic, BROKER1_NAME, 0); + String brokerAddress = brokerController1.getBrokerAddr(); + + List sendList = buildSendMessageList(topic, delaySecond); + List recvList = new ArrayList<>(); + int recallCount = 0; + + for (Message message : sendList) { + SendResult sendResult = producer.getProducer().send(message); + if (sendResult.getRecallHandle() != null) { + String messageId = producer.getProducer().recallMessage(topic, sendResult.getRecallHandle()); + assertEquals(sendResult.getMsgId(), messageId); + recallCount += 1; + } + } + assertEquals(sendList.size() - 2, recallCount); // one normal and one delay-level message + try { + await() + .pollInterval(1, TimeUnit.SECONDS) + .atMost(delaySecond + 15, TimeUnit.SECONDS) + .until(() -> { + PopResult popResult = popConsumer.pop(brokerAddress, messageQueue, 60 * 1000, -1); + processPopResult(recvList, popResult); + return recvList.size() == sendList.size(); + }); + } catch (Exception e) { + } + assertEquals(sendList.size() - recallCount, recvList.size()); + } + + @Test + public void testSendAndRecall_ukCollision() throws Exception { + int delaySecond = 5; + String topic = MQRandomUtils.getRandomTopic(); + String collisionTopic = MQRandomUtils.getRandomTopic(); + IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 1, CQType.SimpleCQ, TopicMessageType.DELAY); + IntegrationTestBase.initTopic(collisionTopic, NAMESRV_ADDR, BROKER1_NAME, 1, CQType.SimpleCQ, TopicMessageType.DELAY); + MessageQueue messageQueue = new MessageQueue(topic, BROKER1_NAME, 0); + String brokerAddress = brokerController1.getBrokerAddr(); + + List sendList = buildSendMessageList(topic, delaySecond); + List recvList = new ArrayList<>(); + int recallCount = 0; + + for (Message message : sendList) { + SendResult sendResult = producer.getProducer().send(message); + if (sendResult.getRecallHandle() != null) { + RecallMessageHandle.HandleV1 handleEntity = + (RecallMessageHandle.HandleV1) RecallMessageHandle.decodeHandle(sendResult.getRecallHandle()); + String collisionHandle = RecallMessageHandle.HandleV1.buildHandle(collisionTopic, + handleEntity.getBrokerName(), handleEntity.getTimestampStr(), handleEntity.getMessageId()); + String messageId = producer.getProducer().recallMessage(collisionTopic, collisionHandle); + assertEquals(sendResult.getMsgId(), messageId); + recallCount += 1; + } + } + assertEquals(sendList.size() - 2, recallCount); // one normal and one delay-level message + + try { + await() + .pollInterval(1, TimeUnit.SECONDS) + .atMost(delaySecond + 15, TimeUnit.SECONDS) + .until(() -> { + PopResult popResult = popConsumer.pop(brokerAddress, messageQueue, 60 * 1000, -1); + processPopResult(recvList, popResult); + return recvList.size() == sendList.size(); + }); + } catch (Exception e) { + } + assertEquals(sendList.size(), recvList.size()); + } + + private void processPopResult(List recvList, PopResult popResult) { + if (popResult.getPopStatus() == PopStatus.FOUND && popResult.getMsgFoundList() != null) { + recvList.addAll(popResult.getMsgFoundList()); + } + } + + private List buildSendMessageList(String topic, int delaySecond) { + Message msg0 = new Message(topic, "tag", "Hello RocketMQ".getBytes()); // not supported + + Message msg1 = new Message(topic, "tag", "Hello RocketMQ".getBytes()); // not supported + msg1.setDelayTimeLevel(2); + + Message msg2 = new Message(topic, "tag", "Hello RocketMQ".getBytes()); + msg2.setDelayTimeMs(delaySecond * 1000L); + + Message msg3 = new Message(topic, "tag", "Hello RocketMQ".getBytes()); + msg3.setDelayTimeSec(delaySecond); + + Message msg4 = new Message(topic, "tag", "Hello RocketMQ".getBytes()); + msg4.setDeliverTimeMs(System.currentTimeMillis() + delaySecond * 1000L); + + return Arrays.asList(msg0, msg1, msg2, msg3, msg4); + } +}