Skip to content

Commit

Permalink
chore(spanner): fix conection it tests
Browse files Browse the repository at this point in the history
  • Loading branch information
harshachinta committed Jan 23, 2025
1 parent c9842c9 commit 84045ef
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ public boolean doCreateDefaultTestTable() {
@Before
public void clearTable() {
try (ITConnection connection = createConnection()) {
connection.execute(Statement.of("SELECT 1"));
connection.commit();
connection.bufferedWrite(Mutation.delete("TEST", KeySet.all()));
//connection.executeUpdate(Statement.of("DELETE FROM TEST WHERE TRUE"));
get(connection.commitAsync());
}
}
Expand Down Expand Up @@ -221,6 +224,7 @@ public void testCommitAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
ApiFuture<Long> count = getTestRecordCountAsync(connection);
// do an insert
ApiFuture<Long> updateCount =
Expand Down Expand Up @@ -253,6 +257,7 @@ public void testInsertAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
ApiFuture<Long> count = getTestRecordCountAsync(connection);
// indicate that the next statement should abort
interceptor.setProbability(1.0);
Expand All @@ -276,6 +281,7 @@ public void testUpdateAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
ApiFuture<Long> count = getTestRecordCountAsync(connection);
// insert a test record
connection.executeUpdateAsync(
Expand Down Expand Up @@ -309,6 +315,7 @@ public void testQueryAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert a test record
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test aborted')"));
Expand Down Expand Up @@ -359,6 +366,7 @@ public void testNextCallAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert two test records
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
Expand Down Expand Up @@ -392,6 +400,7 @@ public void testMultipleAborts() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
ApiFuture<Long> count = getTestRecordCountAsync(connection);
// do three inserts which all will abort and retry
interceptor.setProbability(1.0);
Expand Down Expand Up @@ -428,6 +437,7 @@ public void testAbortAfterSelect() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
ApiFuture<Long> count = getTestRecordCountAsync(connection);
// insert a test record
connection.executeUpdateAsync(
Expand Down Expand Up @@ -504,6 +514,7 @@ public void testAbortWithResultSetHalfway() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert two test records
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
Expand Down Expand Up @@ -539,6 +550,7 @@ public void testAbortWithResultSetFullyConsumed() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert two test records
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
Expand Down Expand Up @@ -581,6 +593,7 @@ public void testAbortWithConcurrentInsert() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert two test records
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
Expand Down Expand Up @@ -632,6 +645,7 @@ public void testAbortWithConcurrentDelete() {
AbortInterceptor interceptor = new AbortInterceptor(0);
// first insert two test records
try (ITConnection connection = createConnection()) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
connection.executeUpdateAsync(
Expand All @@ -641,6 +655,7 @@ public void testAbortWithConcurrentDelete() {
// open a new connection and select the two test records
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// select the test records and consume the entire result set
try (AsyncResultSet rs =
connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) {
Expand Down Expand Up @@ -694,6 +709,7 @@ public void testAbortWithConcurrentUpdate() {
// open a new connection and select the two test records
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// select the test records and consume the entire result set
try (AsyncResultSet rs =
connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) {
Expand Down Expand Up @@ -744,6 +760,7 @@ public void testAbortWithUnseenConcurrentInsert() throws InterruptedException {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert three test records
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
Expand Down Expand Up @@ -833,6 +850,7 @@ public void testRetryLargeResultSet() {
final long UPDATED_RECORDS = 1000L;
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection = createConnection()) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert test records
for (int i = 0; i < NUMBER_OF_TEST_RECORDS; i++) {
connection.bufferedWrite(
Expand All @@ -845,6 +863,7 @@ public void testRetryLargeResultSet() {
}
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// select the test records and iterate over them
try (AsyncResultSet rs =
connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) {
Expand All @@ -867,6 +886,7 @@ public void testRetryLargeResultSet() {
// Wait until the entire result set has been consumed.
get(finished);
}
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// Do an update that will abort and retry.
interceptor.setProbability(1.0);
interceptor.setOnlyInjectOnce(true);
Expand Down Expand Up @@ -898,6 +918,7 @@ public void testRetryHighAbortRate() {
AbortInterceptor interceptor = new AbortInterceptor(0.25D);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert test records
for (int i = 0; i < NUMBER_OF_TEST_RECORDS; i++) {
connection.bufferedWrite(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;

import com.google.cloud.spanner.BackupInfo.State;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ParallelIntegrationTest;
Expand Down Expand Up @@ -51,7 +52,10 @@ public boolean doCreateDefaultTestTable() {
@Before
public void clearTestData() {
try (ITConnection connection = createConnection()) {
connection.execute(Statement.of("SELECT 1"));
connection.commit();
connection.bufferedWrite(Mutation.delete("TEST", KeySet.all()));
//connection.executeUpdate(Statement.of("DELETE FROM TEST WHERE TRUE"));
connection.commit();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public boolean doCreateDefaultTestTable() {
@Before
public void setupTestData() {
try (ITConnection connection = createConnection()) {
connection.execute(Statement.of("SELECT 1"));
connection.commit();
connection.bufferedWrite(Mutation.delete("TEST", KeySet.all()));
connection.commit();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.Connection;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.Collections;
import org.junit.Before;
Expand Down Expand Up @@ -64,12 +65,19 @@ public void createTestTable() {
connection.runBatch();
}
}
try {
Thread.sleep(3000);
} catch (Exception e) {
System.out.println(e);
}
}

@Test
public void testExplainStatement() {
assumeFalse("Emulator does not support PostgreSQL Dialect", isUsingEmulator());
try (ITConnection connection = createConnection()) {
connection.execute(Statement.of("SELECT 1"));
connection.commit();
connection.bufferedWrite(
Arrays.asList(
Mutation.newInsertBuilder("TEST").set("ID").to(3L).set("NAME").to("TEST-3").build(),
Expand All @@ -89,6 +97,8 @@ public void testExplainStatement() {
public void testExplainAnalyzeStatement() {
assumeFalse("Emulator does not support PostgreSQL Dialect", isUsingEmulator());
try (ITConnection connection = createConnection()) {
connection.execute(Statement.of("SELECT 1"));
connection.commit();
connection.bufferedWrite(
Arrays.asList(
Mutation.newInsertBuilder("TEST").set("ID").to(1L).set("NAME").to("TEST-1").build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public boolean doCreateDefaultTestTable() {
@Before
public void clearTestData() {
try (ITConnection connection = createConnection()) {
connection.execute(Statement.of("SELECT 1"));
connection.commit();
connection.bufferedWrite(Mutation.delete("TEST", KeySet.all()));
connection.commit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void test02_RunAbortedTest() {
long numberOfSongs = 0L;
AbortInterceptor interceptor = new AbortInterceptor(0.0D);
try (ITConnection connection = createConnection(interceptor)) {
interceptor.setUsingMultiplexedSession(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
connection.setAutocommit(false);
connection.setRetryAbortsInternally(true);
// Read all data from the different music tables in the transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public void testSqlScript() throws Exception {
public void testDoAllowBufferedWriteInReadWriteTransaction() {
try (ITConnection connection = createConnection()) {
assertThat(connection.isAutocommit(), is(false));
connection.execute(Statement.of("SELECT 1"));
connection.commit();
connection.bufferedWrite(
Mutation.newInsertBuilder("TEST").set("ID").to(1L).set("NAME").to("TEST").build());
connection.commit();
Expand Down
Loading

0 comments on commit 84045ef

Please sign in to comment.