diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBroker.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBroker.java index 0ba17e01a05..966e4414781 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBroker.java @@ -57,16 +57,18 @@ public class ReplicaBroker extends MutativeRoleBroker { private final ReplicaReplicationQueueSupplier queueProvider; private final ReplicaPolicy replicaPolicy; private final PeriodAcknowledge periodAcknowledgeCallBack; + private final ReplicaStatistics replicaStatistics; private ReplicaBrokerEventListener messageListener; private ScheduledFuture replicationScheduledFuture; private ScheduledFuture ackPollerScheduledFuture; public ReplicaBroker(Broker broker, ReplicaRoleManagement management, ReplicaReplicationQueueSupplier queueProvider, - ReplicaPolicy replicaPolicy) { + ReplicaPolicy replicaPolicy, ReplicaStatistics replicaStatistics) { super(broker, management); this.queueProvider = queueProvider; this.replicaPolicy = replicaPolicy; this.periodAcknowledgeCallBack = new PeriodAcknowledge(replicaPolicy); + this.replicaStatistics = replicaStatistics; } @Override @@ -126,7 +128,7 @@ private void init(ReplicaRole role) { } } }, replicaPolicy.getReplicaAckPeriod(), replicaPolicy.getReplicaAckPeriod(), TimeUnit.MILLISECONDS); - messageListener = new ReplicaBrokerEventListener(this, queueProvider, periodAcknowledgeCallBack); + messageListener = new ReplicaBrokerEventListener(this, queueProvider, periodAcknowledgeCallBack, replicaStatistics); } private void deinitialize() throws JMSException { diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java index 7f58401745c..be6054252c1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java @@ -69,6 +69,7 @@ public class ReplicaBrokerEventListener implements MessageListener { private final ConnectionContext connectionContext; private final ReplicaInternalMessageProducer replicaInternalMessageProducer; private final PeriodAcknowledge acknowledgeCallback; + private final ReplicaStatistics replicaStatistics; private final AtomicReference replicaEventRetrier = new AtomicReference<>(); final ReplicaSequenceStorage sequenceStorage; private final TransactionBroker transactionBroker; @@ -77,10 +78,11 @@ public class ReplicaBrokerEventListener implements MessageListener { MessageId sequenceMessageId; ReplicaBrokerEventListener(ReplicaBroker replicaBroker, ReplicaReplicationQueueSupplier queueProvider, - PeriodAcknowledge acknowledgeCallback) { + PeriodAcknowledge acknowledgeCallback, ReplicaStatistics replicaStatistics) { this.replicaBroker = requireNonNull(replicaBroker); this.broker = requireNonNull(replicaBroker.getNext()); this.acknowledgeCallback = requireNonNull(acknowledgeCallback); + this.replicaStatistics = replicaStatistics; connectionContext = broker.getAdminConnectionContext().copy(); connectionContext.setUserName(ReplicaSupport.REPLICATION_PLUGIN_USER_NAME); connectionContext.setClientId(REPLICATION_CONSUMER_CLIENT_ID); @@ -203,6 +205,10 @@ private void processMessage(ActiveMQMessage message, ReplicaEventType eventType, "Replication event is out of order. Current sequence %s belongs to message with id %s," + "but the id of the event is %s", sequence, sequenceMessageId, messageId)); } + + long currentTime = System.currentTimeMillis(); + replicaStatistics.setReplicationLag(currentTime - message.getTimestamp()); + replicaStatistics.setReplicaLastProcessedTime(currentTime); } private void processMessage(ActiveMQMessage message, ReplicaEventType eventType, Object deserializedData, diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java index cac546bef96..34f4e5166c0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java @@ -55,15 +55,17 @@ public class ReplicaCompactor { private final ReplicaReplicationQueueSupplier queueProvider; private final PrefetchSubscription subscription; private final int additionalMessagesLimit; + private final ReplicaStatistics replicaStatistics; private final Queue intermediateQueue; public ReplicaCompactor(Broker broker, ReplicaReplicationQueueSupplier queueProvider, PrefetchSubscription subscription, - int additionalMessagesLimit) { + int additionalMessagesLimit, ReplicaStatistics replicaStatistics) { this.broker = broker; this.queueProvider = queueProvider; this.subscription = subscription; this.additionalMessagesLimit = additionalMessagesLimit; + this.replicaStatistics = replicaStatistics; intermediateQueue = broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst() .map(DestinationExtractor::extractQueue).orElseThrow(); @@ -157,6 +159,8 @@ private List compactAndFilter0(ConnectionContext conn Set messageIds = toDelete.stream().map(dmid -> dmid.messageReference.getMessageId()).collect(Collectors.toSet()); result.removeIf(reference -> messageIds.contains(reference.messageReference.getMessageId())); + replicaStatistics.increaseTpsCounter(toDelete.size()); + return result; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaEvent.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaEvent.java index 78c6090e364..48209b5c696 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaEvent.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaEvent.java @@ -33,6 +33,7 @@ public class ReplicaEvent { private Map replicationProperties = new HashMap<>(); private Integer version; + private Long timestamp; ReplicaEvent setTransactionId(TransactionId transactionId) { this.transactionId = transactionId; @@ -64,6 +65,12 @@ ReplicaEvent setVersion(int version) { return this; } + ReplicaEvent setTimestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + TransactionId getTransactionId() { return transactionId; } @@ -83,4 +90,8 @@ public Map getReplicationProperties() { public Integer getVersion() { return version; } + + public Long getTimestamp() { + return timestamp; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaPlugin.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaPlugin.java index c06378ea118..88d48d0302c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaPlugin.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaPlugin.java @@ -69,9 +69,11 @@ public Broker installPlugin(final Broker broker) throws Exception { logger.info("{} installed, running as {}", ReplicaPlugin.class.getName(), role); - final BrokerService brokerService = broker.getBrokerService(); + ReplicaStatistics replicaStatistics = new ReplicaStatistics(); + + BrokerService brokerService = broker.getBrokerService(); if (brokerService.isUseJmx()) { - replicationView = new ReplicationView(this); + replicationView = new ReplicationView(this, replicaStatistics); AnnotatedMBean.registerMBean(brokerService.getManagementContext(), replicationView, ReplicationJmxHelper.createJmxName(brokerService)); } @@ -96,7 +98,7 @@ public Broker installPlugin(final Broker broker) throws Exception { interceptors[interceptors.length - 1] = new ReplicaAdvisorySuppressor(); compositeInterceptor.setInterceptors(interceptors); - replicaRoleManagementBroker = new ReplicaRoleManagementBroker(broker, replicaPolicy, role); + replicaRoleManagementBroker = new ReplicaRoleManagementBroker(broker, replicaPolicy, role, replicaStatistics); return new ReplicaAuthorizationBroker(replicaRoleManagementBroker); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java index 7017743dd97..6c9dbc6f0c0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java @@ -48,6 +48,7 @@ public class ReplicaRoleManagementBroker extends MutableBrokerFilter implements private final ReplicaPolicy replicaPolicy; private final ClassLoader contextClassLoader; private ReplicaRole role; + private final ReplicaStatistics replicaStatistics; private final ReplicaReplicationQueueSupplier queueProvider; private final WebConsoleAccessController webConsoleAccessController; private final ReplicaInternalMessageProducer replicaInternalMessageProducer; @@ -59,11 +60,12 @@ public class ReplicaRoleManagementBroker extends MutableBrokerFilter implements ReplicaBroker replicaBroker; private ReplicaRoleStorage replicaRoleStorage; - public ReplicaRoleManagementBroker(Broker broker, ReplicaPolicy replicaPolicy, ReplicaRole role) { + public ReplicaRoleManagementBroker(Broker broker, ReplicaPolicy replicaPolicy, ReplicaRole role, ReplicaStatistics replicaStatistics) { super(broker); this.broker = broker; this.replicaPolicy = replicaPolicy; this.role = role; + this.replicaStatistics = replicaStatistics; contextClassLoader = Thread.currentThread().getContextClassLoader(); @@ -77,7 +79,7 @@ public ReplicaRoleManagementBroker(Broker broker, ReplicaPolicy replicaPolicy, R ReplicationMessageProducer replicationMessageProducer = new ReplicationMessageProducer(replicaInternalMessageProducer, queueProvider); ReplicaSequencer replicaSequencer = new ReplicaSequencer(broker, queueProvider, replicaInternalMessageProducer, - replicationMessageProducer, replicaPolicy); + replicationMessageProducer, replicaPolicy, replicaStatistics); sourceBroker = buildSourceBroker(replicationMessageProducer, replicaSequencer, queueProvider); replicaBroker = buildReplicaBroker(queueProvider); @@ -131,6 +133,7 @@ public synchronized void switchRole(ReplicaRole role, boolean force) throws Exce } public void onStopSuccess() throws Exception { + replicaStatistics.reset(); MutativeRoleBroker nextByRole = getNextByRole(); nextByRole.startAfterRoleChange(); setNext(nextByRole); @@ -189,7 +192,7 @@ private ReplicaSourceBroker buildSourceBroker(ReplicationMessageProducer replica } private ReplicaBroker buildReplicaBroker(ReplicaReplicationQueueSupplier queueProvider) { - return new ReplicaBroker(broker, this, queueProvider, replicaPolicy); + return new ReplicaBroker(broker, this, queueProvider, replicaPolicy, replicaStatistics); } private void addInterceptor4CompositeQueues() { diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequencer.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequencer.java index be32608f5e4..e79884fd724 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequencer.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequencer.java @@ -84,6 +84,7 @@ public class ReplicaSequencer { private final ReplicaAckHelper replicaAckHelper; private final ReplicaPolicy replicaPolicy; private final ReplicaBatcher replicaBatcher; + private final ReplicaStatistics replicaStatistics; ReplicaCompactor replicaCompactor; private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); @@ -107,7 +108,8 @@ public class ReplicaSequencer { public ReplicaSequencer(Broker broker, ReplicaReplicationQueueSupplier queueProvider, ReplicaInternalMessageProducer replicaInternalMessageProducer, - ReplicationMessageProducer replicationMessageProducer, ReplicaPolicy replicaPolicy) { + ReplicationMessageProducer replicationMessageProducer, ReplicaPolicy replicaPolicy, + ReplicaStatistics replicaStatistics) { this.broker = broker; this.queueProvider = queueProvider; this.replicaInternalMessageProducer = replicaInternalMessageProducer; @@ -115,6 +117,7 @@ public ReplicaSequencer(Broker broker, ReplicaReplicationQueueSupplier queueProv this.replicaAckHelper = new ReplicaAckHelper(broker); this.replicaPolicy = replicaPolicy; this.replicaBatcher = new ReplicaBatcher(replicaPolicy); + this.replicaStatistics = replicaStatistics; } void initialize() throws Exception { @@ -154,7 +157,7 @@ void initialize() throws Exception { subscription = (PrefetchSubscription) broker.addConsumer(subscriptionConnectionContext, consumerInfo); replicaCompactor = new ReplicaCompactor(broker, queueProvider, subscription, - replicaPolicy.getCompactorAdditionalMessagesLimit()); + replicaPolicy.getCompactorAdditionalMessagesLimit(), replicaStatistics); intermediateQueue.iterate(); String savedSequences = sequenceStorage.initialize(subscriptionConnectionContext); @@ -313,6 +316,7 @@ List acknowledge(ConsumerBrokerExchange consumerExchange, Mess } List messageIds = new ArrayList<>(); List sequenceMessageIds = new ArrayList<>(); + long timestamp = messagesToAck.get(0).getMessage().getTimestamp(); for (MessageReference reference : messagesToAck) { ActiveMQMessage message = (ActiveMQMessage) reference.getMessage(); List messageIdsProperty; @@ -323,6 +327,8 @@ List acknowledge(ConsumerBrokerExchange consumerExchange, Mess } messageIds.addAll(messageIdsProperty); sequenceMessageIds.add(messageIdsProperty.get(0)); + + timestamp = Math.max(timestamp, message.getTimestamp()); } broker.acknowledge(consumerExchange, ack); @@ -332,6 +338,10 @@ List acknowledge(ConsumerBrokerExchange consumerExchange, Mess sequenceMessageIds.forEach(sequenceMessageToAck::addLast); } + long currentTime = System.currentTimeMillis(); + replicaStatistics.setTotalReplicationLag(currentTime - timestamp); + replicaStatistics.setSourceLastProcessedTime(currentTime); + asyncAckWakeup(); return messagesToAck; @@ -430,6 +440,8 @@ private void iterateAck0() { broker.commitTransaction(connectionContext, transactionId, true); + replicaStatistics.increaseTpsCounter(messages.size()); + synchronized (messageToAck) { messageToAck.removeAll(messages); sequenceMessageToAck.removeAll(sequenceMessages); @@ -575,6 +587,7 @@ private BigInteger enqueueReplicaEvent(ConnectionContext connectionContext, List List messageIds = new ArrayList<>(); List messages = new ArrayList<>(); + long timestamp = batch.get(0).getMessage().getTimestamp(); for (MessageReference reference : batch) { ActiveMQMessage originalMessage = (ActiveMQMessage) reference.getMessage(); ActiveMQMessage message = (ActiveMQMessage) originalMessage.copy(); @@ -589,12 +602,16 @@ private BigInteger enqueueReplicaEvent(ConnectionContext connectionContext, List messages.add(message); sequence = sequence.add(BigInteger.ONE); + + // take timestamp from the newest message for statistics + timestamp = Math.max(timestamp, message.getTimestamp()); } ReplicaEvent replicaEvent = new ReplicaEvent() .setEventType(ReplicaEventType.BATCH) .setEventData(eventSerializer.serializeListOfObjects(messages)) .setTransactionId(transactionId) + .setTimestamp(timestamp) .setReplicationProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY, messageIds); replicationMessageProducer.enqueueMainReplicaEvent(connectionContext, replicaEvent); diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaStatistics.java new file mode 100644 index 00000000000..4e1885d9674 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaStatistics.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.replica; + +import org.apache.activemq.broker.jmx.MBeanInfo; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class ReplicaStatistics { + + private AtomicLong replicationTps; + private AtomicLong tpsCounter; + private AtomicLong lastTpsCounter; + + private AtomicLong totalReplicationLag; + private AtomicLong sourceLastProcessedTime; + private AtomicLong replicationLag; + private AtomicLong replicaLastProcessedTime; + + public ReplicaStatistics() { + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { + if (tpsCounter == null) { + return; + } + + long c = tpsCounter.get(); + if (replicationTps == null) { + replicationTps = new AtomicLong(); + } + replicationTps.set((c - lastTpsCounter.get()) / 10); + if (lastTpsCounter == null) { + lastTpsCounter = new AtomicLong(); + } + lastTpsCounter.set(c); + }, 60, 60, TimeUnit.SECONDS); + } + + public void reset() { + replicationTps = null; + tpsCounter = null; + lastTpsCounter = null; + + totalReplicationLag = null; + sourceLastProcessedTime = null; + replicationLag = null; + replicaLastProcessedTime = null; + } + + public void increaseTpsCounter(long size) { + if (tpsCounter == null) { + tpsCounter = new AtomicLong(); + } + tpsCounter.addAndGet(size); + } + + public AtomicLong getReplicationTps() { + return replicationTps; + } + + public AtomicLong getTotalReplicationLag() { + return totalReplicationLag; + } + + public void setTotalReplicationLag(long totalReplicationLag) { + if (this.totalReplicationLag == null) { + this.totalReplicationLag = new AtomicLong(); + } + this.totalReplicationLag.set(totalReplicationLag); + } + + public AtomicLong getSourceLastProcessedTime() { + return sourceLastProcessedTime; + } + + public void setSourceLastProcessedTime(long sourceLastProcessedTime) { + if (this.sourceLastProcessedTime == null) { + this.sourceLastProcessedTime = new AtomicLong(); + } + this.sourceLastProcessedTime.set(sourceLastProcessedTime); + } + + public AtomicLong getReplicationLag() { + return replicationLag; + } + + public void setReplicationLag(long replicationLag) { + if (this.replicationLag == null) { + this.replicationLag = new AtomicLong(); + } + this.replicationLag.set(replicationLag); + } + + public AtomicLong getReplicaLastProcessedTime() { + return replicaLastProcessedTime; + } + + public void setReplicaLastProcessedTime(long replicaLastProcessedTime) { + if (this.replicaLastProcessedTime == null) { + this.replicaLastProcessedTime = new AtomicLong(); + } + this.replicaLastProcessedTime.set(replicaLastProcessedTime); + } +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicationMessageProducer.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicationMessageProducer.java index ec88a8a7e3e..3b921733b55 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicationMessageProducer.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicationMessageProducer.java @@ -70,6 +70,7 @@ private void enqueueReplicaEvent(ConnectionContext connectionContext, ReplicaEve eventMessage.setProperties(event.getReplicationProperties()); eventMessage.setTransactionId(event.getTransactionId()); eventMessage.setIntProperty(ReplicaSupport.VERSION_PROPERTY, event.getVersion() == null ? ReplicaSupport.CURRENT_VERSION : event.getVersion()); + eventMessage.setTimestamp(event.getTimestamp() == null ? System.currentTimeMillis() : event.getTimestamp()); replicaInternalMessageProducer.sendIgnoringFlowControl(connectionContext, eventMessage); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationView.java b/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationView.java index 31c341a2b1d..737706e2511 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationView.java @@ -18,13 +18,24 @@ import org.apache.activemq.replica.ReplicaPlugin; import org.apache.activemq.replica.ReplicaRole; +import org.apache.activemq.replica.ReplicaStatistics; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; public class ReplicationView implements ReplicationViewMBean { private final ReplicaPlugin plugin; + private final ReplicaStatistics replicaStatistics; - public ReplicationView(ReplicaPlugin plugin) { + public ReplicationView(ReplicaPlugin plugin, ReplicaStatistics replicaStatistics) { this.plugin = plugin; + this.replicaStatistics = replicaStatistics; + } + + @Override + public Long getReplicationTps() { + return Optional.ofNullable(replicaStatistics.getReplicationTps()).map(AtomicLong::get).orElse(null); } @Override @@ -36,4 +47,26 @@ public void setReplicationRole(String role, boolean force) throws Exception { public String getReplicationRole() { return plugin.getRole().name(); } + + @Override + public Long getTotalReplicationLag() { + return Optional.ofNullable(replicaStatistics.getTotalReplicationLag()).map(AtomicLong::get).orElse(null); + } + + @Override + public Long getSourceWaitTime() { + return Optional.ofNullable(replicaStatistics.getSourceLastProcessedTime()).map(AtomicLong::get) + .map(v -> System.currentTimeMillis() - v).orElse(null); + } + + @Override + public Long getReplicationLag() { + return Optional.ofNullable(replicaStatistics.getReplicationLag()).map(AtomicLong::get).orElse(null); + } + + @Override + public Long getReplicaWaitTime() { + return Optional.ofNullable(replicaStatistics.getReplicaLastProcessedTime()).map(AtomicLong::get) + .map(v -> System.currentTimeMillis() - v).orElse(null); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationViewMBean.java index b729cc32e0a..7edd5f848d9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationViewMBean.java @@ -20,10 +20,24 @@ public interface ReplicationViewMBean { + @MBeanInfo("Replication TPS") + Long getReplicationTps(); + @MBeanInfo("Set replication role for broker") void setReplicationRole(String role, boolean force) throws Exception; @MBeanInfo("Get current replication role for broker") String getReplicationRole(); + @MBeanInfo("Total replication lag") + Long getTotalReplicationLag(); + + @MBeanInfo("Get wait time(if the broker's role is source)") + Long getSourceWaitTime(); + + @MBeanInfo("Get replication lag") + Long getReplicationLag(); + + @MBeanInfo("Get wait time(if the broker's role is replica)") + Long getReplicaWaitTime(); } diff --git a/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaBrokerEventListenerTest.java b/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaBrokerEventListenerTest.java index 65edd99563c..02a56decbaf 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaBrokerEventListenerTest.java +++ b/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaBrokerEventListenerTest.java @@ -85,6 +85,7 @@ public class ReplicaBrokerEventListenerTest { private ReplicaBrokerEventListener listener; private PeriodAcknowledge acknowledgeCallback; private final ReplicaEventSerializer eventSerializer = new ReplicaEventSerializer(); + private final ReplicaStatistics replicaStatistics = new ReplicaStatistics(); @Before public void setUp() throws Exception { @@ -106,7 +107,7 @@ public void setUp() throws Exception { when(broker.addConsumer(any(), any())).thenReturn(subscription); when(broker.getAdaptor(TransactionBroker.class)).thenReturn(transactionBroker); acknowledgeCallback = new PeriodAcknowledge(new ReplicaPolicy()); - listener = new ReplicaBrokerEventListener(replicaBroker, queueProvider, acknowledgeCallback); + listener = new ReplicaBrokerEventListener(replicaBroker, queueProvider, acknowledgeCallback, replicaStatistics); listener.initialize(); } diff --git a/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaCompactorTest.java b/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaCompactorTest.java index 321a89c548c..afe59b3504e 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaCompactorTest.java +++ b/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaCompactorTest.java @@ -68,7 +68,7 @@ public void setUp() throws Exception { PrefetchSubscription originalSubscription = mock(PrefetchSubscription.class); when(originalSubscription.getConsumerInfo()).thenReturn(consumerInfo); - replicaCompactor = new ReplicaCompactor(broker, queueProvider, originalSubscription, 1000); + replicaCompactor = new ReplicaCompactor(broker, queueProvider, originalSubscription, 1000, new ReplicaStatistics()); } @Test diff --git a/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaRoleManagementBrokerTest.java b/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaRoleManagementBrokerTest.java index d998da0bbc6..569822c454e 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaRoleManagementBrokerTest.java +++ b/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaRoleManagementBrokerTest.java @@ -69,7 +69,7 @@ public void setUp() throws Exception { when(regionBroker.getDestinationInterceptor()).thenReturn(cdi); when(cdi.getInterceptors()).thenReturn(new DestinationInterceptor[]{}); - replicaRoleManagementBroker = new ReplicaRoleManagementBroker(broker, replicaPolicy, ReplicaRole.replica); + replicaRoleManagementBroker = new ReplicaRoleManagementBroker(broker, replicaPolicy, ReplicaRole.replica, new ReplicaStatistics()); replicaRoleManagementBroker.replicaBroker = replicaBroker; replicaRoleManagementBroker.sourceBroker = sourceBroker; } diff --git a/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaSequencerTest.java b/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaSequencerTest.java index 83282a5a4d6..66c02892766 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaSequencerTest.java +++ b/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaSequencerTest.java @@ -106,7 +106,7 @@ public void setUp() throws Exception { .thenAnswer(a -> a.getArgument(1).getConsumerId().toString().contains("Sequencer") ? intermediateSubscription : mock(PrefetchSubscription.class)); - sequencer = new ReplicaSequencer(broker, queueProvider, replicaInternalMessageProducer, replicationMessageProducer, new ReplicaPolicy()); + sequencer = new ReplicaSequencer(broker, queueProvider, replicaInternalMessageProducer, replicationMessageProducer, new ReplicaPolicy(), new ReplicaStatistics()); sequencer.initialize(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaAcknowledgeReplicationEventTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaAcknowledgeReplicationEventTest.java index ebb62a700f6..6bc7175d6b5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaAcknowledgeReplicationEventTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaAcknowledgeReplicationEventTest.java @@ -34,6 +34,7 @@ import org.apache.activemq.replica.ReplicaPolicy; import org.apache.activemq.replica.ReplicaRole; import org.apache.activemq.replica.ReplicaRoleManagementBroker; +import org.apache.activemq.replica.ReplicaStatistics; import org.apache.activemq.replica.ReplicaSupport; import org.junit.After; import org.junit.Before; @@ -246,7 +247,7 @@ protected BrokerService createSecondBroker() throws Exception { ReplicaPlugin replicaPlugin = new ReplicaPlugin() { @Override public Broker installPlugin(final Broker broker) { - return new ReplicaRoleManagementBroker(broker, mockReplicaPolicy, ReplicaRole.replica); + return new ReplicaRoleManagementBroker(broker, mockReplicaPolicy, ReplicaRole.replica, new ReplicaStatistics()); } }; replicaPlugin.setRole(ReplicaRole.replica); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicationEventHandlingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicationEventHandlingTest.java index cc9681b2b5a..eba39ee553f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicationEventHandlingTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicationEventHandlingTest.java @@ -36,6 +36,7 @@ import org.apache.activemq.replica.ReplicaPolicy; import org.apache.activemq.replica.ReplicaRole; import org.apache.activemq.replica.ReplicaRoleManagementBroker; +import org.apache.activemq.replica.ReplicaStatistics; import org.apache.activemq.replica.ReplicaSupport; import org.junit.After; import org.junit.Before; @@ -255,7 +256,7 @@ protected BrokerService createSecondBroker() throws Exception { @Override public Broker installPlugin(final Broker broker) { nextBrokerSpy = spy(broker); - return new ReplicaRoleManagementBroker(nextBrokerSpy, replicaPolicy, ReplicaRole.replica); + return new ReplicaRoleManagementBroker(nextBrokerSpy, replicaPolicy, ReplicaRole.replica, new ReplicaStatistics()); } }; replicaPlugin.setRole(ReplicaRole.replica);