diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml
index 1f33ef2d822..384c21594ab 100644
--- a/google-cloud-spanner/pom.xml
+++ b/google-cloud-spanner/pom.xml
@@ -460,12 +460,6 @@
opentelemetry-sdk-testing
test
-
- org.jetbrains
- annotations
- 24.1.0
- compile
-
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
index caf0e06379e..a89090e34d9 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
@@ -48,6 +48,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
+import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RequestOptions;
@@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) {
this.session.onReadDone();
}
+ /**
+ * For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
+ * present in the RPC response. In such cases, this method will be a no-op.
+ */
+ @Override
+ public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
+
private ResultSet readInternal(
String table,
@Nullable String index,
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
index 2d4088e4866..734d3bcffc3 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
@@ -27,6 +27,7 @@
import com.google.protobuf.ListValue;
import com.google.protobuf.ProtocolMessageEnum;
import com.google.protobuf.Value.KindCase;
+import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.Transaction;
import java.io.IOException;
import java.io.Serializable;
@@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
/** Called when the read finishes normally. */
void onDone(boolean withBeginTransaction);
+
+ /**
+ * Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token
+ * will be included if the read-write transaction is executed on a multiplexed session.
+ */
+ void onPrecommitToken(MultiplexedSessionPrecommitToken token);
}
static final class LazyByteArray implements Serializable {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
index 882f709966b..0057bb15bea 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
@@ -28,6 +28,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.ByteString;
/** Implementation of {@link AsyncTransactionManager}. */
final class AsyncTransactionManagerImpl
@@ -80,7 +81,19 @@ public TransactionContextFutureImpl beginAsync() {
private ApiFuture internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
- txn = session.newTransaction(options);
+
+ // Determine the latest transactionId when using a multiplexed session.
+ ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
+ if (txn != null && session.getIsMultiplexed() && !firstAttempt) {
+ // Use the current transactionId if available, otherwise fallback to the previous aborted
+ // transactionId.
+ multiplexedSessionPreviousTransactionId =
+ txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
+ }
+
+ txn =
+ session.newTransaction(
+ options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
if (firstAttempt) {
session.setActive(this);
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java
index a7665f8556a..ef1d70eec25 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java
@@ -20,6 +20,7 @@
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_HASH_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_NAME_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_UID_KEY;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.LOCATION_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
@@ -83,6 +84,7 @@ Map createClientAttributes(String projectId, String client_name)
clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation());
clientAttributes.put(PROJECT_ID_KEY.getKey(), projectId);
// TODO: Replace this with real value.
+ clientAttributes.put(DIRECT_PATH_ENABLED_KEY.getKey(), "false");
clientAttributes.put(INSTANCE_CONFIG_ID_KEY.getKey(), "unknown");
clientAttributes.put(CLIENT_NAME_KEY.getKey(), client_name);
String clientUid = getDefaultTaskValue();
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java
index 085a91fb88e..050116af8de 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java
@@ -184,7 +184,7 @@ public void addAttributes(Map attributes) {
for (ApiTracer child : children) {
if (child instanceof MetricsTracer) {
MetricsTracer metricsTracer = (MetricsTracer) child;
- attributes.forEach((key, value) -> metricsTracer.addAttributes(key, value));
+ metricsTracer.addAttributes(attributes);
}
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
index b9d1ce054d7..91edce79325 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
@@ -59,7 +59,7 @@ class DatabaseClientImpl implements DatabaseClient {
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer,
- false);
+ /* useMultiplexedSessionForRW = */ false);
}
DatabaseClientImpl(
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java
index be75c1e5c4e..23c9dd7c2d3 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java
@@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet> implements ProtobufR
GrpcResultSet(
CloseableIterator iterator, Listener listener, DecodeMode decodeMode) {
- this.iterator = new GrpcValueIterator(iterator);
+ this.iterator = new GrpcValueIterator(iterator, listener);
this.listener = listener;
this.decodeMode = decodeMode;
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java
index 0a2e17bd2b5..1a3df8b9123 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
+import com.google.cloud.spanner.AbstractResultSet.Listener;
import com.google.common.collect.AbstractIterator;
import com.google.protobuf.ListValue;
import com.google.protobuf.Value.KindCase;
@@ -44,9 +45,11 @@ private enum StreamValue {
private PartialResultSet current;
private int pos;
private ResultSetStats statistics;
+ private final Listener listener;
- GrpcValueIterator(CloseableIterator stream) {
+ GrpcValueIterator(CloseableIterator stream, Listener listener) {
this.stream = stream;
+ this.listener = listener;
}
@SuppressWarnings("unchecked")
@@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
}
}
+ // collect the precommit token from each PartialResultSet
+ if (current.hasPrecommitToken()) {
+ listener.onPrecommitToken(current.getPrecommitToken());
+ }
if (current.hasStats()) {
statistics = current.getStats();
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
index 7b9abc71a85..60c9d45d186 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
@@ -69,7 +69,8 @@ static void throwIfTransactionsPending() {
}
}
- static TransactionOptions createReadWriteTransactionOptions(Options options) {
+ static TransactionOptions createReadWriteTransactionOptions(
+ Options options, ByteString previousTransactionId) {
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptions.setExcludeTxnFromChangeStreams(true);
@@ -78,6 +79,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
if (options.withOptimisticLock() == Boolean.TRUE) {
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
}
+ if (previousTransactionId != null
+ && previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
+ readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
+ }
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}
@@ -427,13 +432,17 @@ public void close() {
}
ApiFuture beginTransactionAsync(
- Options transactionOptions, boolean routeToLeader, Map channelHint) {
+ Options transactionOptions,
+ boolean routeToLeader,
+ Map channelHint,
+ ByteString previousTransactionId) {
final SettableApiFuture res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(getName())
- .setOptions(createReadWriteTransactionOptions(transactionOptions))
+ .setOptions(
+ createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
.build();
final ApiFuture requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
@@ -469,11 +478,12 @@ ApiFuture beginTransactionAsync(
return res;
}
- TransactionContextImpl newTransaction(Options options) {
+ TransactionContextImpl newTransaction(Options options, ByteString previousTransactionId) {
return TransactionContextImpl.newBuilder()
.setSession(this)
.setOptions(options)
.setTransactionId(null)
+ .setPreviousTransactionId(previousTransactionId)
.setOptions(options)
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
.setRpc(spanner.getRpc())
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
index 5756ff64b89..d7ba20b2d1b 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
@@ -1726,14 +1726,13 @@ private ApiTracerFactory getDefaultApiTracerFactory() {
private ApiTracerFactory createMetricsApiTracerFactory() {
OpenTelemetry openTelemetry =
this.builtInOpenTelemetryMetricsProvider.getOrCreateOpenTelemetry(
- getDefaultProjectId(), getCredentials());
+ this.getProjectId(), getCredentials());
return openTelemetry != null
? new MetricsTracerFactory(
new OpenTelemetryMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME),
builtInOpenTelemetryMetricsProvider.createClientAttributes(
- getDefaultProjectId(),
- "spanner-java/" + GaxProperties.getLibraryVersion(getClass())))
+ this.getProjectId(), "spanner-java/" + GaxProperties.getLibraryVersion(getClass())))
: null;
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java
index bbbc9aeb447..cafb27ba6b7 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java
@@ -20,6 +20,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
/** Implementation of {@link TransactionManager}. */
final class TransactionManagerImpl implements TransactionManager, SessionTransaction {
@@ -53,7 +54,7 @@ public void setSpan(ISpan span) {
public TransactionContext begin() {
Preconditions.checkState(txn == null, "begin can only be called once");
try (IScope s = tracer.withSpan(span)) {
- txn = session.newTransaction(options);
+ txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY);
session.setActive(this);
txnState = TransactionState.STARTED;
return txn;
@@ -102,7 +103,18 @@ public TransactionContext resetForRetry() {
}
try (IScope s = tracer.withSpan(span)) {
boolean useInlinedBegin = txn.transactionId != null;
- txn = session.newTransaction(options);
+
+ // Determine the latest transactionId when using a multiplexed session.
+ ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
+ if (session.getIsMultiplexed()) {
+ // Use the current transactionId if available, otherwise fallback to the previous aborted
+ // transactionId.
+ multiplexedSessionPreviousTransactionId =
+ txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
+ }
+ txn =
+ session.newTransaction(
+ options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
if (!useInlinedBegin) {
txn.ensureTxn();
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
index a7250e1ef79..92d9c50aa9e 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
@@ -46,6 +46,7 @@
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
+import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetStats;
@@ -93,6 +94,9 @@ static class Builder extends AbstractReadContext.Builder finishedAsyncOperations = SettableApiFuture.create();
@@ -201,6 +215,8 @@ public void removeListener(Runnable listener) {
volatile ByteString transactionId;
+ final ByteString previousTransactionId;
+
private CommitResponse commitResponse;
private final Clock clock;
@@ -216,6 +232,7 @@ private TransactionContextImpl(Builder builder) {
this.channelHint =
getChannelHintOptions(
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
+ this.previousTransactionId = builder.previousTransactionId;
}
@Override
@@ -246,6 +263,10 @@ private void decreaseAsyncOperations() {
}
}
+ ByteString getPreviousTransactionId() {
+ return this.previousTransactionId;
+ }
+
@Override
public void close() {
// Only mark the context as closed, but do not end the tracer span, as that is done by the
@@ -283,7 +304,8 @@ ApiFuture ensureTxnAsync() {
private void createTxnAsync(final SettableApiFuture res) {
span.addAnnotation("Creating Transaction");
final ApiFuture fut =
- session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint());
+ session.beginTransactionAsync(
+ options, isRouteToLeader(), getTransactionChannelHint(), getPreviousTransactionId());
fut.addListener(
() -> {
try {
@@ -423,6 +445,10 @@ public void run() {
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
+ if (session.getIsMultiplexed() && getLatestPrecommitToken() != null) {
+ // Set the precommit token in the CommitRequest for multiplexed sessions.
+ requestBuilder.setPrecommitToken(getLatestPrecommitToken());
+ }
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
final ApiFuture commitFuture;
@@ -558,7 +584,9 @@ TransactionSelector getTransactionSelector() {
}
if (tx == null) {
return TransactionSelector.newBuilder()
- .setBegin(SessionImpl.createReadWriteTransactionOptions(options))
+ .setBegin(
+ SessionImpl.createReadWriteTransactionOptions(
+ options, getPreviousTransactionId()))
.build();
} else {
// Wait for the transaction to come available. The tx.get() call will fail with an
@@ -625,6 +653,25 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}
}
+ /**
+ * In read-write transactions, the precommit token with the highest sequence number from this
+ * transaction attempt will be tracked and included in the
+ * [Commit][google.spanner.v1.Spanner.Commit] request for the transaction.
+ */
+ @Override
+ public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {
+ if (token == null) {
+ return;
+ }
+ synchronized (precommitTokenLock) {
+ if (this.latestPrecommitToken == null
+ || token.getSeqNum() > this.latestPrecommitToken.getSeqNum()) {
+ this.latestPrecommitToken = token;
+ txnLogger.log(Level.FINE, "Updating precommit token to " + this.latestPrecommitToken);
+ }
+ }
+ }
+
@Nullable
String getTransactionTag() {
if (this.options.hasTag()) {
@@ -633,6 +680,13 @@ String getTransactionTag() {
return null;
}
+ @Nullable
+ MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
+ synchronized (precommitTokenLock) {
+ return this.latestPrecommitToken;
+ }
+ }
+
@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
e = super.onError(e, withBeginTransaction);
@@ -811,6 +865,9 @@ private ResultSet internalExecuteUpdate(
throw new IllegalArgumentException(
"DML response missing stats possibly due to non-DML statement as input");
}
+ if (resultSet.hasPrecommitToken()) {
+ onPrecommitToken(resultSet.getPrecommitToken());
+ }
return resultSet;
} catch (Throwable t) {
throw onError(
@@ -885,6 +942,9 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... u
resultSet.get().getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
+ if (resultSet.get().hasPrecommitToken()) {
+ onPrecommitToken(resultSet.get().getPrecommitToken());
+ }
} catch (Throwable e) {
// Ignore this error here as it is handled by the future that is returned by the
// executeUpdateAsync method.
@@ -940,6 +1000,10 @@ public long[] batchUpdate(Iterable statements, UpdateOption... update
}
}
+ if (response.hasPrecommitToken()) {
+ onPrecommitToken(response.getPrecommitToken());
+ }
+
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (response.getStatus().getCode() == Code.ABORTED_VALUE) {
@@ -1004,6 +1068,9 @@ public ApiFuture batchUpdateAsync(
builder.getTransaction().hasBegin());
}
}
+ if (batchDmlResponse.hasPrecommitToken()) {
+ onPrecommitToken(batchDmlResponse.getPrecommitToken());
+ }
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {
@@ -1079,7 +1146,7 @@ public TransactionRunner allowNestedTransaction() {
TransactionRunnerImpl(SessionImpl session, TransactionOption... options) {
this.session = session;
this.options = Options.fromTransactionOptions(options);
- this.txn = session.newTransaction(this.options);
+ this.txn = session.newTransaction(this.options, /* previousTransactionId = */ ByteString.EMPTY);
this.tracer = session.getTracer();
}
@@ -1118,7 +1185,19 @@ private T runInternal(final TransactionCallable txCallable) {
// Do not inline the BeginTransaction during a retry if the initial attempt did not
// actually start a transaction.
useInlinedBegin = txn.transactionId != null;
- txn = session.newTransaction(options);
+
+ // Determine the latest transactionId when using a multiplexed session.
+ ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
+ if (session.getIsMultiplexed()) {
+ // Use the current transactionId if available, otherwise fallback to the previous
+ // transactionId.
+ multiplexedSessionPreviousTransactionId =
+ txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
+ }
+
+ txn =
+ session.newTransaction(
+ options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
}
checkState(
isValid, "TransactionRunner has been invalidated by a new operation on the session");
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java
index dcc4c663bb3..66b09c35afd 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java
@@ -26,6 +26,7 @@
import static com.google.cloud.spanner.connection.ConnectionProperties.DATA_BOOST_ENABLED;
import static com.google.cloud.spanner.connection.ConnectionProperties.DIALECT;
import static com.google.cloud.spanner.connection.ConnectionProperties.ENABLE_API_TRACING;
+import static com.google.cloud.spanner.connection.ConnectionProperties.ENABLE_END_TO_END_TRACING;
import static com.google.cloud.spanner.connection.ConnectionProperties.ENABLE_EXTENDED_TRACING;
import static com.google.cloud.spanner.connection.ConnectionProperties.ENCODED_CREDENTIALS;
import static com.google.cloud.spanner.connection.ConnectionProperties.ENDPOINT;
@@ -249,6 +250,7 @@ public String[] getValidValues() {
static final int DEFAULT_MAX_PARTITIONED_PARALLELISM = 1;
static final Boolean DEFAULT_ENABLE_EXTENDED_TRACING = null;
static final Boolean DEFAULT_ENABLE_API_TRACING = null;
+ static final boolean DEFAULT_ENABLE_END_TO_END_TRACING = false;
static final boolean DEFAULT_AUTO_BATCH_DML = false;
static final long DEFAULT_AUTO_BATCH_DML_UPDATE_COUNT = 1L;
static final boolean DEFAULT_AUTO_BATCH_DML_UPDATE_COUNT_VERIFICATION = true;
@@ -335,6 +337,7 @@ public String[] getValidValues() {
public static final String ENABLE_EXTENDED_TRACING_PROPERTY_NAME = "enableExtendedTracing";
public static final String ENABLE_API_TRACING_PROPERTY_NAME = "enableApiTracing";
+ public static final String ENABLE_END_TO_END_TRACING_PROPERTY_NAME = "enableEndToEndTracing";
public static final String AUTO_BATCH_DML_PROPERTY_NAME = "auto_batch_dml";
public static final String AUTO_BATCH_DML_UPDATE_COUNT_PROPERTY_NAME =
@@ -537,7 +540,14 @@ static boolean isEnableTransactionalConnectionStateForPostgreSQL() {
+ "to get a detailed view of each RPC that is being executed by your application, "
+ "or if you want to debug potential latency problems caused by RPCs that are "
+ "being retried.",
- DEFAULT_ENABLE_API_TRACING))));
+ DEFAULT_ENABLE_API_TRACING),
+ ConnectionProperty.createBooleanProperty(
+ ENABLE_END_TO_END_TRACING_PROPERTY_NAME,
+ "Enable end-to-end tracing (true/false) to generate traces for both the time "
+ + "that is spent in the client, as well as time that is spent in the Spanner server. "
+ + "Server side traces can only go to Google Cloud Trace, so to see end to end traces, "
+ + "the application should configure an exporter that exports the traces to Google Cloud Trace.",
+ DEFAULT_ENABLE_END_TO_END_TRACING))));
private static final Set INTERNAL_PROPERTIES =
Collections.unmodifiableSet(
@@ -1205,6 +1215,11 @@ public boolean isRouteToLeader() {
return getInitialConnectionPropertyValue(ROUTE_TO_LEADER);
}
+ /** Whether end-to-end tracing is enabled. */
+ public boolean isEndToEndTracingEnabled() {
+ return getInitialConnectionPropertyValue(ENABLE_END_TO_END_TRACING);
+ }
+
/**
* The initial retryAbortsInternally value for connections created by this {@link
* ConnectionOptions}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java
index c62571c6c03..0ca9b7256e2 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java
@@ -39,6 +39,7 @@
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DDL_IN_TRANSACTION_MODE;
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_ENABLE_API_TRACING;
+import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_ENABLE_END_TO_END_TRACING;
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_ENABLE_EXTENDED_TRACING;
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_ENDPOINT;
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_KEEP_TRANSACTION_ALIVE;
@@ -65,6 +66,7 @@
import static com.google.cloud.spanner.connection.ConnectionOptions.DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE_NAME;
import static com.google.cloud.spanner.connection.ConnectionOptions.DIALECT_PROPERTY_NAME;
import static com.google.cloud.spanner.connection.ConnectionOptions.ENABLE_API_TRACING_PROPERTY_NAME;
+import static com.google.cloud.spanner.connection.ConnectionOptions.ENABLE_END_TO_END_TRACING_PROPERTY_NAME;
import static com.google.cloud.spanner.connection.ConnectionOptions.ENABLE_EXTENDED_TRACING_PROPERTY_NAME;
import static com.google.cloud.spanner.connection.ConnectionOptions.ENCODED_CREDENTIALS_PROPERTY_NAME;
import static com.google.cloud.spanner.connection.ConnectionOptions.ENDPOINT_PROPERTY_NAME;
@@ -292,6 +294,16 @@ class ConnectionProperties {
DEFAULT_ENABLE_API_TRACING,
BooleanConverter.INSTANCE,
Context.STARTUP);
+ static final ConnectionProperty ENABLE_END_TO_END_TRACING =
+ create(
+ ENABLE_END_TO_END_TRACING_PROPERTY_NAME,
+ "Enable end-to-end tracing (true/false) to generate traces for both the time "
+ + "that is spent in the client, as well as time that is spent in the Spanner server. "
+ + "Server side traces can only go to Google Cloud Trace, so to see end to end traces, "
+ + "the application should configure an exporter that exports the traces to Google Cloud Trace.",
+ DEFAULT_ENABLE_END_TO_END_TRACING,
+ BooleanConverter.INSTANCE,
+ Context.STARTUP);
static final ConnectionProperty MIN_SESSIONS =
create(
MIN_SESSIONS_PROPERTY_NAME,
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java
index 246d340b070..81246e41938 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java
@@ -160,6 +160,7 @@ static class SpannerPoolKey {
private final OpenTelemetry openTelemetry;
private final Boolean enableExtendedTracing;
private final Boolean enableApiTracing;
+ private final boolean enableEndToEndTracing;
@VisibleForTesting
static SpannerPoolKey of(ConnectionOptions options) {
@@ -190,6 +191,7 @@ private SpannerPoolKey(ConnectionOptions options) throws IOException {
this.openTelemetry = options.getOpenTelemetry();
this.enableExtendedTracing = options.isEnableExtendedTracing();
this.enableApiTracing = options.isEnableApiTracing();
+ this.enableEndToEndTracing = options.isEndToEndTracingEnabled();
}
@Override
@@ -211,7 +213,8 @@ public boolean equals(Object o) {
this.useVirtualGrpcTransportThreads, other.useVirtualGrpcTransportThreads)
&& Objects.equals(this.openTelemetry, other.openTelemetry)
&& Objects.equals(this.enableExtendedTracing, other.enableExtendedTracing)
- && Objects.equals(this.enableApiTracing, other.enableApiTracing);
+ && Objects.equals(this.enableApiTracing, other.enableApiTracing)
+ && Objects.equals(this.enableEndToEndTracing, other.enableEndToEndTracing);
}
@Override
@@ -229,7 +232,8 @@ public int hashCode() {
this.useVirtualGrpcTransportThreads,
this.openTelemetry,
this.enableExtendedTracing,
- this.enableApiTracing);
+ this.enableApiTracing,
+ this.enableEndToEndTracing);
}
}
@@ -380,6 +384,9 @@ Spanner createSpanner(SpannerPoolKey key, ConnectionOptions options) {
if (!options.isRouteToLeader()) {
builder.disableLeaderAwareRouting();
}
+ if (options.isEndToEndTracingEnabled()) {
+ builder.setEnableEndToEndTracing(true);
+ }
if (key.usePlainText) {
// Credentials may not be sent over a plain text channel.
builder.setCredentials(NoCredentials.getInstance());
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java
index 08d22dd2d67..006a926e907 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java
@@ -16,12 +16,18 @@
package com.google.cloud.spanner;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
+import com.google.protobuf.ByteString;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import org.junit.Test;
@@ -42,7 +48,7 @@ public void testCommitReturnsCommitStats() {
when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
try (AsyncTransactionManagerImpl manager =
new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
- when(session.newTransaction(Options.fromTransactionOptions(Options.commitStats())))
+ when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any()))
.thenReturn(transaction);
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1);
@@ -54,4 +60,67 @@ public void testCommitReturnsCommitStats() {
verify(transaction).commitAsync();
}
}
+
+ @Test
+ public void testRetryUsesPreviousTransactionIdOnMultiplexedSession() {
+ // Set up mock transaction IDs
+ final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId");
+ final ByteString mockPreviousTransactionId =
+ ByteString.copyFromUtf8("mockPreviousTransactionId");
+
+ Span oTspan = mock(Span.class);
+ ISpan span = new OpenTelemetrySpan(oTspan);
+ when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
+ // Mark the session as multiplexed.
+ when(session.getIsMultiplexed()).thenReturn(true);
+
+ // Initialize a mock transaction with transactionId = null, previousTransactionId = null.
+ transaction = mock(TransactionRunnerImpl.TransactionContextImpl.class);
+ when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
+ when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any()))
+ .thenReturn(transaction);
+
+ // Simulate an ABORTED error being thrown when `commitAsync()` is called.
+ doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, ""))
+ .when(transaction)
+ .commitAsync();
+
+ try (AsyncTransactionManagerImpl manager =
+ new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
+ manager.beginAsync();
+
+ // Verify that for the first transaction attempt, the `previousTransactionId` is
+ // ByteString.EMPTY.
+ // This is because no transaction has been previously aborted at this point.
+ verify(session)
+ .newTransaction(Options.fromTransactionOptions(Options.commitStats()), ByteString.EMPTY);
+ assertThrows(AbortedException.class, manager::commitAsync);
+ clearInvocations(session);
+
+ // Mock the transaction object to contain transactionID=null and
+ // previousTransactionId=mockPreviousTransactionId
+ when(transaction.getPreviousTransactionId()).thenReturn(mockPreviousTransactionId);
+ manager.resetForRetryAsync();
+ // Verify that in the first retry attempt, the `previousTransactionId`
+ // (mockPreviousTransactionId) is passed to the new transaction.
+ // This allows Spanner to retry the transaction using the ID of the aborted transaction.
+ verify(session)
+ .newTransaction(
+ Options.fromTransactionOptions(Options.commitStats()), mockPreviousTransactionId);
+ assertThrows(AbortedException.class, manager::commitAsync);
+ clearInvocations(session);
+
+ // Mock the transaction object to contain transactionID=mockTransactionId and
+ // previousTransactionId=mockPreviousTransactionId and transactionID = null
+ transaction.transactionId = mockTransactionId;
+ manager.resetForRetryAsync();
+ // Verify that the latest `transactionId` (mockTransactionId) is used in the retry.
+ // This ensures the retry logic is working as expected with the latest transaction ID.
+ verify(session)
+ .newTransaction(Options.fromTransactionOptions(Options.commitStats()), mockTransactionId);
+
+ when(transaction.rollbackAsync()).thenReturn(ApiFutures.immediateFuture(null));
+ manager.closeAsync();
+ }
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java
index 265da756b99..bad39e33dba 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java
@@ -36,6 +36,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
+import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.QueryPlan;
import com.google.spanner.v1.ResultSetMetadata;
@@ -78,6 +79,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction
@Override
public void onDone(boolean withBeginTransaction) {}
+
+ @Override
+ public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
}
@Before
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
index fd5dfa4055c..193b35c4fc6 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
@@ -54,6 +54,7 @@
import com.google.spanner.v1.GetSessionRequest;
import com.google.spanner.v1.ListSessionsRequest;
import com.google.spanner.v1.ListSessionsResponse;
+import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.Partition;
import com.google.spanner.v1.PartitionOptions;
@@ -197,10 +198,15 @@ private static class PartialResultSetsIterator implements Iterator transactionCounters = new ConcurrentHashMap<>();
private ConcurrentMap> partitionTokens = new ConcurrentHashMap<>();
private ConcurrentMap transactionLastUsed = new ConcurrentHashMap<>();
+
+ // Stores the latest sequence number needed for the precommit token.
+ // The transaction entry is created only if the transaction is read-write and executed on a
+ // multiplexed session.
+ private static ConcurrentMap transactionSequenceNo =
+ new ConcurrentHashMap<>();
private int maxNumSessionsInOneBatch = 100;
private int maxTotalSessions = Integer.MAX_VALUE;
private Iterable batchWriteResult = new ArrayList<>();
@@ -1020,7 +1035,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp
throw result.getException();
case RESULT_SET:
returnResultSet(
- result.getResultSet(), transactionId, request.getTransaction(), responseObserver);
+ result.getResultSet(),
+ transactionId,
+ request.getTransaction(),
+ responseObserver,
+ session);
break;
case UPDATE_COUNT:
if (isPartitionedDmlTransaction(transactionId)) {
@@ -1033,7 +1052,7 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp
.build())
.build());
} else {
- responseObserver.onNext(
+ ResultSet.Builder resultSetBuilder =
ResultSet.newBuilder()
.setStats(
ResultSetStats.newBuilder()
@@ -1045,8 +1064,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp
ignoreNextInlineBeginRequest.getAndSet(false)
? Transaction.getDefaultInstance()
: Transaction.newBuilder().setId(transactionId).build())
- .build())
- .build());
+ .build());
+ if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) {
+ resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken(transactionId));
+ }
+ responseObserver.onNext(resultSetBuilder.build());
}
break;
default:
@@ -1064,7 +1086,8 @@ private void returnResultSet(
ResultSet resultSet,
ByteString transactionId,
TransactionSelector transactionSelector,
- StreamObserver responseObserver) {
+ StreamObserver responseObserver,
+ Session session) {
ResultSetMetadata metadata = resultSet.getMetadata();
if (transactionId != null) {
metadata =
@@ -1079,7 +1102,12 @@ private void returnResultSet(
Transaction transaction = getTemporaryTransactionOrNull(transactionSelector);
metadata = metadata.toBuilder().setTransaction(transaction).build();
}
- resultSet = resultSet.toBuilder().setMetadata(metadata).build();
+ ResultSet.Builder resultSetBuilder = resultSet.toBuilder();
+ resultSetBuilder.setMetadata(metadata);
+ if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) {
+ resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken(transactionId));
+ }
+ resultSet = resultSetBuilder.build();
responseObserver.onNext(resultSet);
}
@@ -1174,6 +1202,9 @@ public void executeBatchDml(
.build());
}
builder.setStatus(status);
+ if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) {
+ builder.setPrecommitToken(getExecuteBatchDmlResponsePrecommitToken(transactionId));
+ }
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
@@ -1242,7 +1273,8 @@ public void executeStreamingSql(
transactionId,
request.getTransaction(),
responseObserver,
- getExecuteStreamingSqlExecutionTime());
+ getExecuteStreamingSqlExecutionTime(),
+ session.getMultiplexed());
break;
case UPDATE_COUNT:
if (isPartitioned) {
@@ -1629,7 +1661,7 @@ public void read(final ReadRequest request, StreamObserver responseOb
cols);
StatementResult res = getResult(statement);
returnResultSet(
- res.getResultSet(), transactionId, request.getTransaction(), responseObserver);
+ res.getResultSet(), transactionId, request.getTransaction(), responseObserver, session);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
responseObserver.onError(e);
@@ -1687,7 +1719,8 @@ public void streamingRead(
transactionId,
request.getTransaction(),
responseObserver,
- getStreamingReadExecutionTime());
+ getStreamingReadExecutionTime(),
+ session.getMultiplexed());
} catch (StatusRuntimeException e) {
responseObserver.onError(e);
} catch (Throwable t) {
@@ -1700,7 +1733,8 @@ private void returnPartialResultSet(
ByteString transactionId,
TransactionSelector transactionSelector,
StreamObserver responseObserver,
- SimulatedExecutionTime executionTime)
+ SimulatedExecutionTime executionTime,
+ boolean isMultiplexedSession)
throws Exception {
ResultSetMetadata metadata = resultSet.getMetadata();
if (transactionId == null) {
@@ -1717,7 +1751,11 @@ private void returnPartialResultSet(
.build();
}
resultSet = resultSet.toBuilder().setMetadata(metadata).build();
- PartialResultSetsIterator iterator = new PartialResultSetsIterator(resultSet);
+ PartialResultSetsIterator iterator =
+ new PartialResultSetsIterator(
+ resultSet,
+ isMultiplexedSession && isReadWriteTransaction(transactionId),
+ transactionId);
long index = 0L;
while (iterator.hasNext()) {
SimulatedExecutionTime.checkStreamException(
@@ -2051,6 +2089,7 @@ private void commitTransaction(ByteString transactionId) {
transactions.remove(transactionId);
isPartitionedDmlTransaction.remove(transactionId);
transactionLastUsed.remove(transactionId);
+ transactionSequenceNo.remove(transactionId);
}
@Override
@@ -2082,6 +2121,7 @@ void rollbackTransaction(ByteString transactionId) {
transactions.remove(transactionId);
isPartitionedDmlTransaction.remove(transactionId);
transactionLastUsed.remove(transactionId);
+ transactionSequenceNo.remove(transactionId);
}
void markAbortedTransaction(ByteString transactionId) {
@@ -2089,6 +2129,7 @@ void markAbortedTransaction(ByteString transactionId) {
transactions.remove(transactionId);
isPartitionedDmlTransaction.remove(transactionId);
transactionLastUsed.remove(transactionId);
+ transactionSequenceNo.remove(transactionId);
}
@Override
@@ -2293,6 +2334,7 @@ public void reset() {
transactionCounters = new ConcurrentHashMap<>();
partitionTokens = new ConcurrentHashMap<>();
transactionLastUsed = new ConcurrentHashMap<>();
+ transactionSequenceNo = new ConcurrentHashMap<>();
numSessionsCreated.set(0);
stickyGlobalExceptions = false;
@@ -2464,4 +2506,30 @@ Session getSession(String name) {
}
return null;
}
+
+ static MultiplexedSessionPrecommitToken getResultSetPrecommitToken(ByteString transactionId) {
+ return getPrecommitToken("ResultSetPrecommitToken", transactionId);
+ }
+
+ static MultiplexedSessionPrecommitToken getPartialResultSetPrecommitToken(
+ ByteString transactionId) {
+ return getPrecommitToken("PartialResultSetPrecommitToken", transactionId);
+ }
+
+ static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken(
+ ByteString transactionId) {
+ return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", transactionId);
+ }
+
+ static MultiplexedSessionPrecommitToken getPrecommitToken(
+ String value, ByteString transactionId) {
+ transactionSequenceNo.putIfAbsent(transactionId, new AtomicInteger(0));
+
+ // Generates an incrementing sequence number
+ int seqNum = transactionSequenceNo.get(transactionId).incrementAndGet();
+ return MultiplexedSessionPrecommitToken.newBuilder()
+ .setPrecommitToken(ByteString.copyFromUtf8(value))
+ .setSeqNum(seqNum)
+ .build();
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java
index 2e412537882..c7d7b697d64 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java
@@ -16,6 +16,7 @@
package com.google.cloud.spanner;
+import static com.google.cloud.spanner.MockSpannerTestUtil.INVALID_UPDATE_STATEMENT;
import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_COUNT;
import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_STATEMENT;
import static com.google.common.truth.Truth.assertThat;
@@ -36,11 +37,14 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.Options.RpcPriority;
+import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
+import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.RequestOptions.Priority;
@@ -54,6 +58,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -70,6 +75,10 @@ public static void setupResults() {
mockSpanner.putStatementResults(
StatementResult.query(STATEMENT, new RandomResultSetGenerator(1).generate()));
mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
+ mockSpanner.putStatementResult(
+ StatementResult.exception(
+ INVALID_UPDATE_STATEMENT,
+ Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException()));
}
@Before
@@ -745,6 +754,335 @@ public void testAsyncRunnerIsNonBlockingWithMultiplexedSession() throws Exceptio
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}
+ @Test
+ public void testAbortedReadWriteTxnUsesPreviousTxnIdOnRetryWithInlineBegin() {
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+ // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared
+ // after the first call, so the retry should succeed.
+ mockSpanner.setCommitExecutionTime(
+ SimulatedExecutionTime.ofException(
+ mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
+ TransactionRunner runner = client.readWriteTransaction();
+ AtomicReference validTransactionId = new AtomicReference<>();
+ runner.run(
+ transaction -> {
+ try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
+ while (resultSet.next()) {}
+ }
+
+ TransactionContextImpl impl = (TransactionContextImpl) transaction;
+ if (validTransactionId.get() == null) {
+ // Track the first not-null transactionId. This transaction gets ABORTED during commit
+ // operation and gets retried.
+ validTransactionId.set(impl.transactionId);
+ }
+ return null;
+ });
+
+ List executeSqlRequests =
+ mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertEquals(2, executeSqlRequests.size());
+
+ // Verify the requests are executed using multiplexed sessions
+ for (ExecuteSqlRequest request : executeSqlRequests) {
+ assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
+ }
+
+ // Verify that the first request uses inline begin, and the previous transaction ID is set to
+ // ByteString.EMPTY
+ assertTrue(executeSqlRequests.get(0).hasTransaction());
+ assertTrue(executeSqlRequests.get(0).getTransaction().hasBegin());
+ assertTrue(executeSqlRequests.get(0).getTransaction().getBegin().hasReadWrite());
+ assertNotNull(
+ executeSqlRequests
+ .get(0)
+ .getTransaction()
+ .getBegin()
+ .getReadWrite()
+ .getMultiplexedSessionPreviousTransactionId());
+ assertEquals(
+ ByteString.EMPTY,
+ executeSqlRequests
+ .get(0)
+ .getTransaction()
+ .getBegin()
+ .getReadWrite()
+ .getMultiplexedSessionPreviousTransactionId());
+
+ // Verify that the second request uses inline begin, and the previous transaction ID is set
+ // appropriately
+ assertTrue(executeSqlRequests.get(1).hasTransaction());
+ assertTrue(executeSqlRequests.get(1).getTransaction().hasBegin());
+ assertTrue(executeSqlRequests.get(1).getTransaction().getBegin().hasReadWrite());
+ assertNotNull(
+ executeSqlRequests
+ .get(1)
+ .getTransaction()
+ .getBegin()
+ .getReadWrite()
+ .getMultiplexedSessionPreviousTransactionId());
+ assertNotEquals(
+ ByteString.EMPTY,
+ executeSqlRequests
+ .get(1)
+ .getTransaction()
+ .getBegin()
+ .getReadWrite()
+ .getMultiplexedSessionPreviousTransactionId());
+ assertEquals(
+ validTransactionId.get(),
+ executeSqlRequests
+ .get(1)
+ .getTransaction()
+ .getBegin()
+ .getReadWrite()
+ .getMultiplexedSessionPreviousTransactionId());
+ }
+
+ @Test
+ public void testAbortedReadWriteTxnUsesPreviousTxnIdOnRetryWithExplicitBegin() {
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+ // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared
+ // after the first call, so the retry should succeed.
+ mockSpanner.setCommitExecutionTime(
+ SimulatedExecutionTime.ofException(
+ mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
+ TransactionRunner runner = client.readWriteTransaction();
+ AtomicReference validTransactionId = new AtomicReference<>();
+ Long updateCount =
+ runner.run(
+ transaction -> {
+ // This update statement carries the BeginTransaction, but fails. This will
+ // cause the entire transaction to be retried with an explicit
+ // BeginTransaction RPC to ensure all statements in the transaction are
+ // actually executed against the same transaction.
+ TransactionContextImpl impl = (TransactionContextImpl) transaction;
+ if (validTransactionId.get() == null) {
+ // Track the first not-null transactionId. This transaction gets ABORTED during
+ // commit operation and gets retried.
+ validTransactionId.set(impl.transactionId);
+ }
+ SpannerException e =
+ assertThrows(
+ SpannerException.class,
+ () -> transaction.executeUpdate(INVALID_UPDATE_STATEMENT));
+ assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+ return transaction.executeUpdate(UPDATE_STATEMENT);
+ });
+
+ assertThat(updateCount).isEqualTo(1L);
+ List beginTransactionRequests =
+ mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
+ assertEquals(2, beginTransactionRequests.size());
+
+ // Verify the requests are executed using multiplexed sessions
+ for (BeginTransactionRequest request : beginTransactionRequests) {
+ assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
+ }
+
+ // Verify that explicit begin transaction is called during retry, and the previous transaction
+ // ID is set to ByteString.EMPTY
+ assertTrue(beginTransactionRequests.get(0).hasOptions());
+ assertTrue(beginTransactionRequests.get(0).getOptions().hasReadWrite());
+ assertNotNull(
+ beginTransactionRequests
+ .get(0)
+ .getOptions()
+ .getReadWrite()
+ .getMultiplexedSessionPreviousTransactionId());
+ assertEquals(
+ ByteString.EMPTY,
+ beginTransactionRequests
+ .get(0)
+ .getOptions()
+ .getReadWrite()
+ .getMultiplexedSessionPreviousTransactionId());
+
+ // The previous transaction with id (txn1) fails during commit operation with ABORTED error.
+ // Verify that explicit begin transaction is called during retry, and the previous transaction
+ // ID is not ByteString.EMPTY (should be set to txn1)
+ assertTrue(beginTransactionRequests.get(1).hasOptions());
+ assertTrue(beginTransactionRequests.get(1).getOptions().hasReadWrite());
+ assertNotNull(
+ beginTransactionRequests
+ .get(1)
+ .getOptions()
+ .getReadWrite()
+ .getMultiplexedSessionPreviousTransactionId());
+ assertNotEquals(
+ ByteString.EMPTY,
+ beginTransactionRequests
+ .get(1)
+ .getOptions()
+ .getReadWrite()
+ .getMultiplexedSessionPreviousTransactionId());
+ assertEquals(
+ validTransactionId.get(),
+ beginTransactionRequests
+ .get(1)
+ .getOptions()
+ .getReadWrite()
+ .getMultiplexedSessionPreviousTransactionId());
+ }
+
+ @Test
+ public void testPrecommitTokenForResultSet() {
+ // This test verifies that the precommit token received from the ResultSet is properly tracked
+ // and set in the CommitRequest.
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+
+ Long count =
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ long res = transaction.executeUpdate(UPDATE_STATEMENT);
+
+ // Verify that the latest precommit token is tracked in the transaction context.
+ TransactionContextImpl impl = (TransactionContextImpl) transaction;
+ assertNotNull(impl.getLatestPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("ResultSetPrecommitToken"),
+ impl.getLatestPrecommitToken().getPrecommitToken());
+ return res;
+ });
+
+ assertNotNull(count);
+ assertEquals(1, count.longValue());
+
+ // Verify that the latest precommit token is set in the CommitRequest
+ List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
+ assertEquals(1, commitRequests.size());
+ assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed());
+ assertNotNull(commitRequests.get(0).getPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("ResultSetPrecommitToken"),
+ commitRequests.get(0).getPrecommitToken().getPrecommitToken());
+ }
+
+ @Test
+ public void testPrecommitTokenForExecuteBatchDmlResponse() {
+ // This test verifies that the precommit token received from the ExecuteBatchDmlResponse is
+ // properly tracked and set in the CommitRequest.
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+
+ long[] count =
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ long[] res = transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT));
+
+ // Verify that the latest precommit token is tracked in the transaction context.
+ TransactionContextImpl impl = (TransactionContextImpl) transaction;
+ assertNotNull(impl.getLatestPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"),
+ impl.getLatestPrecommitToken().getPrecommitToken());
+ return res;
+ });
+
+ assertNotNull(count);
+ assertEquals(1, count.length);
+
+ // Verify that the latest precommit token is set in the CommitRequest
+ List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
+ assertEquals(1, commitRequests.size());
+ assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed());
+ assertNotNull(commitRequests.get(0).getPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"),
+ commitRequests.get(0).getPrecommitToken().getPrecommitToken());
+ }
+
+ @Test
+ public void testPrecommitTokenForPartialResultSet() {
+ // This test verifies that the precommit token received from the PartialResultSet is properly
+ // tracked and set in the CommitRequest.
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ ResultSet resultSet = transaction.executeQuery(STATEMENT);
+ //noinspection StatementWithEmptyBody
+ while (resultSet.next()) {
+ // ignore
+ }
+
+ // Verify that the latest precommit token is tracked in the transaction context.
+ TransactionContextImpl impl = (TransactionContextImpl) transaction;
+ assertNotNull(impl.getLatestPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
+ impl.getLatestPrecommitToken().getPrecommitToken());
+ return null;
+ });
+
+ // Verify that the latest precommit token is set in the CommitRequest
+ List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
+ assertEquals(1, commitRequests.size());
+ assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed());
+ assertNotNull(commitRequests.get(0).getPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
+ commitRequests.get(0).getPrecommitToken().getPrecommitToken());
+ }
+
+ @Test
+ public void testTxnTracksPrecommitTokenWithLatestSeqNo() {
+ // This test ensures that the read-write transaction tracks the precommit token with the
+ // highest sequence number and sets it in the CommitRequest.
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ // Returns a ResultSet containing the precommit token (ResultSetPrecommitToken)
+ transaction.executeUpdate(UPDATE_STATEMENT);
+
+ // Returns a PartialResultSet containing the precommit token
+ // (PartialResultSetPrecommitToken)
+ ResultSet resultSet = transaction.executeQuery(STATEMENT);
+ //noinspection StatementWithEmptyBody
+ while (resultSet.next()) {
+ // ignore
+ }
+
+ // Returns an ExecuteBatchDmlResponse containing the precommit token
+ // (ExecuteBatchDmlResponsePrecommitToken).
+ // Since this is the last request received by the mock Spanner, it should be the most
+ // recent precommit token tracked by the transaction context.
+ transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT));
+
+ // Verify that the latest precommit token with highest sequence number is tracked in
+ // the transaction context.
+ TransactionContextImpl impl = (TransactionContextImpl) transaction;
+ assertNotNull(impl.getLatestPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"),
+ impl.getLatestPrecommitToken().getPrecommitToken());
+ return null;
+ });
+
+ // Verify that the latest precommit token is set in the CommitRequest
+ List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
+ assertEquals(1, commitRequests.size());
+ assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed());
+ assertNotNull(commitRequests.get(0).getPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"),
+ commitRequests.get(0).getPrecommitToken().getPrecommitToken());
+ }
+
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
assertNotNull(client.multiplexedSessionDatabaseClient);
SessionReference sessionReference =
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java
index d9586acc956..8e3d0986343 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java
@@ -94,6 +94,7 @@ public static void setup() {
Attributes.builder()
.put(BuiltInMetricsConstant.PROJECT_ID_KEY, "test-project")
.put(BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY, "unknown")
+ .put(BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY, "false")
.put(
BuiltInMetricsConstant.LOCATION_ID_KEY,
BuiltInOpenTelemetryMetricsProvider.detectClientLocation())
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java
index c973b7e471e..2a399e6f486 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java
@@ -24,6 +24,7 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.io.Resources;
import com.google.protobuf.util.JsonFormat;
+import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.Transaction;
import java.math.BigDecimal;
@@ -56,6 +57,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction
@Override
public void onDone(boolean withBeginTransaction) {}
+
+ @Override
+ public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
}
public ReadFormatTestRunner(Class> clazz) throws InitializationError {
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java
index fc494c6f3ff..404973336ba 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java
@@ -19,6 +19,7 @@
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AbstractResultSet.Listener;
import com.google.protobuf.ListValue;
+import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.Transaction;
import java.util.Iterator;
@@ -82,6 +83,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction
@Override
public void onDone(boolean withBeginTransaction) {}
+
+ @Override
+ public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
});
}
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java
index 998678e4296..c3e8d887ded 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java
@@ -1495,9 +1495,10 @@ public void testSessionNotFoundReadWriteTransaction() {
.build();
when(closedSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
- when(closedSession.newTransaction(Options.fromTransactionOptions()))
+ when(closedSession.newTransaction(eq(Options.fromTransactionOptions()), any()))
.thenReturn(closedTransactionContext);
- when(closedSession.beginTransactionAsync(any(), eq(true), any())).thenThrow(sessionNotFound);
+ when(closedSession.beginTransactionAsync(any(), eq(true), any(), any()))
+ .thenThrow(sessionNotFound);
when(closedSession.getTracer()).thenReturn(tracer);
TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession);
closedTransactionRunner.setSpan(span);
@@ -1510,9 +1511,9 @@ public void testSessionNotFoundReadWriteTransaction() {
when(openSession.getName())
.thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-open");
final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class);
- when(openSession.newTransaction(Options.fromTransactionOptions()))
+ when(openSession.newTransaction(eq(Options.fromTransactionOptions()), any()))
.thenReturn(openTransactionContext);
- when(openSession.beginTransactionAsync(any(), eq(true), any()))
+ when(openSession.beginTransactionAsync(any(), eq(true), any(), any()))
.thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn")));
when(openSession.getTracer()).thenReturn(tracer);
TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession);
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java
index c3fcf1c7480..10b13125152 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java
@@ -20,7 +20,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -98,7 +100,7 @@ public void setUp() {
@Test
public void beginCalledTwiceFails() {
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn);
assertThat(manager.begin()).isEqualTo(txn);
assertThat(manager.getState()).isEqualTo(TransactionState.STARTED);
IllegalStateException e = assertThrows(IllegalStateException.class, () -> manager.begin());
@@ -126,7 +128,7 @@ public void resetBeforeBeginFails() {
@Test
public void transactionRolledBackOnClose() {
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn);
when(txn.isAborted()).thenReturn(false);
manager.begin();
manager.close();
@@ -135,7 +137,7 @@ public void transactionRolledBackOnClose() {
@Test
public void commitSucceeds() {
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn);
Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1);
CommitResponse response = new CommitResponse(commitTimestamp);
when(txn.getCommitResponse()).thenReturn(response);
@@ -147,7 +149,7 @@ public void commitSucceeds() {
@Test
public void resetAfterSuccessfulCommitFails() {
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn);
manager.begin();
manager.commit();
IllegalStateException e =
@@ -157,21 +159,21 @@ public void resetAfterSuccessfulCommitFails() {
@Test
public void resetAfterAbortSucceeds() {
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn);
manager.begin();
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")).when(txn).commit();
assertThrows(AbortedException.class, () -> manager.commit());
assertEquals(TransactionState.ABORTED, manager.getState());
txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class);
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn);
assertThat(manager.resetForRetry()).isEqualTo(txn);
assertThat(manager.getState()).isEqualTo(TransactionState.STARTED);
}
@Test
public void resetAfterErrorFails() {
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn);
manager.begin();
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.UNKNOWN, "")).when(txn).commit();
SpannerException e = assertThrows(SpannerException.class, () -> manager.commit());
@@ -184,7 +186,7 @@ public void resetAfterErrorFails() {
@Test
public void rollbackAfterCommitFails() {
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn);
manager.begin();
manager.commit();
IllegalStateException e = assertThrows(IllegalStateException.class, () -> manager.rollback());
@@ -193,7 +195,7 @@ public void rollbackAfterCommitFails() {
@Test
public void commitAfterRollbackFails() {
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn);
manager.begin();
manager.rollback();
IllegalStateException e = assertThrows(IllegalStateException.class, () -> manager.commit());
@@ -363,4 +365,61 @@ public void inlineBegin() {
assertThat(transactionsStarted.get()).isEqualTo(1);
}
}
+
+ // This test ensures that when a transaction is aborted in a multiplexed session,
+ // the transaction ID of the aborted transaction is saved during the retry when a new transaction
+ // is created.
+ @Test
+ public void storePreviousTxnIdOnAbortForMultiplexedSession() {
+ txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class);
+ final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId");
+ txn.transactionId = mockTransactionId;
+ when(session.newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY))
+ .thenReturn(txn);
+ manager.begin();
+ // Verify that for the first transaction attempt, the `previousTransactionId` is
+ // ByteString.EMPTY.
+ // This is because no transaction has been previously aborted at this point.
+ verify(session).newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY);
+ doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")).when(txn).commit();
+ assertThrows(AbortedException.class, () -> manager.commit());
+
+ txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class);
+ when(txn.getPreviousTransactionId()).thenReturn(mockTransactionId);
+ when(session.newTransaction(Options.fromTransactionOptions(), mockTransactionId))
+ .thenReturn(txn);
+ when(session.getIsMultiplexed()).thenReturn(true);
+ assertThat(manager.resetForRetry()).isEqualTo(txn);
+ // Verify that in the first retry attempt, the `previousTransactionId` is passed to the new
+ // transaction.
+ // This allows Spanner to retry the transaction using the ID of the aborted transaction.
+ verify(session).newTransaction(Options.fromTransactionOptions(), mockTransactionId);
+ }
+
+ // This test ensures that when a transaction is aborted in a regular session,
+ // the transaction ID of the aborted transaction is not saved during the retry when a new
+ // transaction is created.
+ @Test
+ public void skipTxnIdStorageOnAbortForRegularSession() {
+ txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class);
+ final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId");
+ txn.transactionId = mockTransactionId;
+ when(session.newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY))
+ .thenReturn(txn);
+ manager.begin();
+ verify(session).newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY);
+ doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")).when(txn).commit();
+ assertThrows(AbortedException.class, () -> manager.commit());
+ clearInvocations(session);
+
+ txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class);
+ when(session.newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY))
+ .thenReturn(txn);
+ when(session.getIsMultiplexed()).thenReturn(false);
+ assertThat(manager.resetForRetry()).isEqualTo(txn);
+ // Verify that in the first retry attempt, the `previousTransactionId` is not passed to the new
+ // transaction
+ // in case of regular sessions.
+ verify(session).newTransaction(Options.fromTransactionOptions(), ByteString.EMPTY);
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java
index c647bb3642a..1fd6817ea96 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
@@ -117,7 +118,7 @@ public void setUp() {
tracer = new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false);
firstRun = true;
when(session.getErrorHandler()).thenReturn(DefaultErrorHandler.INSTANCE);
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn);
when(session.getTracer()).thenReturn(tracer);
when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), eq(true)))
.thenAnswer(
@@ -343,7 +344,8 @@ private long[] batchDmlException(int status) {
.setTracer(session.getTracer())
.setSpan(session.getTracer().getCurrentSpan())
.build();
- when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(transaction);
+ when(session.newTransaction(eq(Options.fromTransactionOptions()), any()))
+ .thenReturn(transaction);
when(session.getName()).thenReturn(SessionId.of("p", "i", "d", "test").getName());
TransactionRunnerImpl runner = new TransactionRunnerImpl(session);
runner.setSpan(span);
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java
index 94a44579acf..f826ec08dfc 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java
@@ -322,6 +322,27 @@ public void testBuildWithRouteToLeader() {
assertTrue(options.isRouteToLeader());
}
+ @Test
+ public void testBuildWithEndToEndTracingEnabled() {
+ final String BASE_URI =
+ "cloudspanner:/projects/test-project-123/instances/test-instance-123/databases/test-database-123";
+ ConnectionOptions.Builder builder = ConnectionOptions.newBuilder();
+ builder.setUri(BASE_URI + "?enableEndToEndTracing=true");
+ builder.setCredentialsUrl(FILE_TEST_PATH);
+ ConnectionOptions options = builder.build();
+ assertEquals(options.getHost(), DEFAULT_HOST);
+ assertEquals(options.getProjectId(), TEST_PROJECT);
+ assertEquals(options.getInstanceId(), TEST_INSTANCE);
+ assertEquals(options.getDatabaseName(), TEST_DATABASE);
+ assertTrue(options.isEndToEndTracingEnabled());
+
+ // Test for default behavior for enableEndToEndTracing property.
+ builder = ConnectionOptions.newBuilder().setUri(BASE_URI);
+ builder.setCredentialsUrl(FILE_TEST_PATH);
+ options = builder.build();
+ assertFalse(options.isEndToEndTracingEnabled());
+ }
+
@Test
public void testBuildWithAutoConfigEmulatorAndHost() {
ConnectionOptions.Builder builder = ConnectionOptions.newBuilder();
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java
index 19d49139635..fea0b8aa6cf 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java
@@ -563,6 +563,58 @@ public void testEnableApiTracing() {
.build()));
}
+ @Test
+ public void testEnableEndToEndTracing() {
+ SpannerPoolKey keyWithoutApiTracingConfig =
+ SpannerPoolKey.of(
+ ConnectionOptions.newBuilder()
+ .setUri("cloudspanner:/projects/p/instances/i/databases/d")
+ .setCredentials(NoCredentials.getInstance())
+ .build());
+ SpannerPoolKey keyWithApiTracingEnabled =
+ SpannerPoolKey.of(
+ ConnectionOptions.newBuilder()
+ .setUri(
+ "cloudspanner:/projects/p/instances/i/databases/d?enableEndToEndTracing=true")
+ .setCredentials(NoCredentials.getInstance())
+ .build());
+ SpannerPoolKey keyWithApiTracingDisabled =
+ SpannerPoolKey.of(
+ ConnectionOptions.newBuilder()
+ .setUri(
+ "cloudspanner:/projects/p/instances/i/databases/d?enableEndToEndTracing=false")
+ .setCredentials(NoCredentials.getInstance())
+ .build());
+
+ assertNotEquals(keyWithoutApiTracingConfig, keyWithApiTracingEnabled);
+ assertEquals(keyWithoutApiTracingConfig, keyWithApiTracingDisabled);
+ assertNotEquals(keyWithApiTracingEnabled, keyWithApiTracingDisabled);
+
+ assertEquals(
+ keyWithApiTracingEnabled,
+ SpannerPoolKey.of(
+ ConnectionOptions.newBuilder()
+ .setUri(
+ "cloudspanner:/projects/p/instances/i/databases/d?enableEndToEndTracing=true")
+ .setCredentials(NoCredentials.getInstance())
+ .build()));
+ assertEquals(
+ keyWithApiTracingDisabled,
+ SpannerPoolKey.of(
+ ConnectionOptions.newBuilder()
+ .setUri(
+ "cloudspanner:/projects/p/instances/i/databases/d?enableEndToEndTracing=false")
+ .setCredentials(NoCredentials.getInstance())
+ .build()));
+ assertEquals(
+ keyWithoutApiTracingConfig,
+ SpannerPoolKey.of(
+ ConnectionOptions.newBuilder()
+ .setUri("cloudspanner:/projects/p/instances/i/databases/d")
+ .setCredentials(NoCredentials.getInstance())
+ .build()));
+ }
+
@Test
public void testOpenTelemetry() {
SpannerPool pool = createSubjectAndMocks();