Backfill only rows present at backfill start
Change the backfill algorithm to only backfill rows that were present at
the start of the backfill process. Rows inserted or updated after
backfill start will be backfilled by the already-installed `up` trigger
and do not need to be backfilled by the backfill process (although doing
so is safe from a correctness perspective).

Skipping rows that were inserted or updated after the backfill start ensures
that the backfill process is guaranteed to terminate, even if a large number
of rows are inserted or updated while it runs.

The new algorithm works as follows:
* Create a 'batch table' in the `pgroll` schema. The batch table is used
  to store the primary key values of each batch of rows to be updated
  during the backfill process. The table holds at most `batchSize` rows
  at a time and is `TRUNCATE`d at the start of each batch.
* Begin a `REPEATABLE READ` transaction and take a transaction snapshot.
  This transaction remains open for the duration of the backfill so that
  other transactions can use the snapshot.
* For each batch:
  1. The primary key values for the batch of rows to be updated are
     `INSERT`ed `INTO` the batch table. The transaction that performs the
     `INSERT INTO` uses the snapshot taken above so that only rows present
     at the start of the backfill are visible (see the SQL sketch after
     this list).
  2. The batch of rows is updated in the table being backfilled by setting
     their primary keys to themselves (a no-op update). This update causes
     any `ON UPDATE` trigger to fire for the affected rows.
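
For illustration, one batch iteration corresponds roughly to the following
SQL. This is a sketch only: the table name `users`, primary key `id`, batch
size and snapshot ID are hypothetical, and the real statements are generated
from the templates in `pkg/backfill/templates`.

```sql
-- Batch selection, run at REPEATABLE READ using the exported snapshot so
-- that only rows present at backfill start are visible.
BEGIN ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000004-00000002-1';
TRUNCATE TABLE pgroll.batch_users;
INSERT INTO pgroll.batch_users (id)
SELECT id FROM users
WHERE (id) > (42)   -- last primary key of the previous batch; omitted for the first batch
ORDER BY id
LIMIT 1000;
COMMIT;

-- No-op update of the selected rows, run at the default READ COMMITTED level
-- so that concurrent updates cannot cause serialization errors.
BEGIN;
UPDATE users
SET id = users.id
FROM pgroll.batch_users AS batch
WHERE users.id = batch.id;
COMMIT;
```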

The 'batch table' is necessary as a temporary store of the primary key
values of the rows to be updated because the per-batch transaction that
selects those rows must run at the `REPEATABLE READ` isolation level
(required in order to use the transaction snapshot). Updating the selected
batch of rows in the same transaction would fail with a serialization error
whenever a row in the batch had been updated by a transaction committed
after the snapshot was taken. Such serialization errors can safely be
ignored, as any rows updated after the snapshot was taken will already have
been backfilled by the `up` trigger. To avoid these serialization errors,
the batch of rows to be updated is written to the 'batch table', from which
the batch can be `UPDATE`d in a `READ COMMITTED` transaction that cannot
encounter serialization errors.
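
To make the failure mode concrete, here is a sketch (with a hypothetical
table name and snapshot ID) of what performing the update directly in the
snapshot-pinned transaction could hit:

```sql
BEGIN ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000004-00000002-1';
-- If another session updated one of these rows and committed after the
-- snapshot was taken, this statement aborts with:
--   ERROR: could not serialize access due to concurrent update
UPDATE users SET id = users.id
WHERE id IN (SELECT id FROM pgroll.batch_users);
COMMIT;
```

Running the same `UPDATE` in a `READ COMMITTED` transaction instead simply
waits for the conflicting transaction and then updates the latest committed
row version, which is why the batch table approach sidesteps the problem.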

The largest drawback of this approach is that it requires holding a
transaction open during the backfill process. Long-running transactions
can cause bloat in the database by preventing vacuuming of dead rows.
andrew-farries committed Feb 3, 2025
1 parent 8be6c12 commit b676e32
Showing 3 changed files with 232 additions and 44 deletions.
213 changes: 184 additions & 29 deletions pkg/backfill/backfill.go
@@ -9,17 +9,19 @@ import (
"fmt"
"time"

"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/backfill/templates"
"github.com/xataio/pgroll/pkg/db"
"github.com/xataio/pgroll/pkg/schema"
)

type Backfill struct {
conn db.DB
stateSchema string
batchSize int
batchDelay time.Duration
callbacks []CallbackFn
conn db.DB
stateSchema string
batchSize int
batchDelay time.Duration
batchTablePrefix string
callbacks []CallbackFn
}

type CallbackFn func(done int64, total int64)
@@ -28,9 +30,10 @@ type CallbackFn func(done int64, total int64)
// not started until `Start` is invoked.
func New(conn db.DB, opts ...OptionFn) *Backfill {
b := &Backfill{
conn: conn,
batchSize: 1000,
stateSchema: "pgroll",
conn: conn,
batchSize: 1000,
stateSchema: "pgroll",
batchTablePrefix: "batch_",
}

for _, opt := range opts {
@@ -40,30 +43,67 @@ func New(conn db.DB, opts ...OptionFn) *Backfill {
return b
}

// Start updates all rows in the given table, in batches, using the
// following algorithm:
// 1. Get the primary key column for the table.
// 2. Get the first batch of rows from the table, ordered by the primary key.
// 3. Update each row in the batch, setting the value of the primary key column to itself.
// 4. Repeat steps 2 and 3 until no more rows are returned.
// Start backfills all rows in the given table, in batches, using the following
// algorithm:
//
// 1. Begin a REPEATABLE READ transaction and take a transaction snapshot. This
// transaction remains open for the duration of the backfill so that other
// transactions can use the snapshot.
// Then for each batch:
// 2. The primary key values for the batch of rows to be updated are inserted
// into the batch table. The transaction that does the INSERT INTO uses the
// snapshot taken in step 1 so that only rows present at the start of the
// backfill are visible.
// 3. The batch of rows is updated in the table being backfilled by setting
// their primary keys to themselves (a no-op update). This update causes any ON
// UPDATE trigger to fire for the affected rows.
func (bf *Backfill) Start(ctx context.Context, table *schema.Table) error {
// get the backfill column
// Get the columns to use as the identity columns for the backfill
identityColumns := getIdentityColumns(table)
if identityColumns == nil {
return NotPossibleError{Table: table.Name}
}

// Get the total number of rows in the table
total, err := getRowCount(ctx, bf.conn, table.Name)
if err != nil {
return fmt.Errorf("get row count for %q: %w", table.Name, err)
}

// Begin a REPEATABLE READ transaction. The transaction is used to take a
// snapshot that is passed to each batch select operation. This ensures that
// each batch selection acts only on rows visible at the start of the
// backfill process.
tx, err := bf.conn.RawConn().BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
})
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
defer tx.Rollback()

// Create a transaction snapshot
snapshotID, err := getSnapshotID(ctx, tx)
if err != nil {
return fmt.Errorf("get snapshot ID: %w", err)
}

// Create a table to store the primary key values of each batch of rows
err = bf.createBatchTable(ctx, bf.conn, table.Name, identityColumns)
if err != nil {
return fmt.Errorf("create batch table: %w", err)
}
defer bf.dropBatchTable(ctx, bf.conn, table.Name)

// Create a batcher for the table.
b := batcher{
BatchConfig: templates.BatchConfig{
TableName: table.Name,
PrimaryKey: identityColumns,
BatchSize: bf.batchSize,
TableName: table.Name,
PrimaryKey: identityColumns,
BatchSize: bf.batchSize,
SnapshotID: snapshotID,
StateSchema: bf.stateSchema,
BatchTablePrefix: bf.batchTablePrefix,
},
}

@@ -73,11 +113,20 @@ func (bf *Backfill) Start(ctx context.Context, table *schema.Table) error {
cb(int64(batch*bf.batchSize), total)
}

if err := b.updateBatch(ctx, bf.conn); err != nil {
// Insert the primary keys of the next batch of rows to be updated into
// the batch table
err := b.selectBatch(ctx, bf.conn)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
break
}
return err
return fmt.Errorf("select batch: %w", err)
}

// Update the batch of rows
err = b.updateBatch(ctx, bf.conn)
if err != nil {
return fmt.Errorf("update batch: %w", err)
}

select {
@@ -90,6 +139,52 @@ func (bf *Backfill) Start(ctx context.Context, table *schema.Table) error {
return nil
}

// createBatchTable creates the table used to store the primary key values of each
// batch of rows to be updated during the backfill process.
func (bf *Backfill) createBatchTable(ctx context.Context, conn db.DB, tableName string, idColumns []string) error {
// Drop the batch table if it already exists
_, err := conn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s",
pq.QuoteIdentifier(bf.stateSchema),
pq.QuoteIdentifier(bf.batchTablePrefix+tableName)))
if err != nil {
return err
}

// Build the query to create the batch table
sql, err := templates.BuildCreateBatchTable(templates.CreateBatchTableConfig{
StateSchema: bf.stateSchema,
BatchTablePrefix: bf.batchTablePrefix,
TableName: tableName,
IDColumns: idColumns,
})
if err != nil {
return err
}

// Execute the query to create the batch table
_, err = conn.ExecContext(ctx, sql)
return err
}

// dropBatchTable drops the table used to store the primary key values of each
// batch of rows to be updated during the backfill process.
func (bf *Backfill) dropBatchTable(ctx context.Context, conn db.DB, tableName string) error {
_, err := conn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s",
pq.QuoteIdentifier(bf.stateSchema),
pq.QuoteIdentifier(bf.batchTablePrefix+tableName)))
return err
}

// getSnapshotID exports a snapshot from the given transaction.
func getSnapshotID(ctx context.Context, tx *sql.Tx) (string, error) {
var snapshotID string

err := tx.QueryRowContext(ctx, "SELECT pg_export_snapshot()").
Scan(&snapshotID)

return snapshotID, err
}

// getRowCount will attempt to get the row count for the given table. It first attempts to get an
// estimate and if that is zero, falls back to a full table scan.
func getRowCount(ctx context.Context, conn db.DB, tableName string) (int64, error) {
@@ -132,7 +227,9 @@ func getRowCount(ctx context.Context, conn db.DB, tableName string) (int64, error) {
return total, nil
}

// IsPossible will return an error if the backfill operation is not supported.
// IsPossible will return an error if the backfill operation is not supported
// on the table. A backfill is not possible if the table does not have suitable
// identity columns.
func IsPossible(table *schema.Table) error {
cols := getIdentityColumns(table)
if cols == nil {
@@ -142,7 +239,9 @@ func IsPossible(table *schema.Table) error {
return nil
}

// getIdentityColumns returns the identity columns to use for the backfill.
// getIdentityColumn will return the identity columns to use for the backfill.
// If the table has a primary key, it will use that. Otherwise, if the table
// has a UNIQUE, NOT NULL column, it will use that.
func getIdentityColumns(table *schema.Table) []string {
pks := table.GetPrimaryKey()
if len(pks) != 0 {
@@ -170,24 +269,80 @@ type batcher struct {
templates.BatchConfig
}

func (b *batcher) selectBatch(ctx context.Context, conn db.DB) error {
opts := &sql.TxOptions{Isolation: sql.LevelRepeatableRead}

return conn.WithRetryableTransaction(ctx, opts, func(ctx context.Context, tx *sql.Tx) error {
// Set the transaction snapshot. This ensures that row selection for each
// batch sees only rows that were present in the table at the start of the
// backfill process.
_, err := tx.ExecContext(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT %s",
pq.QuoteLiteral(b.SnapshotID)))
if err != nil {
return err
}

// Truncate the batch table ready for the next batch of rows
_, err = tx.ExecContext(ctx, fmt.Sprintf("TRUNCATE TABLE %s.%s",
pq.QuoteIdentifier(b.StateSchema),
pq.QuoteIdentifier(b.BatchTablePrefix+b.TableName)))
if err != nil {
return err
}

// Build the query to retrieve the next batch of rows
query, err := templates.BuildSelectBatchInto(b.BatchConfig)
if err != nil {
return err
}

// Execute the query to select the next batch of rows into the batch table
result, err := tx.ExecContext(ctx, query)
if err != nil {
return err
}

// Get the number of rows inserted by the query
n, err := result.RowsAffected()
if err != nil {
return err
}

// If no rows were inserted, return a no rows error
if n == 0 {
return sql.ErrNoRows
}

return nil
})
}

// updateBatch takes the next batch of rows to be updated from the batch table
// and updates them.
func (b *batcher) updateBatch(ctx context.Context, conn db.DB) error {
return conn.WithRetryableTransaction(ctx, nil, func(ctx context.Context, tx *sql.Tx) error {
// Build the query to update the next batch of rows
sql, err := templates.BuildSQL(b.BatchConfig)
// Build the statement to update the batch of rows
query, err := templates.BuildUpdateBatch(b.BatchConfig)
if err != nil {
return err
}

// Execute the query to update the next batch of rows and update the last PK
// value for the next batch
// Execute the query to update the batch of rows
row := tx.QueryRowContext(ctx, query)

// Initialize the LastValue slice if it is nil
if b.LastValue == nil {
b.LastValue = make([]string, len(b.PrimaryKey))
}
wrapper := make([]any, len(b.LastValue))
for i := range b.LastValue {

// Create a slice of pointers to the elements of the LastValue slice
wrapper := make([]interface{}, len(b.LastValue))
for i := range wrapper {
wrapper[i] = &b.LastValue[i]
}
err = tx.QueryRowContext(ctx, sql).Scan(wrapper...)

// Retrieve the last value of the primary key columns for the batch
err = row.Scan(wrapper...)
if err != nil {
return err
}
32 changes: 25 additions & 7 deletions pkg/backfill/templates/build.go
@@ -11,17 +11,35 @@ import (
)

type BatchConfig struct {
TableName string
PrimaryKey []string
LastValue []string
BatchSize int
TableName string
PrimaryKey []string
LastValue []string
BatchSize int
SnapshotID string
StateSchema string
BatchTablePrefix string
}

func BuildSQL(cfg BatchConfig) (string, error) {
return executeTemplate("sql", SQL, cfg)
type CreateBatchTableConfig struct {
StateSchema string
BatchTablePrefix string
TableName string
IDColumns []string
}

func executeTemplate(name, content string, cfg BatchConfig) (string, error) {
func BuildCreateBatchTable(cfg CreateBatchTableConfig) (string, error) {
return executeTemplate("create_batch_table", CreateBatchTable, cfg)
}

func BuildSelectBatchInto(cfg BatchConfig) (string, error) {
return executeTemplate("select_batch_into", SelectBatchInto, cfg)
}

func BuildUpdateBatch(cfg BatchConfig) (string, error) {
return executeTemplate("update_batch", UpdateBatch, cfg)
}

func executeTemplate(name, content string, cfg any) (string, error) {
ql := pq.QuoteLiteral
qi := pq.QuoteIdentifier

31 changes: 23 additions & 8 deletions pkg/backfill/templates/sql.go
@@ -2,22 +2,37 @@

package templates

const SQL = `WITH batch AS
(
// CreateBatchTable is a template for creating a batch table. The batch table
// is used to store the primary key values of each batch of rows to be
// backfilled.
const CreateBatchTable = `CREATE UNLOGGED TABLE IF NOT EXISTS
{{ .StateSchema | qi }}.{{ printf "%s%s" .BatchTablePrefix .TableName | qi}} AS
SELECT {{ commaSeparate (quoteIdentifiers .IDColumns) }}
FROM {{ .TableName | qi }}
WHERE false
`

// SelectBatchInto is a template for selecting the primary key values of the
// rows to be backfilled and inserting them into the batch table.
const SelectBatchInto = `INSERT INTO {{ .StateSchema | qi }}.{{ printf "%s%s" .BatchTablePrefix .TableName | qi }}
({{ commaSeparate (quoteIdentifiers .PrimaryKey) }})
SELECT {{ commaSeparate (quoteIdentifiers .PrimaryKey) }}
FROM {{ .TableName | qi}}
{{ if .LastValue -}}
FROM {{ .TableName | qi }}
{{ if .LastValue | len -}}
WHERE ({{ commaSeparate (quoteIdentifiers .PrimaryKey) }}) > ({{ commaSeparate (quoteLiterals .LastValue) }})
{{ end -}}
ORDER BY {{ commaSeparate (quoteIdentifiers .PrimaryKey) }}
LIMIT {{ .BatchSize }}
FOR NO KEY UPDATE
),
update AS
`

// UpdateBatch is a template for updating those rows in the target table having
// primary keys in the batch table. Each update sets the target table rows'
// primary keys to themselves, triggering a no-op update to the row.
const UpdateBatch = `WITH update AS
(
UPDATE {{ .TableName | qi }}
SET {{ updateSetClause .TableName .PrimaryKey }}
FROM batch
FROM {{ .StateSchema | qi }}.{{ printf "%s%s" .BatchTablePrefix .TableName | qi }} AS batch
WHERE {{ updateWhereClause .TableName .PrimaryKey }}
RETURNING {{ updateReturnClause .TableName .PrimaryKey }}
)
