diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 34fd9f17f6ea6..aae54162e5a08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -195,6 +195,7 @@ public void startProducer() { prepareCreateProducer().thenCompose(ignore -> { ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; builderImpl.getConf().setNonPartitionedTopicExpected(true); + builderImpl.getConf().setReplProducer(true); return producerBuilder.createAsync().thenAccept(producer -> { setProducerAndTriggerReadEntries(producer); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index c39b722888f71..0784f74591ec5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -20,6 +20,8 @@ import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import static org.apache.pulsar.common.protocol.Commands.hasChecksum; import static org.apache.pulsar.common.protocol.Commands.readChecksum; import com.google.common.annotations.VisibleForTesting; @@ -91,6 +93,7 @@ public class Producer { AtomicReferenceFieldUpdater.newUpdater(Producer.class, Attributes.class, "attributes"); private final boolean isRemote; + private final boolean isRemoteOrShadow; private final String remoteCluster; 
private final boolean isNonPersistentTopic; private final boolean isShadowTopic; @@ -148,6 +151,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN String replicatorPrefix = serviceConf.getReplicatorPrefix() + "."; this.isRemote = producerName.startsWith(replicatorPrefix); + this.isRemoteOrShadow = isRemoteOrShadow(producerName, serviceConf.getReplicatorPrefix()); this.remoteCluster = parseRemoteClusterName(producerName, isRemote, replicatorPrefix); this.isEncrypted = isEncrypted; @@ -159,6 +163,13 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.brokerInterceptor = cnx.getBrokerService().getInterceptor(); } + /** + * Unlike "isRemote", the prefix matched here has no trailing dot; a null producer name never matches. + */ + public static boolean isRemoteOrShadow(String producerName, String replicatorPrefix) { + return producerName != null && producerName.startsWith(replicatorPrefix); + } + /** * Producer name for replicator is in format. * "replicatorPrefix.localCluster" (old) @@ -267,11 +278,16 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he return true; } + private boolean isSupportsReplDedupByLidAndEid() { + // Non-persistent topics have no ledger id or entry id, so this is unsupported for them.
+ return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent(); + } + private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, int batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(), - batchSize, isChunked, System.nanoTime(), isMarker, position); + batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); @@ -283,7 +299,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc int batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker, position); + isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); @@ -378,6 +394,7 @@ private static final class MessagePublishContext implements PublishContext, Runn private int batchSize; private boolean chunked; private boolean isMarker; + private boolean supportsReplDedupByLidAndEid; private long startTimeNs; @@ -463,6 +480,11 @@ public long getOriginalSequenceId() { return originalSequenceId; } + @Override + public boolean supportsReplDedupByLidAndEid() { + return supportsReplDedupByLidAndEid; + } + @Override public void setOriginalHighestSequenceId(long originalHighestSequenceId) { this.originalHighestSequenceId = originalHighestSequenceId; @@ -537,8 +559,12 @@ public void run() { // stats producer.stats.recordMsgIn(batchSize, msgSize); producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, 
TimeUnit.NANOSECONDS); - producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId, - ledgerId, entryId); + if (producer.isRemoteOrShadow && producer.isSupportsReplDedupByLidAndEid()) { + sendSendReceiptResponseRepl(); + } else { + // Repl V1 is the same as normal for this handling. + sendSendReceiptResponseNormal(); + } producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize); if (this.chunked) { producer.stats.recordChunkedMsgIn(); @@ -551,8 +577,46 @@ public void run() { recycle(); } + private void sendSendReceiptResponseRepl() { + // Case-1: is a repl marker. + boolean isReplMarker = getProperty(MSG_PROP_IS_REPL_MARKER) != null; + if (isReplMarker) { + producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, Long.MIN_VALUE, + ledgerId, entryId); + + return; + } + // Case-2: is a repl message. + Object positionPairObj = getProperty(MSG_PROP_REPL_SOURCE_POSITION); + if (positionPairObj == null || !(positionPairObj instanceof long[]) + || ((long[]) positionPairObj).length < 2) { + log.error("[{}] Message can not determine whether the message is duplicated due to the acquired" + + " messages props were are invalid. producer={}. 
supportsReplDedupByLidAndEid: {}," + + " sequence-id {}, prop-{}: not in expected format", + producer.topic.getName(), producer.producerName, + supportsReplDedupByLidAndEid(), getSequenceId(), + MSG_PROP_REPL_SOURCE_POSITION); + producer.cnx.getCommandSender().sendSendError(producer.producerId, + Math.max(highestSequenceId, sequenceId), + ServerError.PersistenceError, "Message can not determine whether the message is" + + " duplicated due to the acquired messages props were are invalid"); + return; + } + long[] positionPair = (long[]) positionPairObj; + long replSequenceLId = positionPair[0]; + long replSequenceEId = positionPair[1]; + producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, replSequenceLId, + replSequenceEId, ledgerId, entryId); + } + + private void sendSendReceiptResponseNormal() { + producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId, + ledgerId, entryId); + } + static MessagePublishContext get(Producer producer, long sequenceId, int msgSize, int batchSize, - boolean chunked, long startTimeNs, boolean isMarker, Position position) { + boolean chunked, long startTimeNs, boolean isMarker, Position position, + boolean supportsReplDedupByLidAndEid) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; @@ -563,6 +627,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.isMarker = isMarker; + callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid; callback.ledgerId = position == null ? -1 : position.getLedgerId(); callback.entryId = position == null ? 
-1 : position.getEntryId(); if (callback.propertyMap != null) { @@ -572,7 +637,8 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, int msgSize, - int batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { + int batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position, + boolean supportsReplDedupByLidAndEid) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; @@ -584,6 +650,7 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long callback.startTimeNs = startTimeNs; callback.chunked = chunked; callback.isMarker = isMarker; + callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid; callback.ledgerId = position == null ? -1 : position.getLedgerId(); callback.entryId = position == null ? -1 : position.getEntryId(); if (callback.propertyMap != null) { @@ -801,7 +868,8 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon } MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, - headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null); + headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null, + cnx.isClientSupportsReplDedupByLidAndEid()); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d33264b171a18..7bdf1f3491c23 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ 
-65,6 +65,7 @@ import javax.net.ssl.SSLSession; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; +import lombok.Getter; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -237,6 +238,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private boolean encryptionRequireOnProducer; + @Getter private FeatureFlags features; private PulsarCommandSender commandSender; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index ec7889af6bbbe..8f66d9c0e3e0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -126,6 +126,10 @@ default long getEntryTimestamp() { default void setEntryTimestamp(long entryTimestamp) { } + + default boolean supportsReplDedupByLidAndEid() { + return false; + } } CompletableFuture initialize(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java index eb2b318b7ead1..63599f09eef2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.common.api.proto.FeatureFlags; public interface TransportCnx { @@ -103,4 +104,11 @@ public interface TransportCnx { * previously called {@link #incrementThrottleCount()}. 
*/ void decrementThrottleCount(); + + FeatureFlags getFeatures(); + + default boolean isClientSupportsReplDedupByLidAndEid() { + return getFeatures() != null && getFeatures().hasSupportsReplDedupByLidAndEid() + && getFeatures().isSupportsReplDedupByLidAndEid(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index cd5b2ba721215..c81ebe8d6ffcd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import io.netty.buffer.ByteBuf; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -194,11 +195,17 @@ protected boolean replicateEntries(List entries) { msg.setSchemaInfoForReplicator(schemaFuture.get()); msg.getMessageBuilder().clearTxnidMostBits(); msg.getMessageBuilder().clearTxnidLeastBits(); + // Add props for sequence checking. 
+ msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION) + .setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId())); msgOut.recordEvent(headersAndPayload.readableBytes()); stats.incrementMsgOutCounter(); stats.incrementBytesOutCounter(headersAndPayload.readableBytes()); // Increment pending messages for messages produced locally PENDING_MESSAGES_UPDATER.incrementAndGet(this); + if (log.isDebugEnabled()) { + log.debug("[{}] Publishing {}:{}", replicatorId, entry.getLedgerId(), entry.getEntryId()); + } producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg)); atLeastOneMessageSentForReplication = true; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index dfb8b9d2edb12..085e4f688b80d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import java.util.Iterator; @@ -38,10 +40,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Topic.PublishContext; +import org.apache.pulsar.common.api.proto.KeyValue; import 
org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Markers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -321,27 +327,157 @@ public boolean isEnabled() { * @return true if the message should be published or false if it was recognized as a duplicate */ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf headersAndPayload) { + setContextPropsIfRepl(publishContext, headersAndPayload); if (!isEnabled() || publishContext.isMarkerMessage()) { return MessageDupStatus.NotDup; } + if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) { + if (!publishContext.supportsReplDedupByLidAndEid()){ + return isDuplicateReplV1(publishContext, headersAndPayload); + } else { + return isDuplicateReplV2(publishContext, headersAndPayload); + } + } + return isDuplicateNormal(publishContext, headersAndPayload, false); + } + + public MessageDupStatus isDuplicateReplV1(PublishContext publishContext, ByteBuf headersAndPayload) { + // Message is coming from replication, we need to use the original producer name and sequence id + // for the purpose of deduplication and not rely on the "replicator" name. + int readerIndex = headersAndPayload.readerIndex(); + MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); + headersAndPayload.readerIndex(readerIndex); + + String producerName = md.getProducerName(); + long sequenceId = md.getSequenceId(); + long highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId); + publishContext.setOriginalProducerName(producerName); + publishContext.setOriginalSequenceId(sequenceId); + publishContext.setOriginalHighestSequenceId(highestSequenceId); + return isDuplicateNormal(publishContext, headersAndPayload, true); + } + + private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf headersAndPayload) { + // Case-1: is a replication marker. 
+ if (publishContext.isMarkerMessage()) { + // Message is coming from replication, we need to use the replication's producer name, ledger id and entry + // id for the purpose of deduplication. + MessageMetadata md = Commands.peekMessageMetadata(headersAndPayload, "Check-Deduplicate", -1); + if (md != null && Markers.isReplicationMarker(md.getMarkerType())) { + publishContext.setProperty(MSG_PROP_IS_REPL_MARKER, ""); + } + return; + } + + // Case-2: is a replicated message. + if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) { + // Message is coming from replication, we need to use the replication's producer name, source cluster's + // ledger id and entry id for the purpose of deduplication. + int readerIndex = headersAndPayload.readerIndex(); + MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); + headersAndPayload.readerIndex(readerIndex); + + List<KeyValue> kvPairList = md.getPropertiesList(); + for (KeyValue kvPair : kvPairList) { + if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) { + if (!kvPair.getValue().contains(":")) { + log.warn("[{}] Unexpected {}: {}", publishContext.getProducerName(), + MSG_PROP_REPL_SOURCE_POSITION, + kvPair.getValue()); + break; + } + String[] ledgerIdAndEntryId = kvPair.getValue().split(":"); + if (ledgerIdAndEntryId.length != 2 || !StringUtils.isNumeric(ledgerIdAndEntryId[0]) + || !StringUtils.isNumeric(ledgerIdAndEntryId[1])) { + log.warn("[{}] Unexpected {}: {}", publishContext.getProducerName(), + MSG_PROP_REPL_SOURCE_POSITION, + kvPair.getValue()); + break; + } + long[] positionPair = new long[]{Long.parseLong(ledgerIdAndEntryId[0]), + Long.parseLong(ledgerIdAndEntryId[1])}; + publishContext.setProperty(MSG_PROP_REPL_SOURCE_POSITION, positionPair); + break; + } + } + } + } + + public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf headersAndPayload) { + Object positionPairObj = publishContext.getProperty(MSG_PROP_REPL_SOURCE_POSITION); + if
(positionPairObj == null || !(positionPairObj instanceof long[])) { + log.error("[{}] Message can not determine whether the message is duplicated due to the acquired messages" + + " props were are invalid. producer={}. supportsReplDedupByLidAndEid: {}, sequence-id {}," + + " prop-{}: not in expected format", + topic.getName(), publishContext.getProducerName(), + publishContext.supportsReplDedupByLidAndEid(), publishContext.getSequenceId(), + MSG_PROP_REPL_SOURCE_POSITION); + return MessageDupStatus.Unknown; + } + + long[] positionPair = (long[]) positionPairObj; + long replSequenceLId = positionPair[0]; + long replSequenceEId = positionPair[1]; + + String lastSequenceLIdKey = publishContext.getProducerName() + "_LID"; + String lastSequenceEIdKey = publishContext.getProducerName() + "_EID"; + synchronized (highestSequencedPushed) { + Long lastSequenceLIdPushed = highestSequencedPushed.get(lastSequenceLIdKey); + Long lastSequenceEIdPushed = highestSequencedPushed.get(lastSequenceEIdKey); + if (lastSequenceLIdPushed != null && lastSequenceEIdPushed != null + && (replSequenceLId < lastSequenceLIdPushed.longValue() + || (replSequenceLId == lastSequenceLIdPushed.longValue() + && replSequenceEId <= lastSequenceEIdPushed.longValue()))) { + if (log.isDebugEnabled()) { + log.debug("[{}] Message identified as duplicated producer={}. publishing {}:{}, latest publishing" + + " in-progress {}:{}", + topic.getName(), publishContext.getProducerName(), replSequenceLId, + replSequenceEId, lastSequenceLIdPushed, lastSequenceEIdPushed); + } + + // Also need to check sequence ids that has been persisted.
+ // If current message's seq id is smaller or equals to the + // "lastSequenceLIdPersisted:lastSequenceEIdPersisted" than its definitely a dup + // If current message's seq id is between "lastSequenceLIdPushed:lastSequenceEIdPushed" and + // "lastSequenceLIdPersisted:lastSequenceEIdPersisted", then we cannot be sure whether the message + // is a dup or not we should return an error to the producer for the latter case so that it can retry + // at a future time + Long lastSequenceLIdPersisted = highestSequencedPersisted.get(lastSequenceLIdKey); + Long lastSequenceEIdPersisted = highestSequencedPersisted.get(lastSequenceEIdKey); + if (log.isDebugEnabled()) { + log.debug("[{}] Message identified as duplicated producer={}. publishing {}:{}, latest" + + " persisted {}:{}", + topic.getName(), publishContext.getProducerName(), replSequenceLId, + replSequenceEId, lastSequenceLIdPersisted, lastSequenceEIdPersisted); + } + if (lastSequenceLIdPersisted != null && lastSequenceEIdPersisted != null + && (replSequenceLId < lastSequenceLIdPersisted.longValue() + || (replSequenceLId == lastSequenceLIdPersisted.longValue() + && replSequenceEId <= lastSequenceEIdPersisted))) { + return MessageDupStatus.Dup; + } else { + return MessageDupStatus.Unknown; + } + } + highestSequencedPushed.put(lastSequenceLIdKey, replSequenceLId); + highestSequencedPushed.put(lastSequenceEIdKey, replSequenceEId); + } + if (log.isDebugEnabled()) { + log.debug("[{}] Message identified as non-duplicated producer={}. 
publishing {}:{}", + topic.getName(), publishContext.getProducerName(), replSequenceLId, replSequenceEId); + } + return MessageDupStatus.NotDup; + } + public MessageDupStatus isDuplicateNormal(PublishContext publishContext, ByteBuf headersAndPayload, + boolean useOriginalProducerName) { String producerName = publishContext.getProducerName(); + if (useOriginalProducerName) { + producerName = publishContext.getOriginalProducerName(); + } long sequenceId = publishContext.getSequenceId(); long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId); MessageMetadata md = null; - if (producerName.startsWith(replicatorPrefix)) { - // Message is coming from replication, we need to use the original producer name and sequence id - // for the purpose of deduplication and not rely on the "replicator" name. - int readerIndex = headersAndPayload.readerIndex(); - md = Commands.parseMessageMetadata(headersAndPayload); - producerName = md.getProducerName(); - sequenceId = md.getSequenceId(); - highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId); - publishContext.setOriginalProducerName(producerName); - publishContext.setOriginalSequenceId(sequenceId); - publishContext.setOriginalHighestSequenceId(highestSequenceId); - headersAndPayload.readerIndex(readerIndex); - } long chunkID = -1; long totalChunk = -1; if (publishContext.isChunked()) { @@ -399,7 +535,37 @@ public void recordMessagePersisted(PublishContext publishContext, Position posit if (!isEnabled() || publishContext.isMarkerMessage()) { return; } + if (publishContext.getProducerName().startsWith(replicatorPrefix) + && publishContext.supportsReplDedupByLidAndEid()) { + recordMessagePersistedRepl(publishContext, position); + } else { + recordMessagePersistedNormal(publishContext, position); + } + } + + public void recordMessagePersistedRepl(PublishContext publishContext, Position position) { + Object positionPairObj = publishContext.getProperty(MSG_PROP_REPL_SOURCE_POSITION); + if 
(positionPairObj == null || !(positionPairObj instanceof long[])) { + log.error("[{}] Can not persist highest sequence-id due to the acquired messages" + + " props are invalid. producer={}. supportsReplDedupByLidAndEid: {}, sequence-id {}," + + " prop-{}: not in expected format", + topic.getName(), publishContext.getProducerName(), + publishContext.supportsReplDedupByLidAndEid(), publishContext.getSequenceId(), + MSG_PROP_REPL_SOURCE_POSITION); + recordMessagePersistedNormal(publishContext, position); + return; + } + long[] positionPair = (long[]) positionPairObj; + long replSequenceLId = positionPair[0]; + long replSequenceEId = positionPair[1]; + String lastSequenceLIdKey = publishContext.getProducerName() + "_LID"; + String lastSequenceEIdKey = publishContext.getProducerName() + "_EID"; + highestSequencedPersisted.put(lastSequenceLIdKey, replSequenceLId); + highestSequencedPersisted.put(lastSequenceEIdKey, replSequenceEId); + increaseSnapshotCounterAndTakeSnapshotIfNeeded(position); + } + public void recordMessagePersistedNormal(PublishContext publishContext, Position position) { String producerName = publishContext.getProducerName(); long sequenceId = publishContext.getSequenceId(); long highestSequenceId = publishContext.getHighestSequenceId(); @@ -413,9 +579,18 @@ public void recordMessagePersisted(PublishContext publishContext, Position posit if (isLastChunk == null || isLastChunk) { highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId)); } + increaseSnapshotCounterAndTakeSnapshotIfNeeded(position); + } + + private void increaseSnapshotCounterAndTakeSnapshotIfNeeded(Position position) { if (++snapshotCounter >= snapshotInterval) { snapshotCounter = 0; takeSnapshot(position); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Waiting for sequence-id snapshot {}/{}", topic.getName(), snapshotCounter, + snapshotInterval); + } } } diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index 65bcbfd131f12..f8a602f68b908 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import io.netty.buffer.ByteBuf; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -98,6 +99,9 @@ protected boolean replicateEntries(List entries) { msg.setReplicatedFrom(localCluster); msg.setMessageId(new MessageIdImpl(entry.getLedgerId(), entry.getEntryId(), -1)); + // Add props for sequence checking. + msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION) + .setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId())); headersAndPayload.retain(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java index 50f8ac9594433..eb1109272216e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java @@ -24,10 +24,14 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.MessageDeduplication; import 
org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.api.proto.MarkerType; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.testng.annotations.Test; public class BrokerMessageDeduplicationTest { @@ -42,7 +46,18 @@ public void markerMessageNotDeduplicated() { doReturn(true).when(deduplication).isEnabled(); Topic.PublishContext context = mock(Topic.PublishContext.class); doReturn(true).when(context).isMarkerMessage(); - MessageDeduplication.MessageDupStatus status = deduplication.isDuplicate(context, null); + + MessageMetadata msgMetadata = new MessageMetadata(); + msgMetadata.setMarkerType(MarkerType.TXN_ABORT_VALUE); + msgMetadata.setProducerName("p1"); + msgMetadata.setSequenceId(0); + msgMetadata.setPublishTime(System.currentTimeMillis()); + byte[] metadataData = msgMetadata.toByteArray(); + ByteBuf byteBuf = UnpooledByteBufAllocator.DEFAULT.heapBuffer(metadataData.length + 4); + byteBuf.writeInt(metadataData.length); + byteBuf.writeBytes(metadataData); + + MessageDeduplication.MessageDupStatus status = deduplication.isDuplicate(context, byteBuf); assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java new file mode 100644 index 0000000000000..d4c2de05a28eb --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java @@ -0,0 +1,933 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.vertx.core.impl.ConcurrentHashSet; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.MessageDeduplication; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; 
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.common.api.AuthData; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse; +import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; +import org.apache.pulsar.common.protocol.ByteBufPair; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.mockito.stubbing.Answer; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class OneWayReplicatorDeduplicationTest extends OneWayReplicatorTestBase { + + static final ObjectMapper JACKSON = new ObjectMapper(); + + @Override + @BeforeClass(alwaysRun = true, 
timeOut = 300000) + public void setup() throws Exception { + super.setup(); + waitInternalClientCreated(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @Override + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + // For check whether deduplication snapshot has done. + config.setBrokerDeduplicationEntriesInterval(10); + config.setReplicationStartAt("earliest"); + // To cover more cases, write more than one ledger. + config.setManagedLedgerMaxEntriesPerLedger(100); + config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + config.setManagedLedgerMaxLedgerRolloverTimeMinutes(1); + } + + /** + * Waits until replication of {@code topicName} towards cluster2 has stopped: the topic on pulsar2 has no + * producers left, and the topic on pulsar1 either has no replicators or its replicator to cluster2 is + * disconnected. + * + * @param topicName the topic that must exist on both clusters. + */ + protected void waitReplicatorStopped(String topicName) { + Awaitility.await().untilAsserted(() -> { + // Target side (pulsar2): no producer may remain attached to the topic. + Optional topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topicOptional2.isPresent()); + PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); + assertTrue(persistentTopic2.getProducers().isEmpty()); + // Source side (pulsar1): the replicators live on the source topic, so it must be looked up on pulsar1; + // reading the pulsar2 topic again would not check the replicator that is being stopped. + Optional topicOptional1 = pulsar1.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topicOptional1.isPresent()); + PersistentTopic persistentTopic1 = (PersistentTopic) topicOptional1.get(); + assertTrue(persistentTopic1.getReplicators().isEmpty() + || !persistentTopic1.getReplicators().get(cluster2).isConnected()); + }); + } + + protected void waitInternalClientCreated() throws Exception { + // Wait for the internal client created. 
+ final String topicNameTriggerInternalClientCreate = + BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate); + waitReplicatorStarted(topicNameTriggerInternalClientCreate); + cleanupTopics(() -> { + admin1.topics().delete(topicNameTriggerInternalClientCreate); + admin2.topics().delete(topicNameTriggerInternalClientCreate); + }); + } + + protected Runnable injectReplicatorClientCnx( + InjectedClientCnxClientBuilder.ClientCnxFactory clientCnxFactory) throws Exception { + String cluster2 = pulsar2.getConfig().getClusterName(); + BrokerService brokerService = pulsar1.getBrokerService(); + ClientBuilderImpl clientBuilder2 = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(url2.toString()); + + // Inject spy client. + final var replicationClients = brokerService.getReplicationClients(); + PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2); + PulsarClientImpl injectedClient = InjectedClientCnxClientBuilder.create(clientBuilder2, clientCnxFactory); + assertTrue(replicationClients.remove(cluster2, internalClient)); + assertNull(replicationClients.putIfAbsent(cluster2, injectedClient)); + + // Return a cleanup injection task; + return () -> { + assertTrue(replicationClients.remove(cluster2, injectedClient)); + assertNull(replicationClients.putIfAbsent(cluster2, internalClient)); + injectedClient.closeAsync(); + }; + } + + @DataProvider(name = "deduplicationArgs") + public Object[][] deduplicationArgs() { + return new Object[][] { + {true/* inject repeated publishing*/, 1/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 2/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 3/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, 
false/* multi schemas */}, + {true/* inject repeated publishing*/, 4/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 5/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 10/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + // ===== multi schema + {true/* inject repeated publishing*/, 1/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 2/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 3/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 4/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 5/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 10/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + // ===== Compatibility "source-cluster: old, target-cluster: new". + {false/* inject repeated publishing*/, 0/* repeated messages window */, + false /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + {false/* inject repeated publishing*/, 0/* repeated messages window */, + false /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 3/* repeated messages window */, + false /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + }; + } + + // TODO + // - Review the code to confirm that multiple source brokers can work when the source topic switches. 
+ @Test(timeOut = 360 * 1000, dataProvider = "deduplicationArgs") + public void testDeduplication(final boolean injectRepeatedPublish, final int repeatedMessagesWindow, + final boolean supportsReplDedupByLidAndEid, boolean multiSchemas) throws Exception { + // 0. Inject a mechanism that duplicate all Send-Command for the replicator. + final List duplicatedMsgs = new ArrayList<>(); + Runnable taskToClearInjection = injectReplicatorClientCnx( + (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { + + @Override + protected ByteBuf newConnectCommand() throws Exception { + if (supportsReplDedupByLidAndEid) { + return super.newConnectCommand(); + } + authenticationDataProvider = authentication.getAuthData(remoteHostName); + AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); + BaseCommand cmd = Commands.newConnectWithoutSerialize(authentication.getAuthMethodName(), authData, + this.protocolVersion, clientVersion, proxyToTargetBrokerAddress, null, null, null, null); + cmd.getConnect().getFeatureFlags().setSupportsReplDedupByLidAndEid(false); + return Commands.serializeWithSize(cmd); + } + + @Override + public boolean isBrokerSupportsReplDedupByLidAndEid() { + return supportsReplDedupByLidAndEid; + } + + @Override + public ChannelHandlerContext ctx() { + if (!injectRepeatedPublish) { + return super.ctx(); + } + final ChannelHandlerContext originalCtx = super.ctx; + ChannelHandlerContext spyContext = spy(originalCtx); + Answer injectedAnswer = invocation -> { + // Do not repeat the messages re-sending, and clear the previous cached messages when + // calling re-sending, to avoid publishing outs of order. 
+ for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) { + if (stackTraceElement.toString().contains("recoverProcessOpSendMsgFrom") + || stackTraceElement.toString().contains("resendMessages")) { + duplicatedMsgs.clear(); + return invocation.callRealMethod(); + } + } + + Object data = invocation.getArguments()[0]; + if (true && !(data instanceof ByteBufPair)) { + return invocation.callRealMethod(); + } + // Repeatedly send every message. + ByteBufPair byteBufPair = (ByteBufPair) data; + ByteBuf buf1 = byteBufPair.getFirst(); + ByteBuf buf2 = byteBufPair.getSecond(); + int bufferIndex1 = buf1.readerIndex(); + int bufferIndex2 = buf2.readerIndex(); + // Skip totalSize. + buf1.readInt(); + int cmdSize = buf1.readInt(); + BaseCommand cmd = new BaseCommand(); + cmd.parseFrom(buf1, cmdSize); + buf1.readerIndex(bufferIndex1); + if (cmd.getType().equals(BaseCommand.Type.SEND)) { + synchronized (duplicatedMsgs) { + if (duplicatedMsgs.size() >= repeatedMessagesWindow) { + for (ByteBufPair bufferPair : duplicatedMsgs) { + originalCtx.channel().write(bufferPair, originalCtx.voidPromise()); + originalCtx.channel().flush(); + } + duplicatedMsgs.clear(); + } + } + ByteBuf newBuffer1 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( + buf1.readableBytes()); + buf1.readBytes(newBuffer1); + buf1.readerIndex(bufferIndex1); + ByteBuf newBuffer2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( + buf2.readableBytes()); + buf2.readBytes(newBuffer2); + buf2.readerIndex(bufferIndex2); + synchronized (duplicatedMsgs) { + if (newBuffer2.readableBytes() > 0) { + duplicatedMsgs.add(ByteBufPair.get(newBuffer1, newBuffer2)); + } + } + return invocation.callRealMethod(); + } else { + return invocation.callRealMethod(); + } + }; + doAnswer(injectedAnswer).when(spyContext).write(any()); + doAnswer(injectedAnswer).when(spyContext).write(any(), any(ChannelPromise.class)); + doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any()); + 
doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any(), any(ChannelPromise.class)); + return spyContext; + } + }); + + // 1. Create topics and enable deduplication. + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicName); + admin1.topics().createSubscription(topicName, "s1", MessageId.earliest); + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin1.topicPolicies().setDeduplicationStatus(topicName, true); + admin1.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + admin2.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + MessageDeduplication messageDeduplication1 = persistentTopic1.getMessageDeduplication(); + if (messageDeduplication1 != null) { + int snapshotInterval1 = WhiteboxImpl.getInternalState(messageDeduplication1, "snapshotInterval"); + assertEquals(snapshotInterval1, 10); + } + MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication(); + if (messageDeduplication2 != null) { + int snapshotInterval2 = WhiteboxImpl.getInternalState(messageDeduplication2, "snapshotInterval"); + assertEquals(snapshotInterval2, 10); + } + assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), Boolean.TRUE); + assertEquals(persistentTopic1.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), Boolean.TRUE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + }); + PersistentTopic tp1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic tp2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + + // 2, Publish messages. + List msgSent = new ArrayList<>(); + Producer p1 = client1.newProducer(Schema.INT32).topic(topicName).create(); + Producer p2 = client1.newProducer(Schema.INT32).topic(topicName).create(); + Producer p3 = client1.newProducer(Schema.STRING).topic(topicName).create(); + Producer p4 = client1.newProducer(Schema.BOOL).topic(topicName).create(); + for (int i = 0; i < 10; i++) { + p1.send(i); + msgSent.add(String.valueOf(i)); + } + for (int i = 10; i < 200; i++) { + int msg1 = i; + int msg2 = 1000 + i; + String msg3 = (2000 + i) + ""; + boolean msg4 = i % 2 == 0; + p1.send(msg1); + p2.send(msg2); + msgSent.add(String.valueOf(msg1)); + msgSent.add(String.valueOf(msg2)); + if (multiSchemas) { + p3.send(msg3); + p4.send(msg4); + msgSent.add(String.valueOf(msg3)); + msgSent.add(String.valueOf(msg4)); + } + } + p1.close(); + p2.close(); + p3.close(); + p4.close(); + + // 3. Enable replication and wait the task to be finished. 
+ admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + waitReplicatorStarted(topicName); + Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + long replBacklog = cursor.getNumberOfEntriesInBacklog(true); + log.info("repl backlog: {}", replBacklog); + assertEquals(replBacklog, 0); + } + } + }); + + // Verify: all messages were copied correctly. + List msgReceived = new ArrayList<>(); + Consumer consumer = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName) + .subscriptionName("s1").subscribe(); + while (true) { + Message msg = consumer.receive(10, TimeUnit.SECONDS); + if (msg == null) { + break; + } + MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); + log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), + messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); + msgReceived.add(String.valueOf(msg.getValue())); + consumer.acknowledgeAsync(msg); + } + log.info("c1 topic stats-internal: " + + JACKSON.writeValueAsString(admin1.topics().getInternalStats(topicName))); + log.info("c2 topic stats-internal: " + + JACKSON.writeValueAsString(admin2.topics().getInternalStats(topicName))); + log.info("c1 topic stats-internal: " + + JACKSON.writeValueAsString(admin1.topics().getStats(topicName))); + log.info("c2 topic stats-internal: " + + JACKSON.writeValueAsString(admin2.topics().getStats(topicName))); + assertEquals(msgReceived, msgSent); + consumer.close(); + + // Verify: the deduplication cursor has been acked. + // "topic-policy.DeduplicationSnapshotInterval" is "10". 
+ Awaitility.await().untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.dedup")) { + assertTrue(cursor.getNumberOfEntriesInBacklog(true) < 10); + } + } + for (ManagedCursor cursor : tp2.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.dedup")) { + assertTrue(cursor.getNumberOfEntriesInBacklog(true) < 10); + } + } + }); + // Remove the injection. + taskToClearInjection.run(); + + log.info("====== Verify: all messages will be replicated after reopening replication ======"); + + // Verify: all messages will be replicated after reopening replication. + // Reopen replication: stop replication. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(topicName); + admin2.topics().unload(topicName); + admin2.topics().delete(topicName); + // Reopen replication: enable replication. + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + admin2.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication(); + if (messageDeduplication2 != null) { + int snapshotInterval2 = WhiteboxImpl.getInternalState(messageDeduplication2, "snapshotInterval"); + assertEquals(snapshotInterval2, 10); + } + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), Boolean.TRUE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + }); + 
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + // Same sequence as the first replication phase: wait for the replicator to start, then wait until the + // replication cursor on the source topic is drained. The cursor "pulsar.repl.r2" belongs to the cluster-1 + // topic (tp1); tp2 was deleted and re-created above, so it is a stale handle and never owns that cursor. + waitReplicatorStarted(topicName); + Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0); + } + } + }); + // Reopen replication: consumption. + List msgReceived2 = new ArrayList<>(); + Consumer consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName) + .subscriptionName("s1").subscribe(); + while (true) { + Message msg = consumer2.receive(10, TimeUnit.SECONDS); + if (msg == null) { + break; + } + MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); + log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), + messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); + msgReceived2.add(String.valueOf(msg.getValue())); + consumer2.acknowledgeAsync(msg); + } + // Verify: all messages were copied correctly. + log.info("c1 topic stats-internal: " + + JACKSON.writeValueAsString(admin1.topics().getInternalStats(topicName))); + log.info("c2 topic stats-internal: " + + JACKSON.writeValueAsString(admin2.topics().getInternalStats(topicName))); + log.info("c1 topic stats-internal: " + + JACKSON.writeValueAsString(admin1.topics().getStats(topicName))); + log.info("c2 topic stats-internal: " + + JACKSON.writeValueAsString(admin2.topics().getStats(topicName))); + assertEquals(msgReceived2, msgSent); + consumer2.close(); + + // cleanup. 
+ admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(topicName); + Awaitility.await().until(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + return false; + } + } + return true; + }); + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + } + + @DataProvider(name = "enabledDeduplication") + public Object[][] enabledDeduplication() { + return new Object[][] { + {true}, + {false} + }; + } + + /*** + * To reproduce the issue that replication loss message if enabled deduplication + * 1. Publishing in the source cluster + * 1-1. Producer-1 send 2 messages: M1, M2 + * 1-2. Producer-2 send 2 messages: M3, M4 + * 2. Replicate messages to the remote cluster + * 2-1. Copies M1 and M2 + * 2-2. Repeatedly copies M1 and M2. and copies M3 and M4. + * 2-2-1. After repeatedly copies M1 and M2, the network broke. + * 3. After a topic unloading. + * 3-1. The replicator will start after the topic is loaded up. + * 3-2. The client will create a new connection. + * 4. Verify: All 4 messages are copied to the remote cluster. + */ + @Test(timeOut = 360 * 1000, dataProvider = "enabledDeduplication") + public void testDeduplicationNotLostMessage(boolean enabledDeduplication) throws Exception { + waitInternalClientCreated(); + + /** + * step-2: Inject a mechanism that makes the client connect broke after repeatedly copied M1 and M2. 
+ */ + final List duplicatedMsgs = new ArrayList<>(); + final int repeatMsgIndex = 2; + AtomicInteger msgSent = new AtomicInteger(0); + ConcurrentHashSet injectedChannel = new ConcurrentHashSet<>(); + Runnable taskToClearInjection = injectReplicatorClientCnx( + (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { + + @Override + public ChannelHandlerContext ctx() { + final ChannelHandlerContext originalCtx = super.ctx; + ChannelHandlerContext spyContext = spy(originalCtx); + Answer injectedAnswer = invocation -> { + // Do not repeat the messages re-sending, and clear the previous cached messages when + // calling re-sending, to avoid publishing outs of order. + for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) { + if (stackTraceElement.toString().contains("recoverProcessOpSendMsgFrom") + || stackTraceElement.toString().contains("resendMessages")) { + duplicatedMsgs.clear(); + return invocation.callRealMethod(); + } + } + + Object data = invocation.getArguments()[0]; + if (true && !(data instanceof ByteBufPair)) { + return invocation.callRealMethod(); + } + // Repeatedly send every message. + ByteBufPair byteBufPair = (ByteBufPair) data; + ByteBuf buf1 = byteBufPair.getFirst(); + ByteBuf buf2 = byteBufPair.getSecond(); + int bufferIndex1 = buf1.readerIndex(); + int bufferIndex2 = buf2.readerIndex(); + // Skip totalSize. 
+ buf1.readInt(); + int cmdSize = buf1.readInt(); + BaseCommand cmd = new BaseCommand(); + cmd.parseFrom(buf1, cmdSize); + buf1.readerIndex(bufferIndex1); + if (cmd.getType().equals(BaseCommand.Type.SEND)) { + synchronized (duplicatedMsgs) { + if (duplicatedMsgs.isEmpty() && msgSent.get() == repeatMsgIndex) { + return null; + } + if (msgSent.get() == repeatMsgIndex) { + for (ByteBufPair bufferPair : duplicatedMsgs) { + originalCtx.channel().write(bufferPair, originalCtx.voidPromise()); + originalCtx.channel().flush(); + } + duplicatedMsgs.clear(); + return null; + } + } + ByteBuf newBuffer1 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( + buf1.readableBytes()); + buf1.readBytes(newBuffer1); + buf1.readerIndex(bufferIndex1); + ByteBuf newBuffer2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( + buf2.readableBytes()); + buf2.readBytes(newBuffer2); + buf2.readerIndex(bufferIndex2); + synchronized (duplicatedMsgs) { + if (newBuffer2.readableBytes() > 0 && msgSent.incrementAndGet() <= repeatMsgIndex) { + duplicatedMsgs.add(ByteBufPair.get(newBuffer1, newBuffer2)); + } + } + return invocation.callRealMethod(); + } else { + return invocation.callRealMethod(); + } + }; + doAnswer(injectedAnswer).when(spyContext).write(any()); + doAnswer(injectedAnswer).when(spyContext).write(any(), any(ChannelPromise.class)); + doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any()); + doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any(), any(ChannelPromise.class)); + injectedChannel.add(originalCtx.channel()); + return spyContext; + } + }); + + // Create topics and enable deduplication. 
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicName); + admin1.topics().createSubscription(topicName, "s1", MessageId.earliest); + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); + PersistentTopic tp1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic tp2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + ManagedLedgerImpl ml2 = (ManagedLedgerImpl) tp2.getManagedLedger(); + if (enabledDeduplication) { + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin1.topicPolicies().setDeduplicationStatus(topicName, true); + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), + Boolean.TRUE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), + Boolean.TRUE); + }); + } + // Let broker persist messages one by one, in other words, it starts to persist the next message after the + // previous has been written into BKs. + PersistentTopic spyTp2 = spy(tp2); + doAnswer(invocation -> { + try { + Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { + assertEquals(ml2.getPendingAddEntriesCount(), 0); + }); + } catch (Throwable throwable) { + // Ignore this timeout error. 
+ } + return invocation.callRealMethod(); + }).when(spyTp2).publishMessage(any(ByteBuf.class), any(Topic.PublishContext.class)); + CompletableFuture> originalTp2 = pulsar2.getBrokerService().getTopics().put(tp2.getName(), + CompletableFuture.completedFuture(Optional.of(spyTp2))); + + /** + * Step-1: Publishes messages in the source cluster and start replication, + */ + ProducerImpl p1 = (ProducerImpl) client1.newProducer().topic(topicName).producerName("p1").create(); + ProducerImpl p2 = (ProducerImpl) client1.newProducer().topic(topicName).producerName("p2").create(); + p1.send("1".toString().getBytes(StandardCharsets.UTF_8)); + p1.send("2".toString().getBytes(StandardCharsets.UTF_8)); + p2.send("3".toString().getBytes(StandardCharsets.UTF_8)); + p2.send("4".toString().getBytes(StandardCharsets.UTF_8)); + + // Enable replication and wait the task to be finished, it should not finish if no bug. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + waitReplicatorStarted(topicName); + try { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + long replBacklog = cursor.getNumberOfEntriesInBacklog(true); + log.info("repl backlog: {}", replBacklog); + assertEquals(replBacklog, 0); + } + } + }); + } catch (Throwable t) { + // Ignore the error. + } + + /** + * Step-3: remove the injections, unload topics and rebuild connections of the replicator. 
+ */ + taskToClearInjection.run(); + pulsar2.getBrokerService().getTopics().put(tp2.getName(), originalTp2); + admin1.topics().unload(topicName); + admin2.topics().unload(topicName); + for (Channel channel : injectedChannel) { + channel.close(); + } + waitReplicatorStarted(topicName); + PersistentTopic tp12 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic tp22 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + for (ManagedCursor cursor : tp12.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + long replBacklog = cursor.getNumberOfEntriesInBacklog(true); + log.info("repl backlog: {}", replBacklog); + assertEquals(replBacklog, 0); + } + } + }); + + /** + * Verify: All 4 messages are copied to the remote cluster. + */ + List msgReceived = new ArrayList<>(); + Consumer consumer = client2.newConsumer().topic(topicName) + .subscriptionName("s1").subscribe(); + while (true) { + Message msg = consumer.receive(10, TimeUnit.SECONDS); + if (msg == null) { + break; + } + MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); + log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), + messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); + msgReceived.add(new String(msg.getData(), StandardCharsets.UTF_8)); + consumer.acknowledgeAsync(msg); + } + + log.info("received msgs: {}", msgReceived); + assertTrue(msgReceived.contains("1")); + assertTrue(msgReceived.contains("2")); + assertTrue(msgReceived.contains("3")); + assertTrue(msgReceived.contains("4")); + if (enabledDeduplication) { + assertEquals(msgReceived, Arrays.asList("1", "2", "3", "4")); + } + + // cleanup. 
+        consumer.close();
+        p1.close();
+        p2.close();
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        waitReplicatorStopped(topicName);
+        Awaitility.await().until(() -> {
+            for (ManagedCursor cursor : tp12.getManagedLedger().getCursors()) {
+                if (cursor.getName().equals("pulsar.repl.r2")) {
+                    return false;
+                }
+            }
+            return true;
+        });
+        admin1.topics().delete(topicName);
+        admin2.topics().delete(topicName);
+    }
+
+    /**
+     * Verifies that messages published before replication is enabled are all copied to the remote
+     * cluster while schema requests are being disturbed by two injected faults.
+     *
+     * <p>Both faults flip the same {@code getSchemaSuccess} flag, so each schema-related call either
+     * passes through untouched or gets the fault applied:
+     * <ul>
+     *   <li>the schema registry of {@code pulsar2} is replaced by a spy whose
+     *       {@code findSchemaVersion} sleeps for 60 seconds before delegating;</li>
+     *   <li>the {@code ClientCnx} installed through {@code injectReplicatorClientCnx} fails the
+     *       pending GetSchema / GetOrCreateSchema request with a mocked
+     *       {@code PulsarClientException.TimeoutException}.</li>
+     * </ul>
+     *
+     * @param enabledDeduplication whether topic-level deduplication is enabled on both clusters; when
+     *                             {@code true} the received payloads must additionally equal the
+     *                             published sequence exactly (same order, no duplicates)
+     */
+    @Test(timeOut = 360 * 1000, dataProvider = "enabledDeduplication")
+    public void testReplicationLoadSchemaTimeout(boolean enabledDeduplication) throws Exception {
+        waitInternalClientCreated();
+
+        // Inject a timeout error for Get Schema.
+        Field fieldSchemaRegistryService = PulsarService.class.getDeclaredField("schemaRegistryService");
+        fieldSchemaRegistryService.setAccessible(true);
+        SchemaRegistryService originalSchemaRegistryService =
+                (SchemaRegistryService) fieldSchemaRegistryService.get(pulsar2);
+        SchemaRegistryService spySchemaRegistryService = spy(originalSchemaRegistryService);
+        AtomicBoolean getSchemaSuccess = new AtomicBoolean(false);
+        doAnswer(invocation -> {
+            if (getSchemaSuccess.get()) {
+                getSchemaSuccess.set(false);
+                return invocation.callRealMethod();
+            } else {
+                getSchemaSuccess.set(true);
+            }
+            // Delay the broker-side lookup so that the caller runs into its own timeout handling.
+            Thread.sleep(60 * 1000);
+            return invocation.callRealMethod();
+        }).when(spySchemaRegistryService).findSchemaVersion(any(String.class), any(SchemaData.class));
+        fieldSchemaRegistryService.set(pulsar2, spySchemaRegistryService);
+        Runnable taskToClearInjection = injectReplicatorClientCnx(
+            (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
+                @Override
+                protected void handleGetSchemaResponse(CommandGetSchemaResponse commandGetSchemaResponse) {
+                    if (getSchemaSuccess.get()) {
+                        getSchemaSuccess.set(false);
+                        super.handleGetSchemaResponse(commandGetSchemaResponse);
+                        return;
+                    } else {
+                        getSchemaSuccess.set(true);
+                    }
+                    checkArgument(state == State.Ready);
+                    long requestId = commandGetSchemaResponse.getRequestId();
+                    // Only completeExceptionally() is needed, so a wildcard type avoids a raw type.
+                    CompletableFuture<?> future =
+                            (CompletableFuture<?>) pendingRequests.remove(requestId);
+                    if (future == null) {
+                        duplicatedResponseCounter.incrementAndGet();
+                        log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId);
+                        return;
+                    }
+                    future.completeExceptionally(new PulsarClientException.TimeoutException("Mocked timeout"));
+                }
+
+                @Override
+                protected void handleGetOrCreateSchemaResponse(CommandGetOrCreateSchemaResponse
+                                                                       commandGetOrCreateSchemaResponse) {
+
+                    if (getSchemaSuccess.get()) {
+                        getSchemaSuccess.set(false);
+                        super.handleGetOrCreateSchemaResponse(commandGetOrCreateSchemaResponse);
+                        return;
+                    } else {
+                        getSchemaSuccess.set(true);
+                    }
+
+                    checkArgument(state == State.Ready);
+                    long requestId = commandGetOrCreateSchemaResponse.getRequestId();
+                    CompletableFuture<?> future =
+                            (CompletableFuture<?>) pendingRequests.remove(requestId);
+                    if (future == null) {
+                        duplicatedResponseCounter.incrementAndGet();
+                        log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId);
+                        return;
+                    }
+                    future.completeExceptionally(new PulsarClientException.TimeoutException("Mocked timeout"));
+                }
+            });
+
+        // Create topics and enable deduplication.
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin1.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        admin2.topics().createNonPartitionedTopic(topicName);
+        admin2.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        PersistentTopic tp1 =
+                (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+        PersistentTopic tp2 =
+                (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+        if (enabledDeduplication) {
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                PersistentTopic persistentTopic1 =
+                        (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+                PersistentTopic persistentTopic2 =
+                        (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+                admin1.topicPolicies().setDeduplicationStatus(topicName, true);
+                admin2.topicPolicies().setDeduplicationStatus(topicName, true);
+                assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(),
+                        Boolean.TRUE);
+                assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(),
+                        Boolean.TRUE);
+            });
+        }
+        Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+            PersistentTopic persistentTopic1 =
+                    (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+            PersistentTopic persistentTopic2 =
+                    (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+            admin1.topicPolicies().setSchemaCompatibilityStrategy(topicName,
+                    SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+            admin2.topicPolicies().setSchemaCompatibilityStrategy(topicName,
+                    SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+            assertEquals(persistentTopic1.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(),
+                    SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+            assertEquals(persistentTopic2.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(),
+                    SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        });
+
+        // Publishes messages in the source cluster.
+        Producer<byte[]> p1 = client1.newProducer().topic(topicName).producerName("p1").create();
+        Producer<byte[]> p2 = client1.newProducer().topic(topicName).producerName("p2").create();
+        Producer<String> p3 = client1.newProducer(Schema.STRING).topic(topicName).producerName("p3").create();
+        p1.send("1".getBytes(StandardCharsets.UTF_8));
+        p1.send("2".getBytes(StandardCharsets.UTF_8));
+        p3.send("2-1");
+        p3.send("2-2");
+        p2.send("3".getBytes(StandardCharsets.UTF_8));
+        p2.send("4".getBytes(StandardCharsets.UTF_8));
+
+        // Enable replication and wait for the replication backlog to drain. The wait can only complete
+        // if the replicator keeps making progress despite the injected schema timeouts.
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+        waitReplicatorStarted(topicName);
+        Awaitility.await().atMost(Duration.ofSeconds(180)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+            for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) {
+                if (cursor.getName().equals("pulsar.repl.r2")) {
+                    long replBacklog = cursor.getNumberOfEntriesInBacklog(true);
+                    log.info("repl backlog: {}", replBacklog);
+                    assertEquals(replBacklog, 0);
+                }
+            }
+        });
+
+        // Verify: All messages are copied to the remote cluster.
+        List<String> msgReceived = new ArrayList<>();
+        Consumer<byte[]> consumer = client2.newConsumer().topic(topicName)
+                .subscriptionName("s1").subscribe();
+        while (true) {
+            // A 10 second silence is taken as "nothing more to receive".
+            Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId();
+            log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"),
+                    messageIdAdv.getLedgerId(), messageIdAdv.getEntryId());
+            msgReceived.add(new String(msg.getData(), StandardCharsets.UTF_8));
+            consumer.acknowledgeAsync(msg);
+        }
+        log.info("received msgs: {}", msgReceived);
+        assertTrue(msgReceived.contains("1"));
+        assertTrue(msgReceived.contains("2"));
+        assertTrue(msgReceived.contains("2-1"));
+        assertTrue(msgReceived.contains("2-2"));
+        assertTrue(msgReceived.contains("3"));
+        assertTrue(msgReceived.contains("4"));
+        if (enabledDeduplication) {
+            // With deduplication the remote topic must hold each message exactly once, in publish order.
+            assertEquals(msgReceived, Arrays.asList("1", "2", "2-1", "2-2", "3", "4"));
+        }
+
+        // cleanup.
+        taskToClearInjection.run();
+        fieldSchemaRegistryService.set(pulsar2, originalSchemaRegistryService);
+        consumer.close();
+        p1.close();
+        p2.close();
+        p3.close();
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        waitReplicatorStopped(topicName);
+        Awaitility.await().until(() -> {
+            for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) {
+                if (cursor.getName().equals("pulsar.repl.r2")) {
+                    return false;
+                }
+            }
+            return true;
+        });
+        admin1.topics().delete(topicName);
+        admin2.topics().delete(topicName);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 2420ed58bed27..bf2276fdf4155 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -405,9 +405,9 @@ public void testReplicationWithSchema() throws Exception {
         int lastId = -1;
         for (int i = 0; i < totalMessages; i++) {
-            Message msg1 = consumer1.receive();
-            Message msg2 = consumer2.receive();
-            Message msg3 = consumer3.receive();
+            Message msg1 = consumer1.receive(10, TimeUnit.SECONDS);
+            Message msg2 = consumer2.receive(10, TimeUnit.SECONDS);
+
Message msg3 = consumer3.receive(10, TimeUnit.SECONDS); assertTrue(msg1 != null && msg2 != null && msg3 != null); GenericRecord record1 = msg1.getValue(); GenericRecord record2 = msg2.getValue(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index ab02c89c298ed..e8b691b2eea17 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -122,10 +122,12 @@ public class ClientCnx extends PulsarHandler { protected final Authentication authentication; protected State state; - private AtomicLong duplicatedResponseCounter = new AtomicLong(0); + @VisibleForTesting + protected AtomicLong duplicatedResponseCounter = new AtomicLong(0); + @VisibleForTesting @Getter - private final ConcurrentLongHashMap> pendingRequests = + protected final ConcurrentLongHashMap> pendingRequests = ConcurrentLongHashMap.>newBuilder() .expectedItems(16) .concurrencyLevel(1) @@ -193,6 +195,8 @@ public class ClientCnx extends PulsarHandler { private boolean supportsTopicWatchers; @Getter private boolean supportsGetPartitionedMetadataWithoutAutoCreation; + @Getter + private boolean brokerSupportsReplDedupByLidAndEid; /** Idle stat. 
**/
     @Getter
@@ -201,7 +205,7 @@ public class ClientCnx extends PulsarHandler {
     @Getter
     private long lastDisconnectedTimestamp;
 
-    private final String clientVersion;
+    protected final String clientVersion;
 
     protected enum State {
         None, SentConnectFrame, Ready, Failed, Connecting
@@ -405,6 +409,8 @@ protected void handleConnected(CommandConnected connected) {
         supportsGetPartitionedMetadataWithoutAutoCreation =
             connected.hasFeatureFlags()
                     && connected.getFeatureFlags().isSupportsGetPartitionedMetadataWithoutAutoCreation();
+        brokerSupportsReplDedupByLidAndEid =
+            connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsReplDedupByLidAndEid();
 
         // set remote protocol version to the correct version before we complete the connection future
         setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
@@ -474,18 +480,20 @@ protected void handleSendReceipt(CommandSendReceipt sendReceipt) {
             ledgerId = sendReceipt.getMessageId().getLedgerId();
             entryId = sendReceipt.getMessageId().getEntryId();
         }
-
+        ProducerImpl producer = producers.get(producerId);
         if (ledgerId == -1 && entryId == -1) {
-            log.warn("{} Message with sequence-id {} published by producer {} has been dropped", ctx.channel(),
-                    sequenceId, producerId);
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("{} Got receipt for producer: {} -- msg: {} -- id: {}:{}", ctx.channel(), producerId, sequenceId,
-                    ledgerId, entryId);
+            log.warn("{} Message with sequence-id {}-{} published by producer [id:{}, name:{}] has been dropped",
+                    ctx.channel(), sequenceId, highestSequenceId, producerId,
+                    producer != null ? producer.getProducerName() : "null");
+        } else {
+            if (log.isDebugEnabled()) {
+                // The producer may no longer be registered (see the null check below), so guard the name lookup.
+                log.debug("{} Got receipt for producer: [id:{}, name:{}] -- sequence-id: {}-{} -- entry-id: {}:{}",
+                        ctx.channel(), producerId, producer != null ? producer.getProducerName() : "null",
+                        sequenceId, highestSequenceId, ledgerId, entryId);
+            }
         }
 
-        ProducerImpl producer = producers.get(producerId);
         if (producer != null) {
             producer.ackReceived(this, sequenceId, highestSequenceId, ledgerId, entryId);
         } else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
new file mode 100644
index 0000000000000..3201dde748f34
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.util.ReferenceCountUtil;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Markers;
+
+/**
+ * Producer variant that {@code PulsarClientImpl#newProducerImpl} creates when the producer
+ * configuration has {@code isReplProducer} set.
+ *
+ * <p>When the connected broker advertises {@code supportsReplDedupByLidAndEid} and the topic is
+ * persistent, send receipts for regular messages are matched against the head of the pending queue by
+ * the source-cluster position ({@code ledgerId:entryId}) instead of by producer sequence id: the
+ * receipt's {@code sequenceId}/{@code highestSequenceId} pair is interpreted as that source position.
+ * Receipts whose {@code highestSequenceId} equals {@link Long#MIN_VALUE} are treated as receipts of
+ * replication markers and are still matched by sequence id. In every other situation the handling of
+ * {@link ProducerImpl} is used unchanged.
+ */
+@Slf4j
+public class GeoReplicationProducerImpl extends ProducerImpl{
+
+    // Message property holding the source position; parsed below as "<ledgerId>:<entryId>".
+    public static final String MSG_PROP_REPL_SOURCE_POSITION = "__MSG_PROP_REPL_SOURCE_POSITION";
+    // Message property name for flagging replication markers; not read anywhere in this class.
+    public static final String MSG_PROP_IS_REPL_MARKER = "__MSG_PROP_IS_REPL_MARKER";
+
+    // Source-cluster position of the last message whose receipt was accepted as "expected" (Case-3 of
+    // ackReceivedReplicatedMsg). Only touched while holding the monitor of this producer.
+    private long lastPersistedSourceLedgerId;
+    private long lastPersistedSourceEntryId;
+
+    // Cached result of TopicName.get(topic).isPersistent(), computed once in the constructor.
+    private final boolean isPersistentTopic;
+
+    /**
+     * Creates the producer; all arguments are forwarded unchanged to the {@link ProducerImpl} constructor.
+     */
+    public GeoReplicationProducerImpl(PulsarClientImpl client, String topic,
+                                      ProducerConfigurationData conf,
+                                      CompletableFuture producerCreatedFuture, int partitionIndex,
+                                      Schema schema, ProducerInterceptors interceptors,
+                                      Optional overrideProducerName) {
+        super(client, topic, conf, producerCreatedFuture, partitionIndex, schema, interceptors, overrideProducerName);
+        isPersistentTopic = TopicName.get(topic).isPersistent();
+    }
+
+    /**
+     * Tells whether receipts arriving on {@code cnx} have to be handled with the position-based logic.
+     *
+     * @param cnx connection the receipt arrived on
+     * @return {@code true} only if the broker behind {@code cnx} supports the feature and the topic is
+     *         persistent
+     */
+    private boolean isBrokerSupportsReplDedupByLidAndEid(ClientCnx cnx) {
+        // Non-Persistent topic does not have ledger id or entry id, so it does not support.
+        return cnx.isBrokerSupportsReplDedupByLidAndEid() && isPersistentTopic;
+    }
+
+    /**
+     * Entry point for a send receipt. Delegates to the parent implementation when the position-based
+     * dedup is not in effect; otherwise dispatches, under the producer's monitor, to the marker or the
+     * replicated-message handling depending on {@code highSeq}.
+     *
+     * @param cnx      connection the receipt arrived on
+     * @param seq      sequence id of the receipt; the source ledger id for replicated messages
+     * @param highSeq  highest sequence id of the receipt; the source entry id for replicated messages,
+     *                 or {@link Long#MIN_VALUE} for replication markers
+     * @param ledgerId ledger id assigned by the target cluster
+     * @param entryId  entry id assigned by the target cluster
+     */
+    @Override
+    protected void ackReceived(ClientCnx cnx, long seq, long highSeq, long ledgerId, long entryId) {
+        if (!isBrokerSupportsReplDedupByLidAndEid(cnx)) {
+            // Repl V1 is the same as normal for this handling.
+            super.ackReceived(cnx, seq, highSeq, ledgerId, entryId);
+            return;
+        }
+        synchronized (this) {
+            OpSendMsg op = pendingMessages.peek();
+            if (op == null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Got ack for timed out msg {}:{}", topic, producerName, seq, highSeq);
+                }
+                return;
+            }
+            // Replicator send markers also, use sequenceId to check the marker send-receipt.
+            if (isReplicationMarker(highSeq)) {
+                ackReceivedReplMarker(cnx, op, seq, highSeq, ledgerId, entryId);
+                return;
+            }
+            ackReceivedReplicatedMsg(cnx, op, seq, highSeq, ledgerId, entryId);
+        }
+    }
+
+    /**
+     * Handles the receipt of a replicated (non-marker) message by comparing three positions: the one
+     * carried by the receipt, the one stored in the properties of the pending head {@code op}, and the
+     * last persisted one remembered by this producer.
+     *
+     * @param cnx       connection the receipt arrived on; its channel is closed in the error case
+     * @param op        head of the pending queue
+     * @param sourceLId source ledger id carried by the receipt
+     * @param sourceEId source entry id carried by the receipt
+     * @param targetLId ledger id assigned by the target cluster
+     * @param targetEid entry id assigned by the target cluster
+     */
+    private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLId, long sourceEId,
+                                          long targetLId, long targetEid) {
+        // Parse source cluster's entry position.
+        // Both values stay null when the property is absent or is not of the form "<digits>:<digits>".
+        Long pendingLId = null;
+        Long pendingEId = null;
+        List kvPairList = op.msg.getMessageBuilder().getPropertiesList();
+        for (KeyValue kvPair : kvPairList) {
+            if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) {
+                if (!kvPair.getValue().contains(":")) {
+                    break;
+                }
+                String[] ledgerIdAndEntryId = kvPair.getValue().split(":");
+                if (ledgerIdAndEntryId.length != 2 || !StringUtils.isNumeric(ledgerIdAndEntryId[0])
+                        || !StringUtils.isNumeric(ledgerIdAndEntryId[1])) {
+                    break;
+                }
+                pendingLId = Long.valueOf(ledgerIdAndEntryId[0]);
+                pendingEId = Long.valueOf(ledgerIdAndEntryId[1]);
+                break;
+            }
+        }
+
+        // Case-1: repeatedly publish. Source message was exactly resend by the Replicator after a cursor rewind.
+        //  - The first time: Replicator --M1--> producer --> ...
+        //  - Cursor rewind.
+        //  - The second time: Replicator --M1--> producer --> ...
+        if (pendingLId != null && pendingEId != null
+                && (pendingLId < lastPersistedSourceLedgerId || (pendingLId.longValue() == lastPersistedSourceLedgerId
+                  && pendingEId.longValue() <= lastPersistedSourceEntryId))) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Received an msg send receipt[pending send is repeated due to repl cursor rewind]:"
+                                + " source entry {}:{}, pending send: {}:{}, latest persisted: {}:{}",
+                        topic, producerName, sourceLId, sourceEId, pendingLId, pendingEId,
+                        lastPersistedSourceLedgerId, lastPersistedSourceEntryId);
+            }
+            // The pending head is not newer than what was already persisted: complete it (with the target
+            // position of this receipt), then evaluate the same receipt against the next pending op.
+            removeAndApplyCallback(op, sourceLId, sourceEId, targetLId, targetEid, false);
+            ackReceived(cnx, sourceLId, sourceEId, targetLId, targetEid);
+            return;
+        }
+
+        // Case-2: repeatedly publish. Send command was executed again by the producer after a reconnect.
+        //  - Replicator --M1--> producer --> ...
+        //  - The first time: producer call Send-Command-1.
+        //  - Producer reconnect.
+        //  - The second time: producer call Send-Command-1.
+        if (sourceLId < lastPersistedSourceLedgerId
+                || (sourceLId == lastPersistedSourceLedgerId && sourceEId <= lastPersistedSourceEntryId)) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Received an msg send receipt[repeated]: source entry {}:{}, latest persisted:"
+                                + " {}:{}",
+                        topic, producerName, sourceLId, sourceEId,
+                        lastPersistedSourceLedgerId, lastPersistedSourceEntryId);
+            }
+            // The receipt itself is stale; the pending queue is left untouched.
+            return;
+        }
+
+        // Case-3, which is expected.
+        if (pendingLId != null && pendingEId != null && sourceLId == pendingLId.longValue()
+                && sourceEId == pendingEId.longValue()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Received an msg send receipt[expected]: source entry {}:{}, target entry:"
+                                + " {}:{}",
+                        topic, producerName, sourceLId, sourceEId,
+                        targetLId, targetEid);
+            }
+            lastPersistedSourceLedgerId = sourceLId;
+            lastPersistedSourceEntryId = sourceEId;
+            removeAndApplyCallback(op, sourceLId, sourceEId, targetLId, targetEid, false);
+            return;
+        }
+
+        // Case-4: Unexpected
+        //   4-1: got null source cluster's entry position, which is unexpected.
+        //   4-2: unknown error, which is unexpected.
+        log.error("[{}] [{}] Received an msg send receipt[error]: source entry {}:{}, target entry: {}:{},"
+                + " pending send: {}:{}, latest persisted: {}:{}, queue-size: {}",
+                topic, producerName, sourceLId, sourceEId, targetLId, targetEid, pendingLId, pendingEId,
+                lastPersistedSourceLedgerId, lastPersistedSourceEntryId, pendingMessages.messagesCount());
+        cnx.channel().close();
+    }
+
+    /**
+     * Handles the receipt of a replication marker, matched by sequence id against the pending head.
+     *
+     * @param cnx            connection the receipt arrived on; its channel is closed in the error case
+     * @param op             head of the pending queue
+     * @param seq            sequence id carried by the receipt
+     * @param isSourceMarker the receipt's highest sequence id; the caller only passes it when it equals
+     *                       {@link Long#MIN_VALUE}. It is used for logging only.
+     * @param ledgerId       ledger id assigned by the target cluster
+     * @param entryId        entry id assigned by the target cluster
+     */
+    protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long seq, long isSourceMarker,
+                                         long ledgerId, long entryId) {
+        // Case-1: repeatedly publish repl marker.
+        long lastSeqPersisted = LAST_SEQ_ID_PUBLISHED_UPDATER.get(this);
+        if (lastSeqPersisted != 0 && seq <= lastSeqPersisted) {
+            // Ignoring the ack since it's referring to a message that has already timed out.
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Received an repl marker send receipt[repeated]. seq: {}, seqPersisted: {},"
+                                + " isSourceMarker: {}, target entry: {}:{}",
+                        topic, producerName, seq, lastSeqPersisted, isSourceMarker, ledgerId, entryId);
+            }
+            return;
+        }
+
+        // Case-2, which is expected:
+        //   condition: broker responds SendReceipt who is a repl marker.
+        //   and condition: the current pending msg is also a marker.
+        boolean pendingMsgIsReplMarker = isReplicationMarker(op);
+        if (pendingMsgIsReplMarker && seq == op.sequenceId) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Received an repl marker send receipt[expected]. seq: {}, seqPersisted: {},"
+                                + " isReplMarker: {}, target entry: {}:{}",
+                        topic, producerName, seq, lastSeqPersisted, isSourceMarker, ledgerId, entryId);
+            }
+            long calculatedSeq = getHighestSequenceId(op);
+            LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, calculatedSeq));
+            removeAndApplyCallback(op, seq, isSourceMarker, ledgerId, entryId, true);
+            return;
+        }
+
+        // Case-3, unexpected.
+        //   3-1: if "lastSeqPersisted < seq <= lastInProgressSend", rather than going here, it should be a SendError.
+        //   3-2: unknown error.
+        long lastInProgressSend = LAST_SEQ_ID_PUSHED_UPDATER.get(this);
+        String logText = String.format("[%s] [%s] Received an repl marker send receipt[error]. seq: %s, seqPending: %s."
+                        + " sequenceIdPersisted: %s, lastInProgressSend: %s,"
+                        + " isSourceMarker: %s, target entry: %s:%s, queue-size: %s",
+                topic, producerName, seq, pendingMsgIsReplMarker ? op.sequenceId : "unknown",
+                lastSeqPersisted, lastInProgressSend,
+                isSourceMarker, ledgerId, entryId, pendingMessages.messagesCount()
+        );
+        if (seq < lastInProgressSend) {
+            log.warn(logText);
+        } else {
+            log.error(logText);
+        }
+        // Force connection closing so that messages can be re-transmitted in a new connection.
+        cnx.channel().close();
+    }
+
+    /**
+     * Removes the head of the pending queue, releases its semaphore permit, stamps {@code op} with the
+     * target message id, completes its callback and finally releases and recycles it.
+     *
+     * @param op       the op to complete; expected to be the current head of the pending queue, since
+     *                 the queue head is removed without comparing it to {@code op}
+     * @param lIdSent  first value identifying the sent message, used for logging only
+     * @param eIdSent  second value identifying the sent message, used for logging only
+     * @param ledgerId target ledger id stored into the op's message id
+     * @param entryId  target entry id stored into the op's message id
+     * @param isMarker whether {@code op} is a replication marker, used for logging only
+     */
+    private void removeAndApplyCallback(OpSendMsg op, long lIdSent, long eIdSent, long ledgerId, long entryId,
+                                        boolean isMarker) {
+        pendingMessages.remove();
+        releaseSemaphoreForSendOp(op);
+        // Since Geo-Replicator will not send batched message, skip to update the field
+        // "LAST_SEQ_ID_PUBLISHED_UPDATER".
+        op.setMessageId(ledgerId, entryId, partitionIndex);
+        try {
+            // Need to protect ourselves from any exception being thrown in the future handler from the
+            // application
+            op.sendComplete(null);
+        } catch (Throwable t) {
+            log.warn("[{}] [{}] Got exception while completing the callback for -- source-message: {}:{} --"
+                            + " target-msg: {}:{} -- isMarker: {}",
+                    topic, producerName, lIdSent, eIdSent, ledgerId, entryId, isMarker, t);
+        }
+        ReferenceCountUtil.safeRelease(op.cmd);
+        op.recycle();
+    }
+
+    /**
+     * @return {@code true} if {@code op} carries a message whose marker type is one of the replication
+     *         marker types recognised by {@link Markers#isReplicationMarker(int)}
+     */
+    private boolean isReplicationMarker(OpSendMsg op) {
+        return op.msg != null && op.msg.getMessageBuilder().hasMarkerType()
+                && Markers.isReplicationMarker(op.msg.getMessageBuilder().getMarkerType());
+    }
+
+    /**
+     * @return {@code true} if the highest sequence id of a receipt is {@link Long#MIN_VALUE}, the value
+     *         this class uses to recognise receipts of replication markers
+     */
+    private boolean isReplicationMarker(long highestSeq) {
+        return Long.MIN_VALUE == highestSeq;
+    }
+
+    /**
+     * Restricts the parent's "last sequence id pushed" bookkeeping to replication markers; for every
+     * other op this override does nothing.
+     */
+    @Override
+    protected void updateLastSeqPushed(OpSendMsg op) {
+        // Only update the value for repl marker.
+        if (isReplicationMarker(op)) {
+            super.updateLastSeqPushed(op);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 935a6251ddaf8..039468386edca 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -115,7 +115,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne
     // Variable is updated in a synchronized block
     private volatile long msgIdGenerator;
 
-    private final OpSendMsgQueue pendingMessages;
+    protected final OpSendMsgQueue pendingMessages;
     private final Optional semaphore;
     private volatile Timeout sendTimeout = null;
     private final long lookupDeadline;
@@ -132,12 +132,12 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne
     private LastSendFutureWrapper lastSendFutureWrapper = LastSendFutureWrapper.create(lastSendFuture);
 
     // Globally unique producer name
-
private String producerName;
+    protected String producerName;
     private final boolean userProvidedProducerName;
 
     private String connectionId;
     private String connectedSince;
-    private final int partitionIndex;
+    protected final int partitionIndex;
 
     private final ProducerStatsRecorder stats;
 
@@ -1264,7 +1264,7 @@ public void terminated(ClientCnx cnx) {
         }
     }
 
-    void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long ledgerId, long entryId) {
+    protected void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long ledgerId, long entryId) {
         OpSendMsg op = null;
         synchronized (this) {
             op = pendingMessages.peek();
@@ -1339,11 +1339,11 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le
         op.recycle();
     }
 
-    private long getHighestSequenceId(OpSendMsg op) {
+    // Returns the larger of op.highestSequenceId and op.sequenceId.
+    protected long getHighestSequenceId(OpSendMsg op) {
         return Math.max(op.highestSequenceId, op.sequenceId);
     }
 
-    private void releaseSemaphoreForSendOp(OpSendMsg op) {
+    protected void releaseSemaphoreForSendOp(OpSendMsg op) {
 
         semaphoreRelease(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
 
@@ -2365,10 +2365,7 @@ protected synchronized void processOpSendMsg(OpSendMsg op) {
                 return;
             }
             pendingMessages.add(op);
-            if (op.msg != null) {
-                LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
-                        last -> Math.max(last, getHighestSequenceId(op)));
-            }
+            updateLastSeqPushed(op);
 
             final ClientCnx cnx = getCnxIfReady();
             if (cnx != null) {
@@ -2394,6 +2391,13 @@ protected synchronized void processOpSendMsg(OpSendMsg op) {
         }
     }
 
+    /**
+     * Raises the "last sequence id pushed" value to the highest sequence id of {@code op}, never
+     * lowering it. Ops without a message ({@code op.msg == null}) are ignored. Extracted from
+     * {@code processOpSendMsg} as a protected method so that subclasses can override it.
+     *
+     * @param op the op that has just been added to the pending queue
+     */
+    protected void updateLastSeqPushed(OpSendMsg op) {
+        if (op.msg != null) {
+            LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
+                    last -> Math.max(last, getHighestSequenceId(op)));
+        }
+    }
+
     // Must acquire a lock on ProducerImpl.this before calling method.
     private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long expectedEpoch) {
         if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() == null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 38df40dee400e..57e2493485968 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -513,6 +513,10 @@ protected ProducerImpl newProducerImpl(String topic, int partitionIndex,
                                                    ProducerInterceptors interceptors,
                                                    CompletableFuture> producerCreatedFuture,
                                                    Optional overrideProducerName) {
+        // Producers flagged as replication producers get the geo-replication specific implementation.
+        if (conf.isReplProducer()) {
+            return new GeoReplicationProducerImpl(PulsarClientImpl.this, topic, conf, producerCreatedFuture,
+                    partitionIndex, schema, interceptors, overrideProducerName);
+        }
         return new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture,
                 partitionIndex, schema, interceptors, overrideProducerName);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 0c770c7c9bd05..52fbf8f9270cf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -208,6 +208,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
 
     private boolean isNonPartitionedTopicExpected;
 
+    // Set by AbstractReplicator#startProducer; makes PulsarClientImpl create a GeoReplicationProducerImpl.
+    private boolean isReplProducer;
+
     @ApiModelProperty(
             name = "initialSubscriptionName",
             value = "Use this configuration to automatically create an initial subscription when creating a topic."
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 4f390cc99e610..95053e5e7e0a7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -192,6 +192,7 @@ private static void setFeatureFlags(FeatureFlags flags) {
         flags.setSupportsBrokerEntryMetadata(true);
         flags.setSupportsPartialProducer(true);
         flags.setSupportsGetPartitionedMetadataWithoutAutoCreation(true);
+        flags.setSupportsReplDedupByLidAndEid(true);
     }
 
     public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion,
@@ -245,6 +246,15 @@ public static ByteBuf newConnect(String authMethodName, AuthData authData, int p
     public static ByteBuf newConnect(String authMethodName, AuthData authData, int protocolVersion, String libVersion,
                                      String targetBroker, String originalPrincipal, AuthData originalAuthData,
                                      String originalAuthMethod, String proxyVersion) {
+        BaseCommand cmd = newConnectWithoutSerialize(authMethodName, authData, protocolVersion, libVersion,
+                targetBroker, originalPrincipal, originalAuthData, originalAuthMethod, proxyVersion);
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Builds the CONNECT command exactly as {@code newConnect} does, but returns the
+     * {@link BaseCommand} itself instead of its serialized form.
+     */
+    public static BaseCommand newConnectWithoutSerialize(String authMethodName, AuthData authData,
+                                     int protocolVersion, String libVersion,
+                                     String targetBroker, String originalPrincipal, AuthData originalAuthData,
+                                     String originalAuthMethod, String proxyVersion) {
         BaseCommand cmd = localCmd(Type.CONNECT);
         CommandConnect connect = cmd.setConnect()
                 .setClientVersion(libVersion != null ? libVersion : "Pulsar Client")
@@ -277,7 +287,7 @@ public static ByteBuf newConnect(String authMethodName, AuthData authData, int p
         connect.setProtocolVersion(protocolVersion);
 
         setFeatureFlags(connect.setFeatureFlags());
-        return serializeWithSize(cmd);
+        return cmd;
     }
 
     public static ByteBuf newConnected(int clientProtocoVersion, boolean supportsTopicWatchers) {
@@ -303,6 +313,7 @@ public static BaseCommand newConnectedCommand(int clientProtocolVersion, int max
 
         connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers);
         connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true);
+        connected.setFeatureFlags().setSupportsReplDedupByLidAndEid(true);
         return cmd;
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index 2291aee781f60..de19b777ff015 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.common.protocol;
 
+import static org.apache.pulsar.common.api.proto.MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT;
+import static org.apache.pulsar.common.api.proto.MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST;
+import static org.apache.pulsar.common.api.proto.MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE;
+import static org.apache.pulsar.common.api.proto.MarkerType.REPLICATED_SUBSCRIPTION_UPDATE;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.io.IOException;
@@ -98,6 +102,13 @@ public static boolean isServerOnlyMarker(MessageMetadata msgMetadata) {
         return msgMetadata.hasMarkerType();
     }
 
+    /**
+     * Tells whether a raw marker type value denotes one of the four replicated-subscription markers
+     * (snapshot request, snapshot response, snapshot, update).
+     *
+     * @param markerType numeric marker type, as returned by {@code MessageMetadata#getMarkerType()}
+     * @return {@code true} if the value equals one of the four marker types listed above
+     */
+    public static boolean isReplicationMarker(int markerType) {
+        return markerType == REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST.getValue()
+                || markerType == REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE.getValue()
+                || markerType == REPLICATED_SUBSCRIPTION_SNAPSHOT.getValue()
+                || markerType == REPLICATED_SUBSCRIPTION_UPDATE.getValue();
+    }
+
     public static boolean isReplicatedSubscriptionSnapshotMarker(MessageMetadata msgMetadata) {
         return msgMetadata != null
                 && msgMetadata.hasMarkerType()
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 19658c5e57ff9..eacec33169e34 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -302,6 +302,7 @@ message FeatureFlags {
   optional bool supports_partial_producer = 3 [default = false];
   optional bool supports_topic_watchers = 4 [default = false];
   optional bool supports_get_partitioned_metadata_without_auto_creation = 5 [default = false];
+  // Set by both sides of a connection (see Commands#setFeatureFlags and Commands#newConnectedCommand).
+  optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false];
 }
 
 message CommandConnected {