Skip to content

Commit

Permalink
[AMQ-8354] Fix removal of extra messages in the storage if there are …
Browse files Browse the repository at this point in the history
…any.
  • Loading branch information
NikitaShupletsov committed Aug 7, 2024
1 parent b15528e commit 0459be6
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ private synchronized void processMessageWithRetries(ActiveMQMessage message, Tra
sequenceStorage.enqueue(connectionContext, tid, sequence.toString() + "#" + sequenceMessageId);

broker.commitTransaction(connectionContext, tid, true);

sequenceStorage.iterate();

acknowledgeCallback.setSafeToAck(true);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,9 @@ private void iterateSend0() {

broker.commitTransaction(connectionContext, transactionId, true);

restoreSequenceStorage.iterate();
sequenceStorage.iterate();

sequence = newSequence;
} catch (Exception e) {
logger.error("Failed to persist messages in the main replication queue", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
Expand All @@ -37,19 +38,23 @@
import org.apache.activemq.replica.ReplicaSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

public abstract class ReplicaBaseStorage {

private final Logger logger = LoggerFactory.getLogger(ReplicaBaseStorage.class);

protected final ProducerId replicationProducerId = new ProducerId();
private final LongSequenceGenerator eventMessageIdGenerator = new LongSequenceGenerator();

protected Broker broker;
protected ConnectionContext connectionContext;
protected ReplicaInternalMessageProducer replicaInternalMessageProducer;
protected ActiveMQQueue destination;
protected Queue queue;
Expand All @@ -68,7 +73,7 @@ public ReplicaBaseStorage(Broker broker, ReplicaInternalMessageProducer replicaI
replicationProducerId.setConnectionId(new IdGenerator().generateId());
}

protected List<ActiveMQTextMessage> initializeBase(ConnectionContext connectionContext) throws Exception {
protected List<ActiveMQTextMessage> initializeBase(ConnectionContext connectionContext, boolean keepOnlyOneMessage) throws Exception {
queue = broker.getDestinations(destination).stream().findFirst()
.map(DestinationExtractor::extractQueue).orElseThrow();
ConnectionId connectionId = new ConnectionId(new IdGenerator(idGeneratorPrefix).generateId());
Expand All @@ -84,8 +89,43 @@ protected List<ActiveMQTextMessage> initializeBase(ConnectionContext connectionC
subscription = (PrefetchSubscription) broker.addConsumer(connectionContext, consumerInfo);
queue.iterate();

return subscription.getDispatched().stream().map(MessageReference::getMessage)
List<ActiveMQTextMessage> allMessages = subscription.getDispatched().stream().map(MessageReference::getMessage)
.map(ActiveMQTextMessage.class::cast).collect(Collectors.toList());


if (keepOnlyOneMessage) {
return keepOnlyOneMessage(connectionContext, allMessages);
}
return allMessages;

}

private List<ActiveMQTextMessage> keepOnlyOneMessage(ConnectionContext connectionContext, List<ActiveMQTextMessage> allMessages) throws Exception {
if (allMessages.size() < 2) {
return allMessages;
}

logger.error("Found more than one message during storage initialization. Destination: " + destination + ", selector: " + selector);

TransactionId transactionId = new LocalTransactionId(
new ConnectionId(ReplicaSupport.REPLICATION_PLUGIN_CONNECTION_ID),
ReplicaSupport.LOCAL_TRANSACTION_ID_GENERATOR.getNextSequenceId());

broker.beginTransaction(connectionContext, transactionId);
try {
MessageAck ack = new MessageAck(allMessages.get(allMessages.size() - 2).getMessage(), MessageAck.STANDARD_ACK_TYPE, allMessages.size() - 1);
ack.setFirstMessageId(allMessages.get(0).getMessageId());
ack.setDestination(destination);
ack.setTransactionId(transactionId);
acknowledge(connectionContext, ack);

broker.commitTransaction(connectionContext, transactionId, true);
} catch (Exception e) {
broker.rollbackTransaction(connectionContext, transactionId);
throw e;
}

return Collections.singletonList(allMessages.get(allMessages.size() - 1));
}

protected void acknowledgeAll(ConnectionContext connectionContext, TransactionId tid) throws Exception {
Expand Down Expand Up @@ -132,4 +172,8 @@ public void send(ConnectionContext connectionContext, TransactionId tid, String
public void send(ConnectionContext connectionContext, ActiveMQTextMessage seqMessage) throws Exception {
replicaInternalMessageProducer.sendForcingFlowControl(connectionContext, seqMessage);
}

public void iterate() {
queue.iterate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public ReplicaRecoverySequenceStorage(Broker broker, ReplicaReplicationQueueSupp

public List<String> initialize(ConnectionContext connectionContext) throws Exception {
List<String> result = new ArrayList<>();
for (ActiveMQTextMessage message : super.initializeBase(connectionContext)) {
for (ActiveMQTextMessage message : super.initializeBase(connectionContext, false)) {
result.add(message.getText());
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,12 @@ public ReplicaRoleStorage(Broker broker, ReplicaReplicationQueueSupplier queuePr
}

public ReplicaRole initialize(ConnectionContext connectionContext) throws Exception {
List<ActiveMQTextMessage> allMessages = super.initializeBase(connectionContext);
List<ActiveMQTextMessage> allMessages = super.initializeBase(connectionContext, true);

if (allMessages.size() == 0) {
if (allMessages.isEmpty()) {
return null;
}

if (allMessages.size() > 1) {
logger.error("Found more than one message during role storage initialization");
for (int i = 0; i < allMessages.size() - 1; i++) {
queue.removeMessage(allMessages.get(i).getMessageId().toString());
}
}

return ReplicaRole.valueOf(allMessages.get(0).getText());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,18 @@

public class ReplicaSequenceStorage extends ReplicaBaseSequenceStorage {

private final Logger logger = LoggerFactory.getLogger(ReplicaSequenceStorage.class);

public ReplicaSequenceStorage(Broker broker, ReplicaReplicationQueueSupplier queueProvider,
ReplicaInternalMessageProducer replicaInternalMessageProducer, String sequenceName) {
super(broker, queueProvider, replicaInternalMessageProducer, sequenceName);
}

public String initialize(ConnectionContext connectionContext) throws Exception {
List<ActiveMQTextMessage> allMessages = super.initializeBase(connectionContext);
List<ActiveMQTextMessage> allMessages = super.initializeBase(connectionContext, true);

if (allMessages.size() == 0) {
if (allMessages.isEmpty()) {
return null;
}

if (allMessages.size() > 1) {
logger.error("Found more than one message during sequence storage initialization");
for (int i = 0; i < allMessages.size() - 1; i++) {
queue.removeMessage(allMessages.get(i).getMessageId().toString());
}
}

return allMessages.get(0).getText();
}
}

0 comments on commit 0459be6

Please sign in to comment.