Skip to content

Commit

Permalink
[ISSUE #8974] Support recalling of delay message (#8975)
Browse files Browse the repository at this point in the history
  • Loading branch information
imzs authored Dec 9, 2024
1 parent 4571a85 commit bfb3d17
Show file tree
Hide file tree
Showing 54 changed files with 2,253 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.SendMessageRequest;
Expand Down Expand Up @@ -128,6 +129,9 @@ public static PlainAccessResource parse(RemotingCommand request, String remoteAd
final String topicV2 = request.getExtFields().get("b");
accessResource.addResourceAndPerm(topicV2, PlainAccessResource.isRetryTopic(topicV2) ? Permission.SUB : Permission.PUB);
break;
case RequestCode.RECALL_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
break;
case RequestCode.CONSUMER_SEND_MSG_BACK:
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
break;
Expand Down Expand Up @@ -232,6 +236,9 @@ public static PlainAccessResource parse(GeneratedMessageV3 messageV3, Authentica
}
}
accessResource.addResourceAndPerm(topic, Permission.PUB);
} else if (RecallMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
RecallMessageRequest request = (RecallMessageRequest) messageV3;
accessResource.addResourceAndPerm(request.getTopic(), Permission.PUB);
} else if (ReceiveMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
ReceiveMessageRequest request = (ReceiveMessageRequest) messageV3;
accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@

import java.util.HashMap;
import java.util.Map;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.Resource;
import com.google.protobuf.GeneratedMessageV3;
import org.apache.rocketmq.acl.common.AuthenticationHeader;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.junit.Assert;
Expand All @@ -33,6 +38,8 @@ public class PlainAccessResourceTest {
public static final String DEFAULT_PRODUCER_GROUP = "PID_acl";
public static final String DEFAULT_CONSUMER_GROUP = "GID_acl";
public static final String DEFAULT_REMOTE_ADDR = "192.128.1.1";
public static final String AUTH_HEADER =
"Signature Credential=1234567890/test, SignedHeaders=host, Signature=1234567890";

@Test
public void testParseSendNormal() {
Expand Down Expand Up @@ -93,4 +100,34 @@ public void testParseSendRetryV2() {

Assert.assertEquals(permMap, accessResource.getResourcePermMap());
}

@Test
public void testParseRecallMessage() {
// remoting
RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader();
requestHeader.setTopic(DEFAULT_TOPIC);
requestHeader.setProducerGroup(DEFAULT_PRODUCER_GROUP);
requestHeader.setRecallHandle("handle");
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, requestHeader);
request.makeCustomHeaderToNet();

PlainAccessResource accessResource = PlainAccessResource.parse(request, DEFAULT_REMOTE_ADDR);
Assert.assertTrue(Permission.PUB == accessResource.getResourcePermMap().get(DEFAULT_TOPIC));

// grpc
GeneratedMessageV3 grpcRequest = RecallMessageRequest.newBuilder()
.setTopic(Resource.newBuilder().setName(DEFAULT_TOPIC).build())
.setRecallHandle("handle")
.build();
accessResource = PlainAccessResource.parse(grpcRequest, mockAuthenticationHeader());
Assert.assertTrue(Permission.PUB == accessResource.getResourcePermMap().get(DEFAULT_TOPIC));
}

private AuthenticationHeader mockAuthenticationHeader() {
return AuthenticationHeader.builder()
.remoteAddress(DEFAULT_REMOTE_ADDR)
.authorization(AUTH_HEADER)
.datetime("datetime")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.Subscription;
Expand Down Expand Up @@ -101,6 +102,10 @@ public List<DefaultAuthorizationContext> build(Metadata metadata, GeneratedMessa
}
result = newPubContext(metadata, request.getMessages(0).getTopic());
}
if (message instanceof RecallMessageRequest) {
RecallMessageRequest request = (RecallMessageRequest) message;
result = newPubContext(metadata, request.getTopic());
}
if (message instanceof EndTransactionRequest) {
EndTransactionRequest request = (EndTransactionRequest) message;
result = newPubContext(metadata, request.getTopic());
Expand Down Expand Up @@ -207,6 +212,10 @@ public List<DefaultAuthorizationContext> build(ChannelHandlerContext context, Re
result.add(DefaultAuthorizationContext.of(subject, topic, Action.PUB, sourceIp));
}
break;
case RequestCode.RECALL_MESSAGE:
topic = Resource.ofTopic(fields.get(TOPIC));
result.add(DefaultAuthorizationContext.of(subject, topic, Action.PUB, sourceIp));
break;
case RequestCode.END_TRANSACTION:
if (StringUtils.isNotBlank(fields.get(TOPIC))) {
topic = Resource.ofTopic(fields.get(TOPIC));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import apache.rocketmq.v2.Publishing;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.SendMessageRequest;
Expand Down Expand Up @@ -65,6 +66,7 @@
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
Expand Down Expand Up @@ -122,6 +124,19 @@ public void buildGrpc() {
Assert.assertEquals(result.get(0).getChannelId(), "channel-id");
Assert.assertEquals(result.get(0).getRpcCode(), SendMessageRequest.getDescriptor().getFullName());

request = RecallMessageRequest.newBuilder()
.setTopic(Resource.newBuilder().setName("topic").build())
.setRecallHandle("handle")
.build();
result = builder.build(metadata, request);
Assert.assertEquals(1, result.size());
Assert.assertEquals(result.get(0).getSubject().getSubjectKey(), "User:rocketmq");
Assert.assertEquals(result.get(0).getResource().getResourceKey(), "Topic:topic");
Assert.assertTrue(result.get(0).getActions().containsAll(Arrays.asList(Action.PUB)));
Assert.assertEquals(result.get(0).getSourceIp(), "192.168.0.1");
Assert.assertEquals(result.get(0).getChannelId(), "channel-id");
Assert.assertEquals(result.get(0).getRpcCode(), RecallMessageRequest.getDescriptor().getFullName());

request = EndTransactionRequest.newBuilder()
.setTopic(Resource.newBuilder().setName("topic").build())
.build();
Expand Down Expand Up @@ -315,6 +330,22 @@ public void buildRemoting() {
Assert.assertEquals("Group:group", result.get(0).getResource().getResourceKey());
Assert.assertTrue(result.get(0).getActions().containsAll(Arrays.asList(Action.SUB)));

RecallMessageRequestHeader recallMessageRequestHeader = new RecallMessageRequestHeader();
recallMessageRequestHeader.setTopic("topic");
recallMessageRequestHeader.setRecallHandle("handle");
request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, recallMessageRequestHeader);
request.setVersion(441);
request.addExtField("AccessKey", "rocketmq");
request.makeCustomHeaderToNet();
result = builder.build(channelHandlerContext, request);
Assert.assertEquals(1, result.size());
Assert.assertEquals("User:rocketmq", result.get(0).getSubject().getSubjectKey());
Assert.assertEquals("Topic:topic", result.get(0).getResource().getResourceKey());
Assert.assertTrue(result.get(0).getActions().containsAll(Arrays.asList(Action.PUB)));
Assert.assertEquals("192.168.0.1", result.get(0).getSourceIp());
Assert.assertEquals("channel-id", result.get(0).getChannelId());
Assert.assertEquals(RequestCode.RECALL_MESSAGE + "", result.get(0).getRpcCode());

EndTransactionRequestHeader endTransactionRequestHeader = new EndTransactionRequestHeader();
endTransactionRequestHeader.setTopic("topic");
request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, endTransactionRequestHeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.RecallMessageProcessor;
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
Expand Down Expand Up @@ -210,6 +211,7 @@ public class BrokerController {
protected final QueryAssignmentProcessor queryAssignmentProcessor;
protected final ClientManageProcessor clientManageProcessor;
protected final SendMessageProcessor sendMessageProcessor;
protected final RecallMessageProcessor recallMessageProcessor;
protected final ReplyMessageProcessor replyMessageProcessor;
protected final PullRequestHoldService pullRequestHoldService;
protected final MessageArrivingListener messageArrivingListener;
Expand Down Expand Up @@ -369,6 +371,7 @@ public BrokerController(
this.ackMessageProcessor = new AckMessageProcessor(this);
this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
this.sendMessageProcessor = new SendMessageProcessor(this);
this.recallMessageProcessor = new RecallMessageProcessor(this);
this.replyMessageProcessor = new ReplyMessageProcessor(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
Expand Down Expand Up @@ -1096,10 +1099,12 @@ public void registerProcessor() {
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, recallMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, recallMessageProcessor, this.sendMessageExecutor);
/**
* PullMessageProcessor
*/
Expand Down Expand Up @@ -2424,6 +2429,10 @@ public SendMessageProcessor getSendMessageProcessor() {
return sendMessageProcessor;
}

public RecallMessageProcessor getRecallMessageProcessor() {
return recallMessageProcessor;
}

public QueryAssignmentProcessor getQueryAssignmentProcessor() {
return queryAssignmentProcessor;
}
Expand Down
Loading

0 comments on commit bfb3d17

Please sign in to comment.