Skip to content

Commit

Permalink
Use options pattern to configure backfill (#630)
Browse files Browse the repository at this point in the history
Use the options pattern to configure backfill.

Backfill will take some new options as part of #583, so refactor the
`backfill` package to use the options pattern, given that `Start`
already takes many arguments.

Part of #583
  • Loading branch information
andrew-farries authored Jan 29, 2025
1 parent c237001 commit c4cd645
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 8 deletions.
36 changes: 29 additions & 7 deletions pkg/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,58 @@ import (
"github.com/xataio/pgroll/pkg/schema"
)

// Backfill holds the configuration for a backfill operation. It is created
// with New and executed against a table with Start.
type Backfill struct {
// conn is the database handle Start uses to fetch the row count and to
// update each batch.
conn db.DB
// batchSize is handed to the batcher that Start creates; New defaults it
// to 1000.
batchSize int
// batchDelay is how long Start waits after updating a batch before moving
// on to the next one (the wait is abandoned if the context is done).
batchDelay time.Duration
// callbacks are invoked by Start on every batch iteration, before the
// batch is updated.
callbacks []CallbackFn
}

// CallbackFn is a progress callback for a backfill. Start calls it with
// done set to the batch index multiplied by the batch size, and total set to
// the value getRowCount returned for the table before the first batch.
type CallbackFn func(done int64, total int64)

// New builds a Backfill that operates over conn. The batch size starts at
// 1000 and each option in opts is then applied in the order given, so a later
// option overrides an earlier one. Nothing runs until `Start` is invoked.
func New(conn db.DB, opts ...OptionFn) *Backfill {
	bf := new(Backfill)
	bf.conn = conn
	bf.batchSize = 1000 // default; an option may replace it below

	for i := range opts {
		opts[i](bf)
	}

	return bf
}

// Start updates all rows in the given table, in batches, using the
// following algorithm:
// 1. Get the primary key column for the table.
// 2. Get the first batch of rows from the table, ordered by the primary key.
// 3. Update each row in the batch, setting the value of the primary key column to itself.
// 4. Repeat steps 2 and 3 until no more rows are returned.
func Start(ctx context.Context, conn db.DB, table *schema.Table, batchSize int, batchDelay time.Duration, cbs ...CallbackFn) error {
func (bf *Backfill) Start(ctx context.Context, table *schema.Table) error {
// get the backfill column
identityColumns := getIdentityColumns(table)
if identityColumns == nil {
return NotPossibleError{Table: table.Name}
}

total, err := getRowCount(ctx, conn, table.Name)
total, err := getRowCount(ctx, bf.conn, table.Name)
if err != nil {
return fmt.Errorf("get row count for %q: %w", table.Name, err)
}

// Create a batcher for the table.
b := newBatcher(table, batchSize)
b := newBatcher(table, bf.batchSize)

// Update each batch of rows, invoking callbacks for each one.
for batch := 0; ; batch++ {
for _, cb := range cbs {
cb(int64(batch*batchSize), total)
for _, cb := range bf.callbacks {
cb(int64(batch*bf.batchSize), total)
}

if err := b.updateBatch(ctx, conn); err != nil {
if err := b.updateBatch(ctx, bf.conn); err != nil {
if errors.Is(err, sql.ErrNoRows) {
break
}
Expand All @@ -55,7 +77,7 @@ func Start(ctx context.Context, conn db.DB, table *schema.Table, batchSize int,
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(batchDelay):
case <-time.After(bf.batchDelay):
}
}

Expand Down
29 changes: 29 additions & 0 deletions pkg/backfill/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// SPDX-License-Identifier: Apache-2.0

package backfill

import "time"

// OptionFn is a functional option that mutates a Backfill. New applies each
// supplied OptionFn, in order, to the Backfill it constructs.
type OptionFn func(*Backfill)

// WithBatchSize returns an option that overrides the batch size used by the
// backfill operation with batchSize.
func WithBatchSize(batchSize int) OptionFn {
	apply := func(bf *Backfill) {
		bf.batchSize = batchSize
	}

	return apply
}

// WithBatchDelay returns an option that sets how long the backfill operation
// pauses between batches.
func WithBatchDelay(delay time.Duration) OptionFn {
	apply := func(bf *Backfill) {
		bf.batchDelay = delay
	}

	return apply
}

// WithCallbacks sets the progress callbacks for the backfill operation,
// replacing any callbacks configured by an earlier option.
//
// Start invokes every callback at the beginning of each batch iteration,
// before that batch is updated. Each callback receives the batch index
// multiplied by the batch size, and the row count fetched for the table.
//
// The callbacks are copied when the option is created. A caller that passes
// an existing slice with `cbs...` can therefore modify that slice afterwards
// without changing the callbacks the backfill will run.
func WithCallbacks(cbs ...CallbackFn) OptionFn {
	// A slice passed as `s...` is not copied by the call, so cbs would
	// otherwise share its backing array with the caller.
	snapshot := append([]CallbackFn(nil), cbs...)

	return func(bf *Backfill) {
		bf.callbacks = snapshot
	}
}
7 changes: 6 additions & 1 deletion pkg/roll/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,13 @@ func (m *Roll) ensureView(ctx context.Context, version, name string, table *sche
}

func (m *Roll) performBackfills(ctx context.Context, tables []*schema.Table, cbs ...backfill.CallbackFn) error {
bf := backfill.New(m.pgConn,
backfill.WithBatchSize(m.backfillBatchSize),
backfill.WithBatchDelay(m.backfillBatchDelay),
backfill.WithCallbacks(cbs...))

for _, table := range tables {
if err := backfill.Start(ctx, m.pgConn, table, m.backfillBatchSize, m.backfillBatchDelay, cbs...); err != nil {
if err := bf.Start(ctx, table); err != nil {
errRollback := m.Rollback(ctx)

return errors.Join(
Expand Down

0 comments on commit c4cd645

Please sign in to comment.