From f3703940e65b1b56f330a97c0008eaf9cfe25195 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Wed, 16 Oct 2024 11:00:26 +0530 Subject: [PATCH 1/4] chore(spanner): preserving lock order - R/W mux session (#3348) This PR introduces changes to support the lock order preservation protocol for multiplexed sessions in read/write transactions. According to this protocol, when a read/write transaction on a multiplexed session is retried, the transaction ID from the previous abort must be passed when creating a new transaction. This ensures that the retried transaction is recognized as older, rather than being treated as a new one. The transaction context object is structured as follows, ``` txn { transactionId: The transaction ID. previousTransactionId: The transaction ID of the most recently failed transaction. } ``` --- .../spanner/AsyncTransactionManagerImpl.java | 15 +- .../cloud/spanner/DatabaseClientImpl.java | 2 +- .../com/google/cloud/spanner/SessionImpl.java | 18 +- .../cloud/spanner/TransactionManagerImpl.java | 16 +- .../cloud/spanner/TransactionRunnerImpl.java | 38 +++- .../AsyncTransactionManagerImplTest.java | 71 ++++++- ...edSessionDatabaseClientMockServerTest.java | 181 ++++++++++++++++++ .../google/cloud/spanner/SessionPoolTest.java | 9 +- .../spanner/TransactionManagerImplTest.java | 77 +++++++- .../spanner/TransactionRunnerImplTest.java | 6 +- 10 files changed, 405 insertions(+), 28 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 882f709966b..0057bb15bea 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -28,6 +28,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; /** Implementation of {@link AsyncTransactionManager}. */ final class AsyncTransactionManagerImpl @@ -80,7 +81,19 @@ public TransactionContextFutureImpl beginAsync() { private ApiFuture internalBeginAsync(boolean firstAttempt) { txnState = TransactionState.STARTED; - txn = session.newTransaction(options); + + // Determine the latest transactionId when using a multiplexed session. + ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; + if (txn != null && session.getIsMultiplexed() && !firstAttempt) { + // Use the current transactionId if available, otherwise fallback to the previous aborted + // transactionId. + multiplexedSessionPreviousTransactionId = + txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId(); + } + + txn = + session.newTransaction( + options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId); if (firstAttempt) { session.setActive(this); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index b9d1ce054d7..91edce79325 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -59,7 +59,7 @@ class DatabaseClientImpl implements DatabaseClient { /* useMultiplexedSessionBlindWrite = */ false, /* multiplexedSessionDatabaseClient = */ null, tracer, - false); + /* useMultiplexedSessionForRW = */ false); } DatabaseClientImpl( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 7b9abc71a85..60c9d45d186 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -69,7 +69,8 @@ static void throwIfTransactionsPending() { } } - static TransactionOptions createReadWriteTransactionOptions(Options options) { + static TransactionOptions createReadWriteTransactionOptions( + Options options, ByteString previousTransactionId) { TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder(); if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) { transactionOptions.setExcludeTxnFromChangeStreams(true); @@ -78,6 +79,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) { if (options.withOptimisticLock() == Boolean.TRUE) { readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC); } + if (previousTransactionId != null + && previousTransactionId != com.google.protobuf.ByteString.EMPTY) { + readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId); + } transactionOptions.setReadWrite(readWrite); return transactionOptions.build(); } @@ -427,13 +432,17 @@ public void close() { } ApiFuture beginTransactionAsync( - Options transactionOptions, boolean routeToLeader, Map channelHint) { + Options transactionOptions, + boolean routeToLeader, + Map channelHint, + ByteString previousTransactionId) { final SettableApiFuture res = SettableApiFuture.create(); final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION); final BeginTransactionRequest request = BeginTransactionRequest.newBuilder() .setSession(getName()) - .setOptions(createReadWriteTransactionOptions(transactionOptions)) + .setOptions( + createReadWriteTransactionOptions(transactionOptions, previousTransactionId)) .build(); final ApiFuture requestFuture; try (IScope ignore = tracer.withSpan(span)) { @@ -469,11 +478,12 @@ ApiFuture beginTransactionAsync( return res; } - TransactionContextImpl newTransaction(Options options) { + TransactionContextImpl newTransaction(Options options, ByteString previousTransactionId) { return TransactionContextImpl.newBuilder() .setSession(this) .setOptions(options) .setTransactionId(null) + .setPreviousTransactionId(previousTransactionId) .setOptions(options) .setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter()) .setRpc(spanner.getRpc()) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index bbbc9aeb447..cafb27ba6b7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -20,6 +20,7 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; /** Implementation of {@link TransactionManager}. */ final class TransactionManagerImpl implements TransactionManager, SessionTransaction { @@ -53,7 +54,7 @@ public void setSpan(ISpan span) { public TransactionContext begin() { Preconditions.checkState(txn == null, "begin can only be called once"); try (IScope s = tracer.withSpan(span)) { - txn = session.newTransaction(options); + txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY); session.setActive(this); txnState = TransactionState.STARTED; return txn; @@ -102,7 +103,18 @@ public TransactionContext resetForRetry() { } try (IScope s = tracer.withSpan(span)) { boolean useInlinedBegin = txn.transactionId != null; - txn = session.newTransaction(options); + + // Determine the latest transactionId when using a multiplexed session. + ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; + if (session.getIsMultiplexed()) { + // Use the current transactionId if available, otherwise fallback to the previous aborted + // transactionId. + multiplexedSessionPreviousTransactionId = + txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId(); + } + txn = + session.newTransaction( + options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId); if (!useInlinedBegin) { txn.ensureTxn(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index a7250e1ef79..48affde3558 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -93,6 +93,9 @@ static class Builder extends AbstractReadContext.Builder ensureTxnAsync() { private void createTxnAsync(final SettableApiFuture res) { span.addAnnotation("Creating Transaction"); final ApiFuture fut = - session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint()); + session.beginTransactionAsync( + options, isRouteToLeader(), getTransactionChannelHint(), getPreviousTransactionId()); fut.addListener( () -> { try { @@ -558,7 +574,9 @@ TransactionSelector getTransactionSelector() { } if (tx == null) { return TransactionSelector.newBuilder() - .setBegin(SessionImpl.createReadWriteTransactionOptions(options)) + .setBegin( + SessionImpl.createReadWriteTransactionOptions( + options, getPreviousTransactionId())) .build(); } else { // Wait for the transaction to come available. The tx.get() call will fail with an @@ -1079,7 +1097,7 @@ public TransactionRunner allowNestedTransaction() { TransactionRunnerImpl(SessionImpl session, TransactionOption... options) { this.session = session; this.options = Options.fromTransactionOptions(options); - this.txn = session.newTransaction(this.options); + this.txn = session.newTransaction(this.options, /* previousTransactionId = */ ByteString.EMPTY); this.tracer = session.getTracer(); } @@ -1118,7 +1136,19 @@ private T runInternal(final TransactionCallable txCallable) { // Do not inline the BeginTransaction during a retry if the initial attempt did not // actually start a transaction. useInlinedBegin = txn.transactionId != null; - txn = session.newTransaction(options); + + // Determine the latest transactionId when using a multiplexed session. + ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; + if (session.getIsMultiplexed()) { + // Use the current transactionId if available, otherwise fallback to the previous + // transactionId. + multiplexedSessionPreviousTransactionId = + txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId(); + } + + txn = + session.newTransaction( + options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId); } checkState( isValid, "TransactionRunner has been invalidated by a new operation on the session"); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java index 08d22dd2d67..006a926e907 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java @@ -16,12 +16,18 @@ package com.google.cloud.spanner; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; +import com.google.protobuf.ByteString; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; import org.junit.Test; @@ -42,7 +48,7 @@ public void testCommitReturnsCommitStats() { when(oTspan.makeCurrent()).thenReturn(mock(Scope.class)); try (AsyncTransactionManagerImpl manager = new AsyncTransactionManagerImpl(session, span, Options.commitStats())) { - when(session.newTransaction(Options.fromTransactionOptions(Options.commitStats()))) + when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any())) .thenReturn(transaction); when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null)); Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1); @@ -54,4 +60,67 @@ public void testCommitReturnsCommitStats() { verify(transaction).commitAsync(); } } + + @Test + public void testRetryUsesPreviousTransactionIdOnMultiplexedSession() { + // Set up mock transaction IDs + final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId"); + final ByteString mockPreviousTransactionId = + ByteString.copyFromUtf8("mockPreviousTransactionId"); + + Span oTspan = mock(Span.class); + ISpan span = new OpenTelemetrySpan(oTspan); + when(oTspan.makeCurrent()).thenReturn(mock(Scope.class)); + // Mark the session as multiplexed. + when(session.getIsMultiplexed()).thenReturn(true); + + // Initialize a mock transaction with transactionId = null, previousTransactionId = null. + transaction = mock(TransactionRunnerImpl.TransactionContextImpl.class); + when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null)); + when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any())) + .thenReturn(transaction); + + // Simulate an ABORTED error being thrown when `commitAsync()` is called. + doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")) + .when(transaction) + .commitAsync(); + + try (AsyncTransactionManagerImpl manager = + new AsyncTransactionManagerImpl(session, span, Options.commitStats())) { + manager.beginAsync(); + + // Verify that for the first transaction attempt, the `previousTransactionId` is + // ByteString.EMPTY. + // This is because no transaction has been previously aborted at this point. + verify(session) + .newTransaction(Options.fromTransactionOptions(Options.commitStats()), ByteString.EMPTY); + assertThrows(AbortedException.class, manager::commitAsync); + clearInvocations(session); + + // Mock the transaction object to contain transactionID=null and + // previousTransactionId=mockPreviousTransactionId + when(transaction.getPreviousTransactionId()).thenReturn(mockPreviousTransactionId); + manager.resetForRetryAsync(); + // Verify that in the first retry attempt, the `previousTransactionId` + // (mockPreviousTransactionId) is passed to the new transaction. + // This allows Spanner to retry the transaction using the ID of the aborted transaction. + verify(session) + .newTransaction( + Options.fromTransactionOptions(Options.commitStats()), mockPreviousTransactionId); + assertThrows(AbortedException.class, manager::commitAsync); + clearInvocations(session); + + // Mock the transaction object to contain transactionID=mockTransactionId and + // previousTransactionId=mockPreviousTransactionId and transactionID = null + transaction.transactionId = mockTransactionId; + manager.resetForRetryAsync(); + // Verify that the latest `transactionId` (mockTransactionId) is used in the retry. + // This ensures the retry logic is working as expected with the latest transaction ID. + verify(session) + .newTransaction(Options.fromTransactionOptions(Options.commitStats()), mockTransactionId); + + when(transaction.rollbackAsync()).thenReturn(ApiFutures.immediateFuture(null)); + manager.closeAsync(); + } + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 2e412537882..adf7ed2a403 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -16,6 +16,7 @@ package com.google.cloud.spanner; +import static com.google.cloud.spanner.MockSpannerTestUtil.INVALID_UPDATE_STATEMENT; import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_COUNT; import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_STATEMENT; import static com.google.common.truth.Truth.assertThat; @@ -36,11 +37,13 @@ import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.Options.RpcPriority; +import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.connection.RandomResultSetGenerator; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; +import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.RequestOptions.Priority; @@ -54,6 +57,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.junit.Before; import org.junit.BeforeClass; @@ -70,6 +74,10 @@ public static void setupResults() { mockSpanner.putStatementResults( StatementResult.query(STATEMENT, new RandomResultSetGenerator(1).generate())); mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); + mockSpanner.putStatementResult( + StatementResult.exception( + INVALID_UPDATE_STATEMENT, + Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException())); } @Before @@ -745,6 +753,179 @@ public void testAsyncRunnerIsNonBlockingWithMultiplexedSession() throws Exceptio assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void testAbortedReadWriteTxnUsesPreviousTxnIdOnRetryWithInlineBegin() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared + // after the first call, so the retry should succeed. + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + TransactionRunner runner = client.readWriteTransaction(); + AtomicReference validTransactionId = new AtomicReference<>(); + runner.run( + transaction -> { + try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) { + while (resultSet.next()) {} + } + + TransactionContextImpl impl = (TransactionContextImpl) transaction; + if (validTransactionId.get() == null) { + // Track the first not-null transactionId. This transaction gets ABORTED during commit + // operation and gets retried. + validTransactionId.set(impl.transactionId); + } + return null; + }); + + List executeSqlRequests = + mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(2, executeSqlRequests.size()); + + // Verify the requests are executed using multiplexed sessions + for (ExecuteSqlRequest request : executeSqlRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + } + + // Verify that the first request uses inline begin, and the previous transaction ID is set to + // ByteString.EMPTY + assertTrue(executeSqlRequests.get(0).hasTransaction()); + assertTrue(executeSqlRequests.get(0).getTransaction().hasBegin()); + assertTrue(executeSqlRequests.get(0).getTransaction().getBegin().hasReadWrite()); + assertNotNull( + executeSqlRequests + .get(0) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + assertEquals( + ByteString.EMPTY, + executeSqlRequests + .get(0) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + + // Verify that the second request uses inline begin, and the previous transaction ID is set + // appropriately + assertTrue(executeSqlRequests.get(1).hasTransaction()); + assertTrue(executeSqlRequests.get(1).getTransaction().hasBegin()); + assertTrue(executeSqlRequests.get(1).getTransaction().getBegin().hasReadWrite()); + assertNotNull( + executeSqlRequests + .get(1) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + assertNotEquals( + ByteString.EMPTY, + executeSqlRequests + .get(1) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + assertEquals( + validTransactionId.get(), + executeSqlRequests + .get(1) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + } + + @Test + public void testAbortedReadWriteTxnUsesPreviousTxnIdOnRetryWithExplicitBegin() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared + // after the first call, so the retry should succeed. + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + TransactionRunner runner = client.readWriteTransaction(); + AtomicReference validTransactionId = new AtomicReference<>(); + Long updateCount = + runner.run( + transaction -> { + // This update statement carries the BeginTransaction, but fails. This will + // cause the entire transaction to be retried with an explicit + // BeginTransaction RPC to ensure all statements in the transaction are + // actually executed against the same transaction. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + if (validTransactionId.get() == null) { + // Track the first not-null transactionId. This transaction gets ABORTED during + // commit operation and gets retried. + validTransactionId.set(impl.transactionId); + } + SpannerException e = + assertThrows( + SpannerException.class, + () -> transaction.executeUpdate(INVALID_UPDATE_STATEMENT)); + assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode()); + return transaction.executeUpdate(UPDATE_STATEMENT); + }); + + assertThat(updateCount).isEqualTo(1L); + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertEquals(2, beginTransactionRequests.size()); + + // Verify the requests are executed using multiplexed sessions + for (BeginTransactionRequest request : beginTransactionRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + } + + // Verify that explicit begin transaction is called during retry, and the previous transaction + // ID is set to ByteString.EMPTY + assertTrue(beginTransactionRequests.get(0).hasOptions()); + assertTrue(beginTransactionRequests.get(0).getOptions().hasReadWrite()); + assertNotNull( + beginTransactionRequests + .get(0) + .getOptions() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + assertEquals( + ByteString.EMPTY, + beginTransactionRequests + .get(0) + .getOptions() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + + // The previous transaction with id (txn1) fails during commit operation with ABORTED error. + // Verify that explicit begin transaction is called during retry, and the previous transaction + // ID is not ByteString.EMPTY (should be set to txn1) + assertTrue(beginTransactionRequests.get(1).hasOptions()); + assertTrue(beginTransactionRequests.get(1).getOptions().hasReadWrite()); + assertNotNull( + beginTransactionRequests + .get(1) + .getOptions() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + assertNotEquals( + ByteString.EMPTY, + beginTransactionRequests + .get(1) + .getOptions() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + assertEquals( + validTransactionId.get(), + beginTransactionRequests + .get(1) + .getOptions() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 998678e4296..c3e8d887ded 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -1495,9 +1495,10 @@ public void testSessionNotFoundReadWriteTransaction() { .build(); when(closedSession.asyncClose()) .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); - when(closedSession.newTransaction(Options.fromTransactionOptions())) + when(closedSession.newTransaction(eq(Options.fromTransactionOptions()), any())) .thenReturn(closedTransactionContext); - when(closedSession.beginTransactionAsync(any(), eq(true), any())).thenThrow(sessionNotFound); + when(closedSession.beginTransactionAsync(any(), eq(true), any(), any())) + .thenThrow(sessionNotFound); when(closedSession.getTracer()).thenReturn(tracer); TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession); closedTransactionRunner.setSpan(span); @@ -1510,9 +1511,9 @@ public void testSessionNotFoundReadWriteTransaction() { when(openSession.getName()) .thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-open"); final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class); - when(openSession.newTransaction(Options.fromTransactionOptions())) + when(openSession.newTransaction(eq(Options.fromTransactionOptions()), any())) .thenReturn(openTransactionContext); - when(openSession.beginTransactionAsync(any(), eq(true), any())) + when(openSession.beginTransactionAsync(any(), eq(true), any(), any())) .thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn"))); when(openSession.getTracer()).thenReturn(tracer); TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index c3fcf1c7480..10b13125152 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -20,7 +20,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -98,7 +100,7 @@ public void setUp() { @Test public void beginCalledTwiceFails() { - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); assertThat(manager.begin()).isEqualTo(txn); assertThat(manager.getState()).isEqualTo(TransactionState.STARTED); IllegalStateException e = assertThrows(IllegalStateException.class, () -> manager.begin()); @@ -126,7 +128,7 @@ public void resetBeforeBeginFails() { @Test public void transactionRolledBackOnClose() { - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); when(txn.isAborted()).thenReturn(false); manager.begin(); manager.close(); @@ -135,7 +137,7 @@ public void transactionRolledBackOnClose() { @Test public void commitSucceeds() { - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1); CommitResponse response = new CommitResponse(commitTimestamp); when(txn.getCommitResponse()).thenReturn(response); @@ -147,7 +149,7 @@ public void commitSucceeds() { @Test public void resetAfterSuccessfulCommitFails() { - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); manager.begin(); manager.commit(); IllegalStateException e = @@ -157,21 +159,21 @@ public void resetAfterSuccessfulCommitFails() { @Test public void resetAfterAbortSucceeds() { - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); manager.begin(); doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")).when(txn).commit(); assertThrows(AbortedException.class, () -> manager.commit()); assertEquals(TransactionState.ABORTED, manager.getState()); txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class); - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); assertThat(manager.resetForRetry()).isEqualTo(txn); assertThat(manager.getState()).isEqualTo(TransactionState.STARTED); } @Test public void resetAfterErrorFails() { - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); manager.begin(); doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.UNKNOWN, "")).when(txn).commit(); SpannerException e = assertThrows(SpannerException.class, () -> manager.commit()); @@ -184,7 +186,7 @@ public void resetAfterErrorFails() { @Test public void rollbackAfterCommitFails() { - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); manager.begin(); manager.commit(); IllegalStateException e = assertThrows(IllegalStateException.class, () -> manager.rollback()); @@ -193,7 +195,7 @@ public void rollbackAfterCommitFails() { @Test public void commitAfterRollbackFails() { - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); manager.begin(); manager.rollback(); IllegalStateException e = assertThrows(IllegalStateException.class, () -> manager.commit()); @@ -363,4 +365,61 @@ public void inlineBegin() { assertThat(transactionsStarted.get()).isEqualTo(1); } } + + // This test ensures that when a transaction is aborted in a multiplexed session, + // the transaction ID of the aborted transaction is saved during the retry when a new transaction + // is created. + @Test + public void storePreviousTxnIdOnAbortForMultiplexedSession() { + txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class); + final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId"); + txn.transactionId = mockTransactionId; + when(session.newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY)) + .thenReturn(txn); + manager.begin(); + // Verify that for the first transaction attempt, the `previousTransactionId` is + // ByteString.EMPTY. + // This is because no transaction has been previously aborted at this point. + verify(session).newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY); + doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")).when(txn).commit(); + assertThrows(AbortedException.class, () -> manager.commit()); + + txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class); + when(txn.getPreviousTransactionId()).thenReturn(mockTransactionId); + when(session.newTransaction(Options.fromTransactionOptions(), mockTransactionId)) + .thenReturn(txn); + when(session.getIsMultiplexed()).thenReturn(true); + assertThat(manager.resetForRetry()).isEqualTo(txn); + // Verify that in the first retry attempt, the `previousTransactionId` is passed to the new + // transaction. + // This allows Spanner to retry the transaction using the ID of the aborted transaction. + verify(session).newTransaction(Options.fromTransactionOptions(), mockTransactionId); + } + + // This test ensures that when a transaction is aborted in a regular session, + // the transaction ID of the aborted transaction is not saved during the retry when a new + // transaction is created. + @Test + public void skipTxnIdStorageOnAbortForRegularSession() { + txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class); + final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId"); + txn.transactionId = mockTransactionId; + when(session.newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY)) + .thenReturn(txn); + manager.begin(); + verify(session).newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY); + doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")).when(txn).commit(); + assertThrows(AbortedException.class, () -> manager.commit()); + clearInvocations(session); + + txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class); + when(session.newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY)) + .thenReturn(txn); + when(session.getIsMultiplexed()).thenReturn(false); + assertThat(manager.resetForRetry()).isEqualTo(txn); + // Verify that in the first retry attempt, the `previousTransactionId` is not passed to the new + // transaction + // in case of regular sessions. + verify(session).newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index c647bb3642a..1fd6817ea96 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; @@ -117,7 +118,7 @@ public void setUp() { tracer = new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false); firstRun = true; when(session.getErrorHandler()).thenReturn(DefaultErrorHandler.INSTANCE); - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); when(session.getTracer()).thenReturn(tracer); when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( @@ -343,7 +344,8 @@ private long[] batchDmlException(int status) { .setTracer(session.getTracer()) .setSpan(session.getTracer().getCurrentSpan()) .build(); - when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(transaction); + when(session.newTransaction(eq(Options.fromTransactionOptions()), any())) + .thenReturn(transaction); when(session.getName()).thenReturn(SessionId.of("p", "i", "d", "test").getName()); TransactionRunnerImpl runner = new TransactionRunnerImpl(session); runner.setSpan(span); From 1e8b82c964360a20666e790874cd4d8c974af103 Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Wed, 16 Oct 2024 22:46:25 +0530 Subject: [PATCH 2/4] chore: fix built in client metric data (#3415) This PR fixes the below issues for client metrics. - Use correct project id instead of defaultProjectId - Sets a default value for directpath_enabled attribute - Use map overload for addAttributes in compositeTracer --- .../cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java | 2 ++ .../main/java/com/google/cloud/spanner/CompositeTracer.java | 2 +- .../main/java/com/google/cloud/spanner/SpannerOptions.java | 5 ++--- .../cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java | 1 + 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java index a7665f8556a..ef1d70eec25 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java @@ -20,6 +20,7 @@ import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_HASH_KEY; import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_NAME_KEY; import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_UID_KEY; +import static com.google.cloud.spanner.BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY; import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY; import static com.google.cloud.spanner.BuiltInMetricsConstant.LOCATION_ID_KEY; import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY; @@ -83,6 +84,7 @@ Map createClientAttributes(String projectId, String client_name) clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation()); clientAttributes.put(PROJECT_ID_KEY.getKey(), projectId); // TODO: Replace this with real value. + clientAttributes.put(DIRECT_PATH_ENABLED_KEY.getKey(), "false"); clientAttributes.put(INSTANCE_CONFIG_ID_KEY.getKey(), "unknown"); clientAttributes.put(CLIENT_NAME_KEY.getKey(), client_name); String clientUid = getDefaultTaskValue(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java index 085a91fb88e..050116af8de 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java @@ -184,7 +184,7 @@ public void addAttributes(Map attributes) { for (ApiTracer child : children) { if (child instanceof MetricsTracer) { MetricsTracer metricsTracer = (MetricsTracer) child; - attributes.forEach((key, value) -> metricsTracer.addAttributes(key, value)); + metricsTracer.addAttributes(attributes); } } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 5756ff64b89..d7ba20b2d1b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -1726,14 +1726,13 @@ private ApiTracerFactory getDefaultApiTracerFactory() { private ApiTracerFactory createMetricsApiTracerFactory() { OpenTelemetry openTelemetry = this.builtInOpenTelemetryMetricsProvider.getOrCreateOpenTelemetry( - getDefaultProjectId(), getCredentials()); + this.getProjectId(), getCredentials()); return openTelemetry != null ? new MetricsTracerFactory( new OpenTelemetryMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME), builtInOpenTelemetryMetricsProvider.createClientAttributes( - getDefaultProjectId(), - "spanner-java/" + GaxProperties.getLibraryVersion(getClass()))) + this.getProjectId(), "spanner-java/" + GaxProperties.getLibraryVersion(getClass()))) : null; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java index d9586acc956..8e3d0986343 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java @@ -94,6 +94,7 @@ public static void setup() { Attributes.builder() .put(BuiltInMetricsConstant.PROJECT_ID_KEY, "test-project") .put(BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY, "unknown") + .put(BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY, "false") .put( BuiltInMetricsConstant.LOCATION_ID_KEY, BuiltInOpenTelemetryMetricsProvider.detectClientLocation()) From aeeea3cd1a1f9e8b5d7847df8db5baafe91e2874 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Fri, 18 Oct 2024 13:06:50 +0530 Subject: [PATCH 3/4] chore(spanner): track precommit token for R/W multiplexed session (#3411) When a read-write transaction is executed on a multiplexed session, the RPC responses of that transaction return a `MultiplexedSessionPrecommitToken`. In client library, the precommit token with the highest sequence number is tracked at the transaction context level. During the commit, this latest precommit token is fetched and set in the CommitRequest. If the precommit token is not set during the commit, the backend will throw an `INVALID_ARGUMENT` error. Including the latest token in the CommitRequest is essential to prevent latency regression, though it does not impact the correctness of the transaction. This PR tracks the precommit token from the following RPC responses, 1. ResultSet 2. PartialResultSet 3. ExecuteBatchDmlResponse --- .../cloud/spanner/AbstractReadContext.java | 8 + .../cloud/spanner/AbstractResultSet.java | 7 + .../google/cloud/spanner/GrpcResultSet.java | 2 +- .../cloud/spanner/GrpcValueIterator.java | 9 +- .../cloud/spanner/TransactionRunnerImpl.java | 49 ++++++ .../cloud/spanner/GrpcResultSetTest.java | 4 + .../cloud/spanner/MockSpannerServiceImpl.java | 92 ++++++++-- ...edSessionDatabaseClientMockServerTest.java | 157 ++++++++++++++++++ .../cloud/spanner/ReadFormatTestRunner.java | 4 + .../cloud/spanner/ResultSetsHelper.java | 4 + 10 files changed, 322 insertions(+), 14 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index caf0e06379e..a89090e34d9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -48,6 +48,7 @@ import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RequestOptions; @@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) { this.session.onReadDone(); } + /** + * For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be + * present in the RPC response. In such cases, this method will be a no-op. + */ + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} + private ResultSet readInternal( String table, @Nullable String index, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 2cf93fb92ec..fdc0398d5fe 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -27,6 +27,7 @@ import com.google.protobuf.ListValue; import com.google.protobuf.ProtocolMessageEnum; import com.google.protobuf.Value.KindCase; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.Transaction; import java.io.IOException; import java.io.Serializable; @@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) /** Called when the read finishes normally. */ void onDone(boolean withBeginTransaction); + + /** + * Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token + * will be included if the read-write transaction is executed on a multiplexed session. + */ + void onPrecommitToken(MultiplexedSessionPrecommitToken token); } static final class LazyByteArray implements Serializable { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java index be75c1e5c4e..23c9dd7c2d3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java @@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet> implements ProtobufR GrpcResultSet( CloseableIterator iterator, Listener listener, DecodeMode decodeMode) { - this.iterator = new GrpcValueIterator(iterator); + this.iterator = new GrpcValueIterator(iterator, listener); this.listener = listener; this.decodeMode = decodeMode; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java index 0a2e17bd2b5..1a3df8b9123 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.cloud.spanner.AbstractResultSet.CloseableIterator; +import com.google.cloud.spanner.AbstractResultSet.Listener; import com.google.common.collect.AbstractIterator; import com.google.protobuf.ListValue; import com.google.protobuf.Value.KindCase; @@ -44,9 +45,11 @@ private enum StreamValue { private PartialResultSet current; private int pos; private ResultSetStats statistics; + private final Listener listener; - GrpcValueIterator(CloseableIterator stream) { + GrpcValueIterator(CloseableIterator stream, Listener listener) { this.stream = stream; + this.listener = listener; } @SuppressWarnings("unchecked") @@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException { ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e); } } + // collect the precommit token from each PartialResultSet + if (current.hasPrecommitToken()) { + listener.onPrecommitToken(current.getPrecommitToken()); + } if (current.hasStats()) { statistics = current.getStats(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 48affde3558..92d9c50aa9e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -46,6 +46,7 @@ import com.google.spanner.v1.ExecuteBatchDmlResponse; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.ResultSetStats; @@ -179,6 +180,11 @@ public void removeListener(Runnable listener) { @GuardedBy("committingLock") private volatile boolean committing; + private final Object precommitTokenLock = new Object(); + + @GuardedBy("precommitTokenLock") + private MultiplexedSessionPrecommitToken latestPrecommitToken; + @GuardedBy("lock") private volatile SettableApiFuture finishedAsyncOperations = SettableApiFuture.create(); @@ -439,6 +445,10 @@ public void run() { } requestBuilder.setRequestOptions(requestOptionsBuilder.build()); } + if (session.getIsMultiplexed() && getLatestPrecommitToken() != null) { + // Set the precommit token in the CommitRequest for multiplexed sessions. + requestBuilder.setPrecommitToken(getLatestPrecommitToken()); + } final CommitRequest commitRequest = requestBuilder.build(); span.addAnnotation("Starting Commit"); final ApiFuture commitFuture; @@ -643,6 +653,25 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude } } + /** + * In read-write transactions, the precommit token with the highest sequence number from this + * transaction attempt will be tracked and included in the + * [Commit][google.spanner.v1.Spanner.Commit] request for the transaction. + */ + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) { + if (token == null) { + return; + } + synchronized (precommitTokenLock) { + if (this.latestPrecommitToken == null + || token.getSeqNum() > this.latestPrecommitToken.getSeqNum()) { + this.latestPrecommitToken = token; + txnLogger.log(Level.FINE, "Updating precommit token to " + this.latestPrecommitToken); + } + } + } + @Nullable String getTransactionTag() { if (this.options.hasTag()) { @@ -651,6 +680,13 @@ String getTransactionTag() { return null; } + @Nullable + MultiplexedSessionPrecommitToken getLatestPrecommitToken() { + synchronized (precommitTokenLock) { + return this.latestPrecommitToken; + } + } + @Override public SpannerException onError(SpannerException e, boolean withBeginTransaction) { e = super.onError(e, withBeginTransaction); @@ -829,6 +865,9 @@ private ResultSet internalExecuteUpdate( throw new IllegalArgumentException( "DML response missing stats possibly due to non-DML statement as input"); } + if (resultSet.hasPrecommitToken()) { + onPrecommitToken(resultSet.getPrecommitToken()); + } return resultSet; } catch (Throwable t) { throw onError( @@ -903,6 +942,9 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... u resultSet.get().getMetadata().getTransaction(), builder.getTransaction().hasBegin()); } + if (resultSet.get().hasPrecommitToken()) { + onPrecommitToken(resultSet.get().getPrecommitToken()); + } } catch (Throwable e) { // Ignore this error here as it is handled by the future that is returned by the // executeUpdateAsync method. @@ -958,6 +1000,10 @@ public long[] batchUpdate(Iterable statements, UpdateOption... update } } + if (response.hasPrecommitToken()) { + onPrecommitToken(response.getPrecommitToken()); + } + // If one of the DML statements was aborted, we should throw an aborted exception. // In all other cases, we should throw a BatchUpdateException. if (response.getStatus().getCode() == Code.ABORTED_VALUE) { @@ -1022,6 +1068,9 @@ public ApiFuture batchUpdateAsync( builder.getTransaction().hasBegin()); } } + if (batchDmlResponse.hasPrecommitToken()) { + onPrecommitToken(batchDmlResponse.getPrecommitToken()); + } // If one of the DML statements was aborted, we should throw an aborted exception. // In all other cases, we should throw a BatchUpdateException. if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index 62336163eaf..59a18a3ab79 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.QueryPlan; import com.google.spanner.v1.ResultSetMetadata; @@ -77,6 +78,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction @Override public void onDone(boolean withBeginTransaction) {} + + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} } @Before diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 9f0a2822d87..014ab7e94e3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -54,6 +54,7 @@ import com.google.spanner.v1.GetSessionRequest; import com.google.spanner.v1.ListSessionsRequest; import com.google.spanner.v1.ListSessionsResponse; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Partition; import com.google.spanner.v1.PartitionOptions; @@ -197,10 +198,15 @@ private static class PartialResultSetsIterator implements Iterator transactionCounters = new ConcurrentHashMap<>(); private ConcurrentMap> partitionTokens = new ConcurrentHashMap<>(); private ConcurrentMap transactionLastUsed = new ConcurrentHashMap<>(); + + // Stores the latest sequence number needed for the precommit token. + // The transaction entry is created only if the transaction is read-write and executed on a + // multiplexed session. + private static ConcurrentMap transactionSequenceNo = + new ConcurrentHashMap<>(); private int maxNumSessionsInOneBatch = 100; private int maxTotalSessions = Integer.MAX_VALUE; private Iterable batchWriteResult = new ArrayList<>(); @@ -1020,7 +1035,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp throw result.getException(); case RESULT_SET: returnResultSet( - result.getResultSet(), transactionId, request.getTransaction(), responseObserver); + result.getResultSet(), + transactionId, + request.getTransaction(), + responseObserver, + session); break; case UPDATE_COUNT: if (isPartitionedDmlTransaction(transactionId)) { @@ -1033,7 +1052,7 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp .build()) .build()); } else { - responseObserver.onNext( + ResultSet.Builder resultSetBuilder = ResultSet.newBuilder() .setStats( ResultSetStats.newBuilder() @@ -1045,8 +1064,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp ignoreNextInlineBeginRequest.getAndSet(false) ? Transaction.getDefaultInstance() : Transaction.newBuilder().setId(transactionId).build()) - .build()) - .build()); + .build()); + if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) { + resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken(transactionId)); + } + responseObserver.onNext(resultSetBuilder.build()); } break; default: @@ -1064,7 +1086,8 @@ private void returnResultSet( ResultSet resultSet, ByteString transactionId, TransactionSelector transactionSelector, - StreamObserver responseObserver) { + StreamObserver responseObserver, + Session session) { ResultSetMetadata metadata = resultSet.getMetadata(); if (transactionId != null) { metadata = @@ -1079,7 +1102,12 @@ private void returnResultSet( Transaction transaction = getTemporaryTransactionOrNull(transactionSelector); metadata = metadata.toBuilder().setTransaction(transaction).build(); } - resultSet = resultSet.toBuilder().setMetadata(metadata).build(); + ResultSet.Builder resultSetBuilder = resultSet.toBuilder(); + resultSetBuilder.setMetadata(metadata); + if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) { + resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken(transactionId)); + } + resultSet = resultSetBuilder.build(); responseObserver.onNext(resultSet); } @@ -1174,6 +1202,9 @@ public void executeBatchDml( .build()); } builder.setStatus(status); + if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) { + builder.setPrecommitToken(getExecuteBatchDmlResponsePrecommitToken(transactionId)); + } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } catch (StatusRuntimeException e) { @@ -1242,7 +1273,8 @@ public void executeStreamingSql( transactionId, request.getTransaction(), responseObserver, - getExecuteStreamingSqlExecutionTime()); + getExecuteStreamingSqlExecutionTime(), + session.getMultiplexed()); break; case UPDATE_COUNT: if (isPartitioned) { @@ -1612,7 +1644,7 @@ public void read(final ReadRequest request, StreamObserver responseOb cols); StatementResult res = getResult(statement); returnResultSet( - res.getResultSet(), transactionId, request.getTransaction(), responseObserver); + res.getResultSet(), transactionId, request.getTransaction(), responseObserver, session); responseObserver.onCompleted(); } catch (StatusRuntimeException e) { responseObserver.onError(e); @@ -1670,7 +1702,8 @@ public void streamingRead( transactionId, request.getTransaction(), responseObserver, - getStreamingReadExecutionTime()); + getStreamingReadExecutionTime(), + session.getMultiplexed()); } catch (StatusRuntimeException e) { responseObserver.onError(e); } catch (Throwable t) { @@ -1683,7 +1716,8 @@ private void returnPartialResultSet( ByteString transactionId, TransactionSelector transactionSelector, StreamObserver responseObserver, - SimulatedExecutionTime executionTime) + SimulatedExecutionTime executionTime, + boolean isMultiplexedSession) throws Exception { ResultSetMetadata metadata = resultSet.getMetadata(); if (transactionId == null) { @@ -1700,7 +1734,11 @@ private void returnPartialResultSet( .build(); } resultSet = resultSet.toBuilder().setMetadata(metadata).build(); - PartialResultSetsIterator iterator = new PartialResultSetsIterator(resultSet); + PartialResultSetsIterator iterator = + new PartialResultSetsIterator( + resultSet, + isMultiplexedSession && isReadWriteTransaction(transactionId), + transactionId); long index = 0L; while (iterator.hasNext()) { SimulatedExecutionTime.checkStreamException( @@ -2034,6 +2072,7 @@ private void commitTransaction(ByteString transactionId) { transactions.remove(transactionId); isPartitionedDmlTransaction.remove(transactionId); transactionLastUsed.remove(transactionId); + transactionSequenceNo.remove(transactionId); } @Override @@ -2065,6 +2104,7 @@ void rollbackTransaction(ByteString transactionId) { transactions.remove(transactionId); isPartitionedDmlTransaction.remove(transactionId); transactionLastUsed.remove(transactionId); + transactionSequenceNo.remove(transactionId); } void markAbortedTransaction(ByteString transactionId) { @@ -2072,6 +2112,7 @@ void markAbortedTransaction(ByteString transactionId) { transactions.remove(transactionId); isPartitionedDmlTransaction.remove(transactionId); transactionLastUsed.remove(transactionId); + transactionSequenceNo.remove(transactionId); } @Override @@ -2276,6 +2317,7 @@ public void reset() { transactionCounters = new ConcurrentHashMap<>(); partitionTokens = new ConcurrentHashMap<>(); transactionLastUsed = new ConcurrentHashMap<>(); + transactionSequenceNo = new ConcurrentHashMap<>(); numSessionsCreated.set(0); stickyGlobalExceptions = false; @@ -2447,4 +2489,30 @@ Session getSession(String name) { } return null; } + + static MultiplexedSessionPrecommitToken getResultSetPrecommitToken(ByteString transactionId) { + return getPrecommitToken("ResultSetPrecommitToken", transactionId); + } + + static MultiplexedSessionPrecommitToken getPartialResultSetPrecommitToken( + ByteString transactionId) { + return getPrecommitToken("PartialResultSetPrecommitToken", transactionId); + } + + static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken( + ByteString transactionId) { + return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", transactionId); + } + + static MultiplexedSessionPrecommitToken getPrecommitToken( + String value, ByteString transactionId) { + transactionSequenceNo.putIfAbsent(transactionId, new AtomicInteger(0)); + + // Generates an incrementing sequence number + int seqNum = transactionSequenceNo.get(transactionId).incrementAndGet(); + return MultiplexedSessionPrecommitToken.newBuilder() + .setPrecommitToken(ByteString.copyFromUtf8(value)) + .setSeqNum(seqNum) + .build(); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index adf7ed2a403..c7d7b697d64 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -41,6 +41,7 @@ import com.google.cloud.spanner.connection.RandomResultSetGenerator; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.spanner.v1.BeginTransactionRequest; @@ -926,6 +927,162 @@ public void testAbortedReadWriteTxnUsesPreviousTxnIdOnRetryWithExplicitBegin() { .getMultiplexedSessionPreviousTransactionId()); } + @Test + public void testPrecommitTokenForResultSet() { + // This test verifies that the precommit token received from the ResultSet is properly tracked + // and set in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + Long count = + client + .readWriteTransaction() + .run( + transaction -> { + long res = transaction.executeUpdate(UPDATE_STATEMENT); + + // Verify that the latest precommit token is tracked in the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ResultSetPrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return res; + }); + + assertNotNull(count); + assertEquals(1, count.longValue()); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ResultSetPrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + + @Test + public void testPrecommitTokenForExecuteBatchDmlResponse() { + // This test verifies that the precommit token received from the ExecuteBatchDmlResponse is + // properly tracked and set in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + long[] count = + client + .readWriteTransaction() + .run( + transaction -> { + long[] res = transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT)); + + // Verify that the latest precommit token is tracked in the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return res; + }); + + assertNotNull(count); + assertEquals(1, count.length); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + + @Test + public void testPrecommitTokenForPartialResultSet() { + // This test verifies that the precommit token received from the PartialResultSet is properly + // tracked and set in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + client + .readWriteTransaction() + .run( + transaction -> { + ResultSet resultSet = transaction.executeQuery(STATEMENT); + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + + // Verify that the latest precommit token is tracked in the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return null; + }); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + + @Test + public void testTxnTracksPrecommitTokenWithLatestSeqNo() { + // This test ensures that the read-write transaction tracks the precommit token with the + // highest sequence number and sets it in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + client + .readWriteTransaction() + .run( + transaction -> { + // Returns a ResultSet containing the precommit token (ResultSetPrecommitToken) + transaction.executeUpdate(UPDATE_STATEMENT); + + // Returns a PartialResultSet containing the precommit token + // (PartialResultSetPrecommitToken) + ResultSet resultSet = transaction.executeQuery(STATEMENT); + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + + // Returns an ExecuteBatchDmlResponse containing the precommit token + // (ExecuteBatchDmlResponsePrecommitToken). + // Since this is the last request received by the mock Spanner, it should be the most + // recent precommit token tracked by the transaction context. + transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT)); + + // Verify that the latest precommit token with highest sequence number is tracked in + // the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return null; + }); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java index c973b7e471e..2a399e6f486 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java @@ -24,6 +24,7 @@ import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.io.Resources; import com.google.protobuf.util.JsonFormat; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Transaction; import java.math.BigDecimal; @@ -56,6 +57,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction @Override public void onDone(boolean withBeginTransaction) {} + + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} } public ReadFormatTestRunner(Class clazz) throws InitializationError { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java index fc494c6f3ff..404973336ba 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java @@ -19,6 +19,7 @@ import com.google.cloud.spanner.AbstractResultSet.CloseableIterator; import com.google.cloud.spanner.AbstractResultSet.Listener; import com.google.protobuf.ListValue; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Transaction; import java.util.Iterator; @@ -82,6 +83,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction @Override public void onDone(boolean withBeginTransaction) {} + + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} }); } } From 16cc6eed58cf735026d7757a28f61f29821a14bf Mon Sep 17 00:00:00 2001 From: Ankit Agarwal <146331865+ankiaga@users.noreply.github.com> Date: Tue, 22 Oct 2024 09:28:12 +0530 Subject: [PATCH 4/4] feat: Enabling endToEndTracing support in Connection API (#3412) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Enabling endToEndTracing support in Connection API * fix formatting * Comments incorporated * formatting fixed * formatting fixed * Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java Co-authored-by: Knut Olav Løite * Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java Co-authored-by: Knut Olav Løite --------- Co-authored-by: Knut Olav Løite --- .../spanner/connection/ConnectionOptions.java | 17 +++++- .../connection/ConnectionProperties.java | 12 +++++ .../cloud/spanner/connection/SpannerPool.java | 11 +++- .../connection/ConnectionOptionsTest.java | 21 ++++++++ .../spanner/connection/SpannerPoolTest.java | 52 +++++++++++++++++++ 5 files changed, 110 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java index dcc4c663bb3..66b09c35afd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java @@ -26,6 +26,7 @@ import static com.google.cloud.spanner.connection.ConnectionProperties.DATA_BOOST_ENABLED; import static com.google.cloud.spanner.connection.ConnectionProperties.DIALECT; import static com.google.cloud.spanner.connection.ConnectionProperties.ENABLE_API_TRACING; +import static com.google.cloud.spanner.connection.ConnectionProperties.ENABLE_END_TO_END_TRACING; import static com.google.cloud.spanner.connection.ConnectionProperties.ENABLE_EXTENDED_TRACING; import static com.google.cloud.spanner.connection.ConnectionProperties.ENCODED_CREDENTIALS; import static com.google.cloud.spanner.connection.ConnectionProperties.ENDPOINT; @@ -249,6 +250,7 @@ public String[] getValidValues() { static final int DEFAULT_MAX_PARTITIONED_PARALLELISM = 1; static final Boolean DEFAULT_ENABLE_EXTENDED_TRACING = null; static final Boolean DEFAULT_ENABLE_API_TRACING = null; + static final boolean DEFAULT_ENABLE_END_TO_END_TRACING = false; static final boolean DEFAULT_AUTO_BATCH_DML = false; static final long DEFAULT_AUTO_BATCH_DML_UPDATE_COUNT = 1L; static final boolean DEFAULT_AUTO_BATCH_DML_UPDATE_COUNT_VERIFICATION = true; @@ -335,6 +337,7 @@ public String[] getValidValues() { public static final String ENABLE_EXTENDED_TRACING_PROPERTY_NAME = "enableExtendedTracing"; public static final String ENABLE_API_TRACING_PROPERTY_NAME = "enableApiTracing"; + public static final String ENABLE_END_TO_END_TRACING_PROPERTY_NAME = "enableEndToEndTracing"; public static final String AUTO_BATCH_DML_PROPERTY_NAME = "auto_batch_dml"; public static final String AUTO_BATCH_DML_UPDATE_COUNT_PROPERTY_NAME = @@ -537,7 +540,14 @@ static boolean isEnableTransactionalConnectionStateForPostgreSQL() { + "to get a detailed view of each RPC that is being executed by your application, " + "or if you want to debug potential latency problems caused by RPCs that are " + "being retried.", - DEFAULT_ENABLE_API_TRACING)))); + DEFAULT_ENABLE_API_TRACING), + ConnectionProperty.createBooleanProperty( + ENABLE_END_TO_END_TRACING_PROPERTY_NAME, + "Enable end-to-end tracing (true/false) to generate traces for both the time " + + "that is spent in the client, as well as time that is spent in the Spanner server. " + + "Server side traces can only go to Google Cloud Trace, so to see end to end traces, " + + "the application should configure an exporter that exports the traces to Google Cloud Trace.", + DEFAULT_ENABLE_END_TO_END_TRACING)))); private static final Set INTERNAL_PROPERTIES = Collections.unmodifiableSet( @@ -1205,6 +1215,11 @@ public boolean isRouteToLeader() { return getInitialConnectionPropertyValue(ROUTE_TO_LEADER); } + /** Whether end-to-end tracing is enabled. */ + public boolean isEndToEndTracingEnabled() { + return getInitialConnectionPropertyValue(ENABLE_END_TO_END_TRACING); + } + /** * The initial retryAbortsInternally value for connections created by this {@link * ConnectionOptions} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java index c62571c6c03..0ca9b7256e2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java @@ -39,6 +39,7 @@ import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DDL_IN_TRANSACTION_MODE; import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE; import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_ENABLE_API_TRACING; +import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_ENABLE_END_TO_END_TRACING; import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_ENABLE_EXTENDED_TRACING; import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_ENDPOINT; import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_KEEP_TRANSACTION_ALIVE; @@ -65,6 +66,7 @@ import static com.google.cloud.spanner.connection.ConnectionOptions.DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE_NAME; import static com.google.cloud.spanner.connection.ConnectionOptions.DIALECT_PROPERTY_NAME; import static com.google.cloud.spanner.connection.ConnectionOptions.ENABLE_API_TRACING_PROPERTY_NAME; +import static com.google.cloud.spanner.connection.ConnectionOptions.ENABLE_END_TO_END_TRACING_PROPERTY_NAME; import static com.google.cloud.spanner.connection.ConnectionOptions.ENABLE_EXTENDED_TRACING_PROPERTY_NAME; import static com.google.cloud.spanner.connection.ConnectionOptions.ENCODED_CREDENTIALS_PROPERTY_NAME; import static com.google.cloud.spanner.connection.ConnectionOptions.ENDPOINT_PROPERTY_NAME; @@ -292,6 +294,16 @@ class ConnectionProperties { DEFAULT_ENABLE_API_TRACING, BooleanConverter.INSTANCE, Context.STARTUP); + static final ConnectionProperty ENABLE_END_TO_END_TRACING = + create( + ENABLE_END_TO_END_TRACING_PROPERTY_NAME, + "Enable end-to-end tracing (true/false) to generate traces for both the time " + + "that is spent in the client, as well as time that is spent in the Spanner server. " + + "Server side traces can only go to Google Cloud Trace, so to see end to end traces, " + + "the application should configure an exporter that exports the traces to Google Cloud Trace.", + DEFAULT_ENABLE_END_TO_END_TRACING, + BooleanConverter.INSTANCE, + Context.STARTUP); static final ConnectionProperty MIN_SESSIONS = create( MIN_SESSIONS_PROPERTY_NAME, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java index 246d340b070..81246e41938 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java @@ -160,6 +160,7 @@ static class SpannerPoolKey { private final OpenTelemetry openTelemetry; private final Boolean enableExtendedTracing; private final Boolean enableApiTracing; + private final boolean enableEndToEndTracing; @VisibleForTesting static SpannerPoolKey of(ConnectionOptions options) { @@ -190,6 +191,7 @@ private SpannerPoolKey(ConnectionOptions options) throws IOException { this.openTelemetry = options.getOpenTelemetry(); this.enableExtendedTracing = options.isEnableExtendedTracing(); this.enableApiTracing = options.isEnableApiTracing(); + this.enableEndToEndTracing = options.isEndToEndTracingEnabled(); } @Override @@ -211,7 +213,8 @@ public boolean equals(Object o) { this.useVirtualGrpcTransportThreads, other.useVirtualGrpcTransportThreads) && Objects.equals(this.openTelemetry, other.openTelemetry) && Objects.equals(this.enableExtendedTracing, other.enableExtendedTracing) - && Objects.equals(this.enableApiTracing, other.enableApiTracing); + && Objects.equals(this.enableApiTracing, other.enableApiTracing) + && Objects.equals(this.enableEndToEndTracing, other.enableEndToEndTracing); } @Override @@ -229,7 +232,8 @@ public int hashCode() { this.useVirtualGrpcTransportThreads, this.openTelemetry, this.enableExtendedTracing, - this.enableApiTracing); + this.enableApiTracing, + this.enableEndToEndTracing); } } @@ -380,6 +384,9 @@ Spanner createSpanner(SpannerPoolKey key, ConnectionOptions options) { if (!options.isRouteToLeader()) { builder.disableLeaderAwareRouting(); } + if (options.isEndToEndTracingEnabled()) { + builder.setEnableEndToEndTracing(true); + } if (key.usePlainText) { // Credentials may not be sent over a plain text channel. builder.setCredentials(NoCredentials.getInstance()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java index 94a44579acf..f826ec08dfc 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java @@ -322,6 +322,27 @@ public void testBuildWithRouteToLeader() { assertTrue(options.isRouteToLeader()); } + @Test + public void testBuildWithEndToEndTracingEnabled() { + final String BASE_URI = + "cloudspanner:/projects/test-project-123/instances/test-instance-123/databases/test-database-123"; + ConnectionOptions.Builder builder = ConnectionOptions.newBuilder(); + builder.setUri(BASE_URI + "?enableEndToEndTracing=true"); + builder.setCredentialsUrl(FILE_TEST_PATH); + ConnectionOptions options = builder.build(); + assertEquals(options.getHost(), DEFAULT_HOST); + assertEquals(options.getProjectId(), TEST_PROJECT); + assertEquals(options.getInstanceId(), TEST_INSTANCE); + assertEquals(options.getDatabaseName(), TEST_DATABASE); + assertTrue(options.isEndToEndTracingEnabled()); + + // Test for default behavior for enableEndToEndTracing property. + builder = ConnectionOptions.newBuilder().setUri(BASE_URI); + builder.setCredentialsUrl(FILE_TEST_PATH); + options = builder.build(); + assertFalse(options.isEndToEndTracingEnabled()); + } + @Test public void testBuildWithAutoConfigEmulatorAndHost() { ConnectionOptions.Builder builder = ConnectionOptions.newBuilder(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java index 19d49139635..fea0b8aa6cf 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java @@ -563,6 +563,58 @@ public void testEnableApiTracing() { .build())); } + @Test + public void testEnableEndToEndTracing() { + SpannerPoolKey keyWithoutApiTracingConfig = + SpannerPoolKey.of( + ConnectionOptions.newBuilder() + .setUri("cloudspanner:/projects/p/instances/i/databases/d") + .setCredentials(NoCredentials.getInstance()) + .build()); + SpannerPoolKey keyWithApiTracingEnabled = + SpannerPoolKey.of( + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/p/instances/i/databases/d?enableEndToEndTracing=true") + .setCredentials(NoCredentials.getInstance()) + .build()); + SpannerPoolKey keyWithApiTracingDisabled = + SpannerPoolKey.of( + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/p/instances/i/databases/d?enableEndToEndTracing=false") + .setCredentials(NoCredentials.getInstance()) + .build()); + + assertNotEquals(keyWithoutApiTracingConfig, keyWithApiTracingEnabled); + assertEquals(keyWithoutApiTracingConfig, keyWithApiTracingDisabled); + assertNotEquals(keyWithApiTracingEnabled, keyWithApiTracingDisabled); + + assertEquals( + keyWithApiTracingEnabled, + SpannerPoolKey.of( + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/p/instances/i/databases/d?enableEndToEndTracing=true") + .setCredentials(NoCredentials.getInstance()) + .build())); + assertEquals( + keyWithApiTracingDisabled, + SpannerPoolKey.of( + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/p/instances/i/databases/d?enableEndToEndTracing=false") + .setCredentials(NoCredentials.getInstance()) + .build())); + assertEquals( + keyWithoutApiTracingConfig, + SpannerPoolKey.of( + ConnectionOptions.newBuilder() + .setUri("cloudspanner:/projects/p/instances/i/databases/d") + .setCredentials(NoCredentials.getInstance()) + .build())); + } + @Test public void testOpenTelemetry() { SpannerPool pool = createSubjectAndMocks();