Skip to content

Commit

Permalink
[AMQ-8354] Fix ack replication if there was a replica restart after a…
Browse files Browse the repository at this point in the history
…dding a durable subscriber.
  • Loading branch information
NikitaShupletsov committed Aug 7, 2024
1 parent 8cd2a82 commit ab93e03
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
Expand All @@ -44,7 +42,6 @@
import org.apache.activemq.replica.storage.ReplicaSequenceStorage;
import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -58,7 +55,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -271,7 +267,10 @@ private void processMessage(ActiveMQMessage message, ReplicaEventType eventType,
logger.trace("Processing replicated messages dropped");
try {
messageAck((MessageAck) deserializedData,
(List<String>) message.getObjectProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY), transactionId);
(List<String>) message.getObjectProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY),
message.getStringProperty(ReplicaSupport.SUBSCRIPTION_NAME_PROPERTY),
message.getStringProperty(ReplicaSupport.CLIENT_ID_PROPERTY),
transactionId);
} catch (JMSException e) {
logger.error("Failed to extract property to replicate messages dropped [{}]", deserializedData, e);
throw new Exception(e);
Expand Down Expand Up @@ -370,15 +369,6 @@ private boolean isTransactionExisted(TransactionId transactionId) throws Excepti
}
}

private boolean isExceptionDueToNonExistingMessage(JMSException exception) {
if (exception.getMessage().contains("Slave broker out of sync with master - Message:")
&& exception.getMessage().contains("does not exist among pending")) {
return true;
}

return false;
}

private void processBatch(ActiveMQMessage message, TransactionId tid) throws Exception {
List<Object> objects = eventSerializer.deserializeListOfObjects(message.getContent().getData());
for (Object o : objects) {
Expand Down Expand Up @@ -434,12 +424,29 @@ private void sendMessage(ActiveMQMessage message, TransactionId transactionId) t
}
}

private void messageDispatch(ConsumerId consumerId, ActiveMQDestination destination, String messageId) throws Exception {
private List<String> messageDispatch(MessageAck ack, List<String> messageIdsToAck) throws Exception {
List<String> existingMessageIdsToAck = new ArrayList<>();
for (String messageId : messageIdsToAck) {
try {
broker.processDispatchNotification(getMessageDispatchNotification(ack, messageId));
existingMessageIdsToAck.add(messageId);
} catch (JMSException e) {
if (e.getMessage().contains("Slave broker out of sync with master")) {
logger.warn("Skip MESSAGE_ACK processing event due to non-existing message [{}]", messageId);
} else {
throw e;
}
}
}
return existingMessageIdsToAck;
}

private static MessageDispatchNotification getMessageDispatchNotification(MessageAck ack, String messageId) {
MessageDispatchNotification mdn = new MessageDispatchNotification();
mdn.setConsumerId(consumerId);
mdn.setDestination(destination);
mdn.setConsumerId(ack.getConsumerId());
mdn.setDestination(ack.getDestination());
mdn.setMessageId(new MessageId(messageId));
broker.processDispatchNotification(mdn);
return mdn;
}

private void removeScheduledMessageProperties(ActiveMQMessage message) throws IOException {
Expand Down Expand Up @@ -529,12 +536,7 @@ private void commitTransaction(TransactionId xid, boolean onePhase) throws Excep

private void addDurableConsumer(ConsumerInfo consumerInfo, String clientId) throws Exception {
try {
boolean exists = broker.getDestinations(consumerInfo.getDestination()).stream()
.findFirst()
.map(Destination::getConsumers)
.stream().flatMap(Collection::stream)
.anyMatch(v -> v.getConsumerInfo().getSubscriptionName().equals(consumerInfo.getSubscriptionName()));
if (exists) {
if (getDurableSubscription(consumerInfo, clientId).isPresent()) {
// consumer already exists
return;
}
Expand All @@ -555,16 +557,7 @@ private void addDurableConsumer(ConsumerInfo consumerInfo, String clientId) thro

private void removeDurableConsumer(ConsumerInfo consumerInfo, String clientId) throws Exception {
try {
ConnectionContext context = broker.getDestinations(consumerInfo.getDestination()).stream()
.findFirst()
.map(Destination::getConsumers)
.stream().flatMap(Collection::stream)
.filter(v -> v.getConsumerInfo().getSubscriptionName().equals(consumerInfo.getSubscriptionName()))
.map(Subscription::getContext)

.filter(v -> clientId == null || clientId.equals(v.getClientId()))
.findFirst()
.orElse(null);
ConnectionContext context = getDurableSubscription(consumerInfo, clientId).map(Subscription::getContext).orElse(null);
if (context == null || !ReplicaSupport.REPLICATION_PLUGIN_USER_NAME.equals(context.getUserName())) {
// a real consumer had stolen the context before we got the message
return;
Expand Down Expand Up @@ -593,38 +586,38 @@ private void removeDurableConsumerSubscription(RemoveSubscriptionInfo subscripti
}
}

private void messageAck(MessageAck ack, List<String> messageIdsToAck, TransactionId transactionId) throws Exception {
private void messageAck(MessageAck ack, List<String> messageIdsToAck, String subscriptionName, String clientId,
TransactionId transactionId) throws Exception {
ActiveMQDestination destination = ack.getDestination();
MessageAck messageAck = new MessageAck();
ConsumerInfo consumerInfo = null;
ConnectionContext context = connectionContext;
try {
if (!isDestinationExisted(destination)) {
logger.warn("Skip MESSAGE_ACK processing event due to non-existing destination [{}]", destination.getPhysicalName());
return;
}
ConsumerInfo consumerInfo = null;
if (destination.isQueue()) {
consumerInfo = new ConsumerInfo();
consumerInfo.setConsumerId(ack.getConsumerId());
consumerInfo.setPrefetchSize(0);
consumerInfo.setDestination(destination);
broker.addConsumer(connectionContext, consumerInfo);
broker.addConsumer(context, consumerInfo);
} else if (destination.isTopic() && subscriptionName != null && clientId != null) {
consumerInfo = new ConsumerInfo();
consumerInfo.setConsumerId(ack.getConsumerId());
consumerInfo.setPrefetchSize(0);
consumerInfo.setDestination(destination);
consumerInfo.setSubscriptionName(subscriptionName);
context = connectionContext.copy();
context.setClientId(clientId);
context.setConnection(new DummyConnection());
broker.addConsumer(context, consumerInfo);
}

List<String> existingMessageIdsToAck = new ArrayList();
for (String messageId : messageIdsToAck) {
try {
messageDispatch(ack.getConsumerId(), destination, messageId);
existingMessageIdsToAck.add(messageId);
} catch (JMSException e) {
if (isExceptionDueToNonExistingMessage(e)) {
logger.warn("Skip MESSAGE_ACK processing event due to non-existing message [{}]", messageId);
} else {
throw e;
}
}
}
List<String> existingMessageIdsToAck = messageDispatch(ack, messageIdsToAck);

if (existingMessageIdsToAck.size() == 0) {
if (existingMessageIdsToAck.isEmpty()) {
return;
}

Expand All @@ -637,6 +630,7 @@ private void messageAck(MessageAck ack, List<String> messageIdsToAck, Transactio
if (messageAck.getTransactionId() == null || !messageAck.getTransactionId().isXATransaction()) {
messageAck.setTransactionId(transactionId);
}
messageAck.setTransactionId(null);

if (messageAck.isPoisonAck()) {
messageAck.setAckType(MessageAck.STANDARD_ACK_TYPE);
Expand All @@ -645,16 +639,16 @@ private void messageAck(MessageAck ack, List<String> messageIdsToAck, Transactio
ConsumerBrokerExchange consumerBrokerExchange = new ConsumerBrokerExchange();
consumerBrokerExchange.setConnectionContext(connectionContext);
broker.acknowledge(consumerBrokerExchange, messageAck);

if (consumerInfo != null) {
broker.removeConsumer(connectionContext, consumerInfo);
}
} catch (Exception e) {
logger.error("Unable to ack messages [{} <-> {}] for consumer {}",
ack.getFirstMessageId(),
ack.getLastMessageId(),
ack.getConsumerId(), e);
throw e;
} finally {
if (consumerInfo != null) {
broker.removeConsumer(context, consumerInfo);
}
}
}

Expand Down Expand Up @@ -682,4 +676,15 @@ private void createTransactionMapIfNotExist() {
connectionContext.setTransactions(new ConcurrentHashMap<>());
}
}

private Optional<Subscription> getDurableSubscription(ConsumerInfo consumerInfo, String clientId) {
return broker.getDestinations(consumerInfo.getDestination()).stream()
.findFirst()
.map(Destination::getConsumers)
.stream().flatMap(Collection::stream)
.filter(v -> v.getConsumerInfo().getSubscriptionName().equals(consumerInfo.getSubscriptionName()))
.filter(v -> clientId == null || clientId.equals(v.getContext().getClientId()))
.findFirst();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack)

try {
super.acknowledge(consumerExchange, ack);
replicateAck(connectionContext, ack, transactionId, messageIdsToAck);
replicateAck(connectionContext, ack, subscription, transactionId, messageIdsToAck);
if (isInternalTransaction) {
super.commitTransaction(connectionContext, transactionId, true);
}
Expand All @@ -705,24 +705,30 @@ private List<String> getMessageIdsToAck(MessageAck ack, PrefetchSubscription sub
.collect(Collectors.toList());
}

private void replicateAck(ConnectionContext connectionContext, MessageAck ack, TransactionId transactionId,
List<String> messageIdsToAck) throws Exception {
private void replicateAck(ConnectionContext connectionContext, MessageAck ack, PrefetchSubscription subscription,
TransactionId transactionId, List<String> messageIdsToAck) throws Exception {
try {
TransactionId originalTransactionId = ack.getTransactionId();
enqueueReplicaEvent(
connectionContext,
new ReplicaEvent()
.setEventType(ReplicaEventType.MESSAGE_ACK)
.setEventData(eventSerializer.serializeReplicationData(ack))
.setTransactionId(transactionId)
.setReplicationProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY, messageIdsToAck)
.setReplicationProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY,
ack.getDestination().isQueue())
.setReplicationProperty(ReplicaSupport.ORIGINAL_MESSAGE_DESTINATION_PROPERTY,
ack.getDestination().toString())
.setReplicationProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_IN_XA_TRANSACTION_PROPERTY,
originalTransactionId != null && originalTransactionId.isXATransaction())
);
ActiveMQDestination destination = ack.getDestination();
ReplicaEvent event = new ReplicaEvent()
.setEventType(ReplicaEventType.MESSAGE_ACK)
.setEventData(eventSerializer.serializeReplicationData(ack))
.setTransactionId(transactionId)
.setReplicationProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY, messageIdsToAck)
.setReplicationProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY,
destination.isQueue())
.setReplicationProperty(ReplicaSupport.ORIGINAL_MESSAGE_DESTINATION_PROPERTY,
destination.toString())
.setReplicationProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_IN_XA_TRANSACTION_PROPERTY,
originalTransactionId != null && originalTransactionId.isXATransaction());
if (destination.isTopic() && subscription instanceof DurableTopicSubscription) {
event.setReplicationProperty(ReplicaSupport.CLIENT_ID_PROPERTY, connectionContext.getClientId());
event.setReplicationProperty(ReplicaSupport.SUBSCRIPTION_NAME_PROPERTY,
((DurableTopicSubscription) subscription).getSubscriptionKey().getSubscriptionName());
event.setVersion(3);
}

enqueueReplicaEvent(connectionContext, event);
} catch (Exception e) {
logger.error("Failed to replicate ack messages [{} <-> {}] for consumer {}",
ack.getFirstMessageId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private ReplicaSupport() {
// Intentionally hidden
}

public static final int CURRENT_VERSION = 2;
public static final int CURRENT_VERSION = 3;
public static final int DEFAULT_VERSION = 1;

public static final int INTERMEDIATE_QUEUE_PREFETCH_SIZE = 10000;
Expand All @@ -54,6 +54,7 @@ private ReplicaSupport() {

public static final String TRANSACTION_ONE_PHASE_PROPERTY = "transactionOnePhaseProperty";
public static final String CLIENT_ID_PROPERTY = "clientIdProperty";
public static final String SUBSCRIPTION_NAME_PROPERTY = "subscriptionNameProperty";
public static final String IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY = "isOriginalMessageSentToQueueProperty";
public static final String ORIGINAL_MESSAGE_DESTINATION_PROPERTY = "originalMessageDestinationProperty";
public static final String IS_ORIGINAL_MESSAGE_IN_XA_TRANSACTION_PROPERTY = "isOriginalMessageInXaTransactionProperty";
Expand Down

0 comments on commit ab93e03

Please sign in to comment.