diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java index 54eb7ee96c3..c005a1453f5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java @@ -26,14 +26,12 @@ import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.IndirectMessageReference; import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.MessageAck; @@ -44,7 +42,6 @@ import org.apache.activemq.replica.storage.ReplicaSequenceStorage; import org.apache.activemq.transaction.Transaction; import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.util.SubscriptionKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +55,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; @@ -271,7 +267,10 @@ private void processMessage(ActiveMQMessage message, ReplicaEventType eventType, logger.trace("Processing replicated messages dropped"); try { messageAck((MessageAck) deserializedData, - (List) message.getObjectProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY), transactionId); + (List) message.getObjectProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY), + message.getStringProperty(ReplicaSupport.SUBSCRIPTION_NAME_PROPERTY), + message.getStringProperty(ReplicaSupport.CLIENT_ID_PROPERTY), + transactionId); } catch (JMSException e) { logger.error("Failed to extract property to replicate messages dropped [{}]", deserializedData, e); throw new Exception(e); @@ -370,15 +369,6 @@ private boolean isTransactionExisted(TransactionId transactionId) throws Excepti } } - private boolean isExceptionDueToNonExistingMessage(JMSException exception) { - if (exception.getMessage().contains("Slave broker out of sync with master - Message:") - && exception.getMessage().contains("does not exist among pending")) { - return true; - } - - return false; - } - private void processBatch(ActiveMQMessage message, TransactionId tid) throws Exception { List objects = eventSerializer.deserializeListOfObjects(message.getContent().getData()); for (Object o : objects) { @@ -434,12 +424,29 @@ private void sendMessage(ActiveMQMessage message, TransactionId transactionId) t } } - private void messageDispatch(ConsumerId consumerId, ActiveMQDestination destination, String messageId) throws Exception { + private List messageDispatch(MessageAck ack, List messageIdsToAck) throws Exception { + List existingMessageIdsToAck = new ArrayList<>(); + for (String messageId : messageIdsToAck) { + try { + broker.processDispatchNotification(getMessageDispatchNotification(ack, messageId)); + existingMessageIdsToAck.add(messageId); + } catch (JMSException e) { + if (e.getMessage().contains("Slave broker out of sync with master")) { + logger.warn("Skip MESSAGE_ACK processing event due to non-existing message [{}]", messageId); + } else { + throw e; + } + } + } + return existingMessageIdsToAck; + } + + private static MessageDispatchNotification getMessageDispatchNotification(MessageAck ack, String messageId) { MessageDispatchNotification mdn = new MessageDispatchNotification(); - mdn.setConsumerId(consumerId); - mdn.setDestination(destination); + mdn.setConsumerId(ack.getConsumerId()); + mdn.setDestination(ack.getDestination()); mdn.setMessageId(new MessageId(messageId)); - broker.processDispatchNotification(mdn); + return mdn; } private void removeScheduledMessageProperties(ActiveMQMessage message) throws IOException { @@ -529,12 +536,7 @@ private void commitTransaction(TransactionId xid, boolean onePhase) throws Excep private void addDurableConsumer(ConsumerInfo consumerInfo, String clientId) throws Exception { try { - boolean exists = broker.getDestinations(consumerInfo.getDestination()).stream() - .findFirst() - .map(Destination::getConsumers) - .stream().flatMap(Collection::stream) - .anyMatch(v -> v.getConsumerInfo().getSubscriptionName().equals(consumerInfo.getSubscriptionName())); - if (exists) { + if (getDurableSubscription(consumerInfo, clientId).isPresent()) { // consumer already exists return; } @@ -555,16 +557,7 @@ private void addDurableConsumer(ConsumerInfo consumerInfo, String clientId) thro private void removeDurableConsumer(ConsumerInfo consumerInfo, String clientId) throws Exception { try { - ConnectionContext context = broker.getDestinations(consumerInfo.getDestination()).stream() - .findFirst() - .map(Destination::getConsumers) - .stream().flatMap(Collection::stream) - .filter(v -> v.getConsumerInfo().getSubscriptionName().equals(consumerInfo.getSubscriptionName())) - .map(Subscription::getContext) - - .filter(v -> clientId == null || clientId.equals(v.getClientId())) - .findFirst() - .orElse(null); + ConnectionContext context = getDurableSubscription(consumerInfo, clientId).map(Subscription::getContext).orElse(null); if (context == null || !ReplicaSupport.REPLICATION_PLUGIN_USER_NAME.equals(context.getUserName())) { // a real consumer had stolen the context before we got the message return; @@ -593,38 +586,38 @@ private void removeDurableConsumerSubscription(RemoveSubscriptionInfo subscripti } } - private void messageAck(MessageAck ack, List messageIdsToAck, TransactionId transactionId) throws Exception { + private void messageAck(MessageAck ack, List messageIdsToAck, String subscriptionName, String clientId, + TransactionId transactionId) throws Exception { ActiveMQDestination destination = ack.getDestination(); MessageAck messageAck = new MessageAck(); + ConsumerInfo consumerInfo = null; + ConnectionContext context = connectionContext; try { if (!isDestinationExisted(destination)) { logger.warn("Skip MESSAGE_ACK processing event due to non-existing destination [{}]", destination.getPhysicalName()); return; } - ConsumerInfo consumerInfo = null; if (destination.isQueue()) { consumerInfo = new ConsumerInfo(); consumerInfo.setConsumerId(ack.getConsumerId()); consumerInfo.setPrefetchSize(0); consumerInfo.setDestination(destination); - broker.addConsumer(connectionContext, consumerInfo); + broker.addConsumer(context, consumerInfo); + } else if (destination.isTopic() && subscriptionName != null && clientId != null) { + consumerInfo = new ConsumerInfo(); + consumerInfo.setConsumerId(ack.getConsumerId()); + consumerInfo.setPrefetchSize(0); + consumerInfo.setDestination(destination); + consumerInfo.setSubscriptionName(subscriptionName); + context = connectionContext.copy(); + context.setClientId(clientId); + context.setConnection(new DummyConnection()); + broker.addConsumer(context, consumerInfo); } - List existingMessageIdsToAck = new ArrayList(); - for (String messageId : messageIdsToAck) { - try { - messageDispatch(ack.getConsumerId(), destination, messageId); - existingMessageIdsToAck.add(messageId); - } catch (JMSException e) { - if (isExceptionDueToNonExistingMessage(e)) { - logger.warn("Skip MESSAGE_ACK processing event due to non-existing message [{}]", messageId); - } else { - throw e; - } - } - } + List existingMessageIdsToAck = messageDispatch(ack, messageIdsToAck); - if (existingMessageIdsToAck.size() == 0) { + if (existingMessageIdsToAck.isEmpty()) { return; } @@ -637,6 +630,7 @@ private void messageAck(MessageAck ack, List messageIdsToAck, Transactio if (messageAck.getTransactionId() == null || !messageAck.getTransactionId().isXATransaction()) { messageAck.setTransactionId(transactionId); } + messageAck.setTransactionId(null); if (messageAck.isPoisonAck()) { messageAck.setAckType(MessageAck.STANDARD_ACK_TYPE); @@ -645,16 +639,16 @@ private void messageAck(MessageAck ack, List messageIdsToAck, Transactio ConsumerBrokerExchange consumerBrokerExchange = new ConsumerBrokerExchange(); consumerBrokerExchange.setConnectionContext(connectionContext); broker.acknowledge(consumerBrokerExchange, messageAck); - - if (consumerInfo != null) { - broker.removeConsumer(connectionContext, consumerInfo); - } } catch (Exception e) { logger.error("Unable to ack messages [{} <-> {}] for consumer {}", ack.getFirstMessageId(), ack.getLastMessageId(), ack.getConsumerId(), e); throw e; + } finally { + if (consumerInfo != null) { + broker.removeConsumer(context, consumerInfo); + } } } @@ -682,4 +676,15 @@ private void createTransactionMapIfNotExist() { connectionContext.setTransactions(new ConcurrentHashMap<>()); } } + + private Optional getDurableSubscription(ConsumerInfo consumerInfo, String clientId) { + return broker.getDestinations(consumerInfo.getDestination()).stream() + .findFirst() + .map(Destination::getConsumers) + .stream().flatMap(Collection::stream) + .filter(v -> v.getConsumerInfo().getSubscriptionName().equals(consumerInfo.getSubscriptionName())) + .filter(v -> clientId == null || clientId.equals(v.getContext().getClientId())) + .findFirst(); + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSourceBroker.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSourceBroker.java index 32b041b1cab..fdc95f4e2af 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSourceBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSourceBroker.java @@ -681,7 +681,7 @@ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) try { super.acknowledge(consumerExchange, ack); - replicateAck(connectionContext, ack, transactionId, messageIdsToAck); + replicateAck(connectionContext, ack, subscription, transactionId, messageIdsToAck); if (isInternalTransaction) { super.commitTransaction(connectionContext, transactionId, true); } @@ -705,24 +705,30 @@ private List getMessageIdsToAck(MessageAck ack, PrefetchSubscription sub .collect(Collectors.toList()); } - private void replicateAck(ConnectionContext connectionContext, MessageAck ack, TransactionId transactionId, - List messageIdsToAck) throws Exception { + private void replicateAck(ConnectionContext connectionContext, MessageAck ack, PrefetchSubscription subscription, + TransactionId transactionId, List messageIdsToAck) throws Exception { try { TransactionId originalTransactionId = ack.getTransactionId(); - enqueueReplicaEvent( - connectionContext, - new ReplicaEvent() - .setEventType(ReplicaEventType.MESSAGE_ACK) - .setEventData(eventSerializer.serializeReplicationData(ack)) - .setTransactionId(transactionId) - .setReplicationProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY, messageIdsToAck) - .setReplicationProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY, - ack.getDestination().isQueue()) - .setReplicationProperty(ReplicaSupport.ORIGINAL_MESSAGE_DESTINATION_PROPERTY, - ack.getDestination().toString()) - .setReplicationProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_IN_XA_TRANSACTION_PROPERTY, - originalTransactionId != null && originalTransactionId.isXATransaction()) - ); + ActiveMQDestination destination = ack.getDestination(); + ReplicaEvent event = new ReplicaEvent() + .setEventType(ReplicaEventType.MESSAGE_ACK) + .setEventData(eventSerializer.serializeReplicationData(ack)) + .setTransactionId(transactionId) + .setReplicationProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY, messageIdsToAck) + .setReplicationProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY, + destination.isQueue()) + .setReplicationProperty(ReplicaSupport.ORIGINAL_MESSAGE_DESTINATION_PROPERTY, + destination.toString()) + .setReplicationProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_IN_XA_TRANSACTION_PROPERTY, + originalTransactionId != null && originalTransactionId.isXATransaction()); + if (destination.isTopic() && subscription instanceof DurableTopicSubscription) { + event.setReplicationProperty(ReplicaSupport.CLIENT_ID_PROPERTY, connectionContext.getClientId()); + event.setReplicationProperty(ReplicaSupport.SUBSCRIPTION_NAME_PROPERTY, + ((DurableTopicSubscription) subscription).getSubscriptionKey().getSubscriptionName()); + event.setVersion(3); + } + + enqueueReplicaEvent(connectionContext, event); } catch (Exception e) { logger.error("Failed to replicate ack messages [{} <-> {}] for consumer {}", ack.getFirstMessageId(), diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java index 3b4f7750c79..cd8151dc5e6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java @@ -33,7 +33,7 @@ private ReplicaSupport() { // Intentionally hidden } - public static final int CURRENT_VERSION = 2; + public static final int CURRENT_VERSION = 3; public static final int DEFAULT_VERSION = 1; public static final int INTERMEDIATE_QUEUE_PREFETCH_SIZE = 10000; @@ -54,6 +54,7 @@ private ReplicaSupport() { public static final String TRANSACTION_ONE_PHASE_PROPERTY = "transactionOnePhaseProperty"; public static final String CLIENT_ID_PROPERTY = "clientIdProperty"; + public static final String SUBSCRIPTION_NAME_PROPERTY = "subscriptionNameProperty"; public static final String IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY = "isOriginalMessageSentToQueueProperty"; public static final String ORIGINAL_MESSAGE_DESTINATION_PROPERTY = "originalMessageDestinationProperty"; public static final String IS_ORIGINAL_MESSAGE_IN_XA_TRANSACTION_PROPERTY = "isOriginalMessageInXaTransactionProperty";