Skip to content

Commit

Permalink
[fix][broker] Geo Replication lost messages or frequently fails due t…
Browse files Browse the repository at this point in the history
…o Deduplication is not appropriate for Geo-Replication (#23697)
  • Loading branch information
poorbarcode authored Feb 21, 2025
1 parent dbc09e1 commit 4ac4f3c
Show file tree
Hide file tree
Showing 19 changed files with 1,553 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public void startProducer() {
prepareCreateProducer().thenCompose(ignore -> {
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
builderImpl.getConf().setNonPartitionedTopicExpected(true);
builderImpl.getConf().setReplProducer(true);
return producerBuilder.createAsync().thenAccept(producer -> {
setProducerAndTriggerReadEntries(producer);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.protocol.Commands.readChecksum;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -91,6 +93,7 @@ public class Producer {
AtomicReferenceFieldUpdater.newUpdater(Producer.class, Attributes.class, "attributes");

private final boolean isRemote;
private final boolean isRemoteOrShadow;
private final String remoteCluster;
private final boolean isNonPersistentTopic;
private final boolean isShadowTopic;
Expand Down Expand Up @@ -148,6 +151,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN

String replicatorPrefix = serviceConf.getReplicatorPrefix() + ".";
this.isRemote = producerName.startsWith(replicatorPrefix);
this.isRemoteOrShadow = isRemoteOrShadow(producerName, serviceConf.getReplicatorPrefix());
this.remoteCluster = parseRemoteClusterName(producerName, isRemote, replicatorPrefix);

this.isEncrypted = isEncrypted;
Expand All @@ -159,6 +163,13 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
}

/**
* Difference with "isRemote" is whether the prefix string is end with a dot.
*/
public static boolean isRemoteOrShadow(String producerName, String replicatorPrefix) {
return producerName != null && producerName.startsWith(replicatorPrefix);
}

/**
* Producer name for replicator is in format.
* "replicatorPrefix.localCluster" (old)
Expand Down Expand Up @@ -267,11 +278,16 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
return true;
}

private boolean isSupportsReplDedupByLidAndEid() {
// Non-Persistent topic does not have ledger id or entry id, so it does not support.
return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent();
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, int batchSize, boolean isChunked,
boolean isMarker, Position position) {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position);
batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand All @@ -283,7 +299,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc
int batchSize, boolean isChunked, boolean isMarker, Position position) {
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position);
isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand Down Expand Up @@ -378,6 +394,7 @@ private static final class MessagePublishContext implements PublishContext, Runn
private int batchSize;
private boolean chunked;
private boolean isMarker;
private boolean supportsReplDedupByLidAndEid;

private long startTimeNs;

Expand Down Expand Up @@ -463,6 +480,11 @@ public long getOriginalSequenceId() {
return originalSequenceId;
}

@Override
public boolean supportsReplDedupByLidAndEid() {
return supportsReplDedupByLidAndEid;
}

@Override
public void setOriginalHighestSequenceId(long originalHighestSequenceId) {
this.originalHighestSequenceId = originalHighestSequenceId;
Expand Down Expand Up @@ -537,8 +559,12 @@ public void run() {
// stats
producer.stats.recordMsgIn(batchSize, msgSize);
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
ledgerId, entryId);
if (producer.isRemoteOrShadow && producer.isSupportsReplDedupByLidAndEid()) {
sendSendReceiptResponseRepl();
} else {
// Repl V1 is the same as normal for this handling.
sendSendReceiptResponseNormal();
}
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
if (this.chunked) {
producer.stats.recordChunkedMsgIn();
Expand All @@ -551,8 +577,46 @@ public void run() {
recycle();
}

private void sendSendReceiptResponseRepl() {
// Case-1: is a repl marker.
boolean isReplMarker = getProperty(MSG_PROP_IS_REPL_MARKER) != null;
if (isReplMarker) {
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, Long.MIN_VALUE,
ledgerId, entryId);

return;
}
// Case-2: is a repl message.
Object positionPairObj = getProperty(MSG_PROP_REPL_SOURCE_POSITION);
if (positionPairObj == null || !(positionPairObj instanceof long[])
|| ((long[]) positionPairObj).length < 2) {
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired"
+ " messages props were are invalid. producer={}. supportsReplDedupByLidAndEid: {},"
+ " sequence-id {}, prop-{}: not in expected format",
producer.topic.getName(), producer.producerName,
supportsReplDedupByLidAndEid(), getSequenceId(),
MSG_PROP_REPL_SOURCE_POSITION);
producer.cnx.getCommandSender().sendSendError(producer.producerId,
Math.max(highestSequenceId, sequenceId),
ServerError.PersistenceError, "Message can not determine whether the message is"
+ " duplicated due to the acquired messages props were are invalid");
return;
}
long[] positionPair = (long[]) positionPairObj;
long replSequenceLId = positionPair[0];
long replSequenceEId = positionPair[1];
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, replSequenceLId,
replSequenceEId, ledgerId, entryId);
}

private void sendSendReceiptResponseNormal() {
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
ledgerId, entryId);
}

static MessagePublishContext get(Producer producer, long sequenceId, int msgSize, int batchSize,
boolean chunked, long startTimeNs, boolean isMarker, Position position) {
boolean chunked, long startTimeNs, boolean isMarker, Position position,
boolean supportsReplDedupByLidAndEid) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
Expand All @@ -563,6 +627,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.isMarker = isMarker;
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
if (callback.propertyMap != null) {
Expand All @@ -572,7 +637,8 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize
}

static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, int msgSize,
int batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
int batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position,
boolean supportsReplDedupByLidAndEid) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
Expand All @@ -584,6 +650,7 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long
callback.startTimeNs = startTimeNs;
callback.chunked = chunked;
callback.isMarker = isMarker;
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
if (callback.propertyMap != null) {
Expand Down Expand Up @@ -801,7 +868,8 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
}
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null,
cnx.isClientSupportsReplDedupByLidAndEid());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import javax.net.ssl.SSLSession;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import lombok.Getter;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -237,6 +238,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {

private boolean encryptionRequireOnProducer;

@Getter
private FeatureFlags features;

private PulsarCommandSender commandSender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ default long getEntryTimestamp() {
default void setEntryTimestamp(long entryTimestamp) {

}

default boolean supportsReplDedupByLidAndEid() {
return false;
}
}

CompletableFuture<Void> initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.api.proto.FeatureFlags;

public interface TransportCnx {

Expand Down Expand Up @@ -103,4 +104,11 @@ public interface TransportCnx {
* previously called {@link #incrementThrottleCount()}.
*/
void decrementThrottleCount();

FeatureFlags getFeatures();

default boolean isClientSupportsReplDedupByLidAndEid() {
return getFeatures() != null && getFeatures().hasSupportsReplDedupByLidAndEid()
&& getFeatures().isSupportsReplDedupByLidAndEid();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -194,11 +195,17 @@ protected boolean replicateEntries(List<Entry> entries) {
msg.setSchemaInfoForReplicator(schemaFuture.get());
msg.getMessageBuilder().clearTxnidMostBits();
msg.getMessageBuilder().clearTxnidLeastBits();
// Add props for sequence checking.
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION)
.setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId()));
msgOut.recordEvent(headersAndPayload.readableBytes());
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] Publishing {}:{}", replicatorId, entry.getLedgerId(), entry.getEntryId());
}
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
atLeastOneMessageSentForReplication = true;
}
Expand Down
Loading

0 comments on commit 4ac4f3c

Please sign in to comment.