Skip to content

Commit

Permalink
Fixed possible deadlock in the initialization of MLTransactionLog (ap…
Browse files Browse the repository at this point in the history
…ache#11194)

* Fixed possible deadlock in the initialization of MLTransactionLog

* Fixed tests
  • Loading branch information
merlimat authored Jul 2, 2021
1 parent c716495 commit 555042a
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public void testTransactionTopic() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
pulsar.getManagedLedgerFactory(), managedLedgerConfig);
pulsar.getManagedLedgerFactory(), managedLedgerConfig)
.initialize().join();
ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
metrics.generate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
*/
public interface TransactionLog {

/**
* Initialize the TransactionLog implementation
*/
CompletableFuture<Void> initialize();


/**
* Replay transaction log to load the transaction map.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ public class MLTransactionLogImpl implements TransactionLog {

private static final Logger log = LoggerFactory.getLogger(MLTransactionLogImpl.class);

private final ManagedLedger managedLedger;
private final ManagedLedgerFactory managedLedgerFactory;
private final ManagedLedgerConfig managedLedgerConfig;
private ManagedLedger managedLedger;

public static final String TRANSACTION_LOG_PREFIX = "__transaction_log_";

private final ManagedCursor cursor;
private ManagedCursor cursor;

public static final String TRANSACTION_SUBSCRIPTION_NAME = "transaction.subscription";

Expand All @@ -70,18 +72,50 @@ public class MLTransactionLogImpl implements TransactionLog {

public MLTransactionLogImpl(TransactionCoordinatorID tcID,
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig) throws Exception {
ManagedLedgerConfig managedLedgerConfig) {
this.topicName = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId());
this.tcId = tcID.getId();
this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor);
this.managedLedger = managedLedgerFactory.open(topicName.getPersistenceNamingEncoding(), managedLedgerConfig);
this.cursor = managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME,
CommandSubscribe.InitialPosition.Earliest);
this.managedLedgerFactory = managedLedgerFactory;
this.managedLedgerConfig = managedLedgerConfig;
this.entryQueue = new SpscArrayQueue<>(2000);
}

@Override
public CompletableFuture<Void> initialize() {
CompletableFuture<Void> future = new CompletableFuture<>();
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(),
managedLedgerConfig,
new AsyncCallbacks.OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
MLTransactionLogImpl.this.managedLedger = ledger;

managedLedger.asyncOpenCursor(TRANSACTION_SUBSCRIPTION_NAME,
CommandSubscribe.InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
MLTransactionLogImpl.this.cursor = cursor;
future.complete(null);
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
}

@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null, null);
return future;
}

@Override
public void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback) {
new TransactionLogReplayer(transactionLogReplayCallback).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionLog;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
Expand All @@ -44,16 +45,10 @@ public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordina
ManagedLedgerConfig managedLedgerConfig,
TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker) {
TransactionMetadataStore transactionMetadataStore;
try {
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorId,
new MLTransactionLogImpl(transactionCoordinatorId,
managedLedgerFactory, managedLedgerConfig), timeoutTracker, recoverTracker);
} catch (Exception e) {
log.error("MLTransactionMetadataStore init fail", e);
return FutureUtil.failedFuture(e);
}
return CompletableFuture.completedFuture(transactionMetadataStore);
MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
managedLedgerFactory, managedLedgerConfig);

return txnLog.initialize().thenApply(__ ->
new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, recoverTracker));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public void testTransactionOperation() throws Exception {
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
new ManagedLedgerConfig());
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
Expand Down Expand Up @@ -138,6 +139,7 @@ public void testRecoverSequenceId(boolean isUseManagedLedger) throws Exception {
managedLedgerConfig.setMaxEntriesPerLedger(3);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
Expand Down Expand Up @@ -181,6 +183,7 @@ public void testInitTransactionReader() throws Exception {
managedLedgerConfig.setMaxEntriesPerLedger(2);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
Expand Down Expand Up @@ -220,10 +223,12 @@ public void testInitTransactionReader() throws Exception {

transactionMetadataStore.closeAsync();

MLTransactionLogImpl txnLog2 = new MLTransactionLogImpl(transactionCoordinatorID, factory,
new ManagedLedgerConfig());
txnLog2.initialize().join();
MLTransactionMetadataStore transactionMetadataStoreTest =
new MLTransactionMetadataStore(transactionCoordinatorID,
new MLTransactionLogImpl(transactionCoordinatorID, factory,
new ManagedLedgerConfig()), new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());

while (true) {
if (checkReplayRetryCount > 6) {
Expand Down Expand Up @@ -285,6 +290,7 @@ public void testDeleteLog() throws Exception {
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
new ManagedLedgerConfig());
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
Expand Down Expand Up @@ -347,6 +353,7 @@ public void testRecoverWhenDeleteFromCursor() throws Exception {
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
new ManagedLedgerConfig());
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
Expand All @@ -364,6 +371,7 @@ public void testRecoverWhenDeleteFromCursor() throws Exception {

mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
new ManagedLedgerConfig());
mlTransactionLog.initialize().join();
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
Expand All @@ -381,6 +389,7 @@ public void testManageLedgerWriteFailState() throws Exception {
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
new ManagedLedgerConfig());
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
Expand Down

0 comments on commit 555042a

Please sign in to comment.