Skip to content

Commit

Permalink
Update WithRetryableTransaction method
Browse files Browse the repository at this point in the history
Change the method signature to accept `sql.TxOptions` as the second
argument. This allows transactions to run at isolation levels other than
the default `READ COMMITTED` level.
  • Loading branch information
andrew-farries committed Feb 3, 2025
1 parent e16e921 commit 12abfb0
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ type batcher struct {
}

func (b *batcher) updateBatch(ctx context.Context, conn db.DB) error {
return conn.WithRetryableTransaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
return conn.WithRetryableTransaction(ctx, nil, func(ctx context.Context, tx *sql.Tx) error {
// Build the query to update the next batch of rows
sql, err := templates.BuildSQL(b.BatchConfig)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
type DB interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
WithRetryableTransaction(ctx context.Context, f func(context.Context, *sql.Tx) error) error
WithRetryableTransaction(ctx context.Context, opts *sql.TxOptions, f func(context.Context, *sql.Tx) error) error
RawConn() *sql.DB
Close() error
}
Expand Down Expand Up @@ -77,11 +77,11 @@ func (db *RDB) QueryContext(ctx context.Context, query string, args ...interface
}

// WithRetryableTransaction runs `f` in a transaction, retrying on lock_timeout errors.
func (db *RDB) WithRetryableTransaction(ctx context.Context, f func(context.Context, *sql.Tx) error) error {
func (db *RDB) WithRetryableTransaction(ctx context.Context, opts *sql.TxOptions, f func(context.Context, *sql.Tx) error) error {
b := backoff.New(maxBackoffDuration, backoffInterval)

for {
tx, err := db.DB.BeginTx(ctx, nil)
tx, err := db.DB.BeginTx(ctx, opts)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestWithRetryableTransaction(t *testing.T) {

// run a transaction that should retry until the lock is released
rdb := &db.RDB{DB: conn}
err := rdb.WithRetryableTransaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
err := rdb.WithRetryableTransaction(ctx, nil, func(ctx context.Context, tx *sql.Tx) error {
return tx.QueryRowContext(ctx, "SELECT 1 FROM test").Err()
})
require.NoError(t, err)
Expand All @@ -149,7 +149,7 @@ func TestWithRetryableTransactionWhenContextCancelled(t *testing.T) {
// Cancel the context before the lock times out
go time.AfterFunc(500*time.Millisecond, cancel)

err := rdb.WithRetryableTransaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
err := rdb.WithRetryableTransaction(ctx, nil, func(ctx context.Context, tx *sql.Tx) error {
return tx.QueryRowContext(ctx, "SELECT 1 FROM test").Err()
})
require.Errorf(t, err, "context canceled")
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (db *FakeDB) QueryContext(ctx context.Context, query string, args ...interf
return nil, nil
}

func (db *FakeDB) WithRetryableTransaction(ctx context.Context, f func(context.Context, *sql.Tx) error) error {
func (db *FakeDB) WithRetryableTransaction(ctx context.Context, opts *sql.TxOptions, f func(context.Context, *sql.Tx) error) error {
return nil
}

Expand Down

0 comments on commit 12abfb0

Please sign in to comment.