From 8c7d261f9056e984b4448224722b895dac071296 Mon Sep 17 00:00:00 2001 From: Kevin Cao Date: Fri, 27 Dec 2024 16:23:24 -0500 Subject: [PATCH] backup: implement backup compactions after incremental backups This commit supports compacting backups after each incremental backup if the chain has reached a certain threshold. Epic: none Release note: None --- pkg/backup/BUILD.bazel | 1 + pkg/backup/backup_compaction.go | 598 ++++++++++++++++++++++ pkg/backup/backup_job.go | 5 + pkg/backup/backupencryption/encryption.go | 67 ++- pkg/backup/restore_data_processor.go | 2 +- 5 files changed, 644 insertions(+), 29 deletions(-) create mode 100644 pkg/backup/backup_compaction.go diff --git a/pkg/backup/BUILD.bazel b/pkg/backup/BUILD.bazel index dfeb6f73839a..cf6f1b284637 100644 --- a/pkg/backup/BUILD.bazel +++ b/pkg/backup/BUILD.bazel @@ -6,6 +6,7 @@ go_library( srcs = [ "alter_backup_planning.go", "alter_backup_schedule.go", + "backup_compaction.go", "backup_job.go", "backup_metrics.go", "backup_planning.go", diff --git a/pkg/backup/backup_compaction.go b/pkg/backup/backup_compaction.go new file mode 100644 index 000000000000..211a7ee70916 --- /dev/null +++ b/pkg/backup/backup_compaction.go @@ -0,0 +1,598 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package backup + +import ( + "bytes" + "context" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/backup/backupbase" + "github.com/cockroachdb/cockroach/pkg/backup/backupdest" + "github.com/cockroachdb/cockroach/pkg/backup/backupencryption" + "github.com/cockroachdb/cockroach/pkg/backup/backupinfo" + "github.com/cockroachdb/cockroach/pkg/backup/backuppb" + "github.com/cockroachdb/cockroach/pkg/backup/backupsink" + "github.com/cockroachdb/cockroach/pkg/backup/backuputils" + "github.com/cockroachdb/cockroach/pkg/build" + "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "github.com/gogo/protobuf/types" +) + +var backupCompactionThreshold = settings.RegisterIntSetting( + settings.ApplicationLevel, + "bulkio.backup.unsafe_compaction_threshold", + "backup chain length threshold at which backup compaction is triggered "+ + "(0 to disable compactions)", + 0, + settings.WithUnsafe, +) + +func maybeCompactIncrementals( + ctx context.Context, + execCtx sql.JobExecContext, + lastIncDetails jobspb.BackupDetails, + jobID jobspb.JobID, +) error { + threshold := backupCompactionThreshold.Get(&execCtx.ExecCfg().Settings.SV) + if threshold == 0 || !lastIncDetails.Destination.Exists || lastIncDetails.RevisionHistory { + return nil + } + execCfg := execCtx.ExecCfg() + + resolvedBaseDirs, resolvedIncDirs, _, err := resolveBackupDirs( + ctx, execCtx, 
lastIncDetails.Destination.To, + lastIncDetails.Destination.IncrementalStorage, + lastIncDetails.Destination.Subdir, + ) + mkStore := execCfg.DistSQLSrv.ExternalStorageFromURI + baseStores, baseCleanup, err := backupdest.MakeBackupDestinationStores( + ctx, execCtx.User(), mkStore, resolvedBaseDirs, + ) + if err != nil { + return err + } + defer func() { + if err := baseCleanup(); err != nil { + log.Warningf(ctx, "failed to cleanup base backup stores: %+v", err) + } + }() + incStores, incCleanup, err := backupdest.MakeBackupDestinationStores( + ctx, execCtx.User(), mkStore, resolvedIncDirs, + ) + if err != nil { + return err + } + defer func() { + if err := incCleanup(); err != nil { + log.Warningf(ctx, "failed to cleanup incremental backup stores: %+v", err) + } + }() + + ioConf := baseStores[0].ExternalIOConf() + kmsEnv := backupencryption.MakeBackupKMSEnv( + execCfg.Settings, + &ioConf, + execCfg.InternalDB, + execCtx.User(), + ) + encryption, err := backupencryption.GetEncryptionFromBaseStore( + ctx, baseStores[0], *lastIncDetails.EncryptionOptions, &kmsEnv, + ) + if err != nil { + return err + } + mem := execCfg.RootMemoryMonitor.MakeBoundAccount() + defer mem.Close(ctx) + + _, manifests, localityInfo, memReserved, err := backupdest.ResolveBackupManifests( + ctx, &mem, baseStores, incStores, mkStore, resolvedBaseDirs, + resolvedIncDirs, lastIncDetails.EndTime, encryption, &kmsEnv, + execCtx.User(), false, + ) + if err != nil { + return err + } + defer func() { + mem.Shrink(ctx, memReserved) + }() + + if int64(len(manifests)) < threshold { + return nil + } + + return compactIncrementals( + ctx, execCtx, lastIncDetails, jobID, manifests, encryption, &kmsEnv, localityInfo, + ) +} + +func compactIncrementals( + ctx context.Context, + execCtx sql.JobExecContext, + oldDetails jobspb.BackupDetails, + jobID jobspb.JobID, + backupChain []backuppb.BackupManifest, + encryption *jobspb.BackupEncryptionOptions, + kmsEnv cloud.KMSEnv, + localityInfo []jobspb.RestoreDetails_BackupLocalityInfo, +) error { + newDetails, err := makeCompactionBackupDetails( + oldDetails, backupChain[1:], + ) + if err != nil { + return err + } + execCfg := execCtx.ExecCfg() + dest, err := backupdest.ResolveDest( + ctx, + execCtx.User(), + newDetails.Destination, + newDetails.EndTime.AddDuration(10*time.Millisecond), + execCfg, + ) + if err != nil { + return err + } + if err := maybeWriteBackupLock(ctx, execCtx, dest, jobID); err != nil { + return err + } + + var tenantSpans []roachpb.Span + var tenantInfos []mtinfopb.TenantInfoWithUsage + insqlDB := execCtx.ExecCfg().InternalDB + if err := insqlDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + tenantSpans, tenantInfos, err = getTenantInfo(ctx, execCtx.ExecCfg().Codec, txn, newDetails) + return err + }); err != nil { + return err + } + details, backupManifest, err := getBackupDetailAndManifest( + ctx, execCtx.ExecCfg(), tenantSpans, tenantInfos, newDetails, execCtx.User(), dest, + ) + if err != nil { + return err + } + // The details/manifest returned by getBackupDetailAndManifest have their + // start and end times set based on the job details. We need to update them to + // reflect the compacted backup's start and end times. 
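+ // The compacted backup covers the same window as the incrementals it
+ // replaces: newDetails runs from the StartTime of the first compacted layer
+ // to the EndTime of the last one, as computed by makeCompactionBackupDetails.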
+ details.StartTime, details.EndTime = newDetails.StartTime, newDetails.EndTime + backupManifest.StartTime, backupManifest.EndTime = newDetails.StartTime, newDetails.EndTime + + backupManifest.IntroducedSpans, err = compactIntroducedSpans(ctx, *backupManifest, backupChain) + if err != nil { + return err + } + + if err := backupinfo.WriteBackupManifestCheckpoint( + ctx, dest.DefaultURI, encryption, kmsEnv, + backupManifest, execCtx.ExecCfg(), execCtx.User(), + ); err != nil { + return err + } + + chainToCompact := backupChain[1:] + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories( + ctx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, chainToCompact, encryption, kmsEnv, + ) + if err != nil { + return err + } + backupLocalityMap, err := makeBackupLocalityMap(localityInfo, execCtx.User()) + if err != nil { + return err + } + + introducedSpanFrontier, err := createIntroducedSpanFrontier(chainToCompact, backupManifest.EndTime) + if err != nil { + return err + } + defer introducedSpanFrontier.Release() + + targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV) + maxFiles := maxFileCount.Get(&execCtx.ExecCfg().Settings.SV) + + var fsc fileSpanComparator = &exclusiveEndKeyComparator{} + filter, err := makeSpanCoveringFilter( + backupManifest.Spans, + []jobspb.RestoreProgress_FrontierEntry{}, + introducedSpanFrontier, + targetSize, + maxFiles, + ) + + spanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) + genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error { + defer close(spanCh) + backupCodec, err := backupinfo.MakeBackupCodec(chainToCompact) + if err != nil { + return err + } + catalogDescs := make([]catalog.Descriptor, 0, len(backupManifest.Descriptors)) + var tables []catalog.TableDescriptor + for _, desc := range backupManifest.Descriptors { + catDesc := backupinfo.NewDescriptorForManifest(&desc) + catalogDescs = append(catalogDescs, catDesc) + if table, ok := catDesc.(catalog.TableDescriptor); ok { + tables = append(tables, table) + } + } + spans, err := spansForAllRestoreTableIndexes( + backupCodec, + tables, + nil, /* revs */ + false, /* schemaOnly */ + false, /* forOnlineRestore */ + ) + if err != nil { + return err + } + return errors.Wrap(generateAndSendImportSpans( + ctx, + spans, + chainToCompact, + layerToIterFactory, + backupLocalityMap, + filter, + fsc, + spanCh, + ), "generate and send import spans") + } + progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) + processProg := func( + ctx context.Context, + progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, + backupManifest *backuppb.BackupManifest, + ) { + // When a processor is done exporting a span, it will send a progress update + // to progCh. + var numBackedUpFiles int64 + for progress := range progCh { + var progDetails backuppb.BackupManifest_Progress + if err := types.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { + log.Errorf(ctx, "unable to unmarshal backup progress details: %+v", err) + } + if backupManifest.RevisionStartTime.Less(progDetails.RevStartTime) { + backupManifest.RevisionStartTime = progDetails.RevStartTime + } + for _, file := range progDetails.Files { + backupManifest.Files = append(backupManifest.Files, file) + backupManifest.EntryCounts.Add(file.EntryCounts) + numBackedUpFiles++ + } + + // Update the per-component progress maintained by the job profiler. 
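+ // Each progress message carries completion fractions keyed by processor ID;
+ // they are re-keyed here by the full ComponentID (SQL instance, flow,
+ // processor) that the profiler tracks.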
+ perComponentProgress := make(map[execinfrapb.ComponentID]float32) + component := execinfrapb.ComponentID{ + SQLInstanceID: progress.NodeID, + FlowID: progress.FlowID, + Type: execinfrapb.ComponentID_PROCESSOR, + } + for processorID, fraction := range progress.CompletedFraction { + component.ID = processorID + perComponentProgress[component] = fraction + } + } + } + + store, err := execCtx.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, dest.DefaultURI, execCtx.User()) + if err != nil { + return err + } + defer store.Close() + var tasks []func(context.Context) error + tasks = append(tasks, + func(ctx context.Context) error { + return genSpan(ctx, spanCh) + }) + tasks = append(tasks, func(ctx context.Context) error { + return runCompaction(ctx, execCtx, encryption, spanCh, newDetails, backupManifest, progCh, store) + }) + tasks = append(tasks, func(ctx context.Context) error { + processProg(ctx, progCh, backupManifest) + return nil + }) + + if err := ctxgroup.GoAndWait(ctx, tasks...); err != nil { + return err + } + backupID := uuid.MakeV4() + backupManifest.ID = backupID + + if err := backupinfo.WriteBackupManifest(ctx, store, backupbase.BackupManifestName, + encryption, kmsEnv, backupManifest); err != nil { + return err + } + if backupinfo.WriteMetadataWithExternalSSTsEnabled.Get(&execCtx.ExecCfg().Settings.SV) { + if err := backupinfo.WriteMetadataWithExternalSSTs(ctx, store, encryption, + kmsEnv, backupManifest); err != nil { + return err + } + } + + statsTable := getTableStatsForBackup(ctx, execCtx.ExecCfg().TableStatsCache, backupManifest.Descriptors) + if err := backupinfo.WriteTableStatistics(ctx, store, encryption, kmsEnv, &statsTable); err != nil { + return err + } + + if backupinfo.WriteMetadataSST.Get(&execCtx.ExecCfg().Settings.SV) { + if err := backupinfo.WriteBackupMetadataSST(ctx, store, encryption, kmsEnv, backupManifest, + statsTable.Statistics); err != nil { + err = errors.Wrap(err, "writing forward-compat metadata sst") + if !build.IsRelease() { + return err + } + log.Warningf(ctx, "%+v", err) + } + } + return nil +} + +func runCompaction( + ctx context.Context, + execCtx sql.JobExecContext, + encryption *jobspb.BackupEncryptionOptions, + entries chan execinfrapb.RestoreSpanEntry, + details jobspb.BackupDetails, + manifest *backuppb.BackupManifest, + progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, + store cloud.ExternalStorage, +) error { + defer close(progCh) + var encryptionOptions *kvpb.FileEncryptionOptions + if encryption != nil { + encryptionOptions = &kvpb.FileEncryptionOptions{Key: encryption.Key} + } + sinkConf := backupsink.SSTSinkConf{ + ID: execCtx.ExecCfg().DistSQLSrv.NodeID.SQLInstanceID(), + Enc: encryptionOptions, + ProgCh: progCh, + Settings: &execCtx.ExecCfg().Settings.SV, + ElideMode: manifest.ElidedPrefix, + } + sink := backupsink.MakeSSTSinkKeyWriter(sinkConf, store, nil) + defer func() { + if err := sink.Flush(ctx); err != nil { + log.Warningf(ctx, "failed to flush sink: %v", err) + logClose(ctx, sink, "SST sink") + } + }() + + for { + select { + case entry, ok := <-entries: + if !ok { + return sink.Flush(ctx) + } + + sstIter, err := openSSTs(ctx, execCtx, entry, encryptionOptions, details) + if err != nil { + return errors.Wrap(err, "opening SSTs") + } + + if err := processSpanEntry(ctx, sstIter, sink); err != nil { + return errors.Wrap(err, "processing span entry") + } + } + } +} + +func processSpanEntry( + ctx context.Context, sstIter mergedSST, sink *backupsink.SSTSinkKeyWriter, +) error { + defer sstIter.cleanup() + 
entry := sstIter.entry + startKey, endKey := storage.MVCCKey{Key: entry.Span.Key}, storage.MVCCKey{Key: entry.Span.EndKey} + prefix, err := backupsink.ElidedPrefix(entry.Span.Key, entry.ElidedPrefix) + if err != nil { + return err + } + if prefix != nil { + startKey.Key = bytes.TrimPrefix(startKey.Key, prefix) + endKey.Key = bytes.TrimPrefix(endKey.Key, prefix) + } + iter := sstIter.iter + if err := sink.Reset(ctx, sstIter.entry.Span); err != nil { + return err + } + scratch := make([]byte, 0, len(prefix)) + scratch = append(scratch, prefix...) + for iter.SeekGE(startKey); ; iter.NextKey() { + var key storage.MVCCKey + if ok, err := iter.Valid(); err != nil { + return err + } else if key = iter.UnsafeKey(); !ok || !key.Less(endKey) { + break + } + value, err := iter.UnsafeValue() + if err != nil { + return err + } + fullKey := key + scratch = append(scratch[:len(prefix)], key.Key...) + fullKey.Key = scratch + if err := sink.WriteKey(ctx, fullKey, value); err != nil { + return err + } + } + return nil +} + +func openSSTs( + ctx context.Context, + execCtx sql.JobExecContext, + entry execinfrapb.RestoreSpanEntry, + encryptionOptions *kvpb.FileEncryptionOptions, + details jobspb.BackupDetails, +) (mergedSST, error) { + var dirs []cloud.ExternalStorage + storeFiles := make([]storageccl.StoreFile, 0, len(entry.Files)) + for idx := 0; idx < len(entry.Files); idx++ { + file := entry.Files[idx] + dir, err := execCtx.ExecCfg().DistSQLSrv.ExternalStorage(ctx, file.Dir) + if err != nil { + return mergedSST{}, err + } + dirs = append(dirs, dir) + storeFiles = append(storeFiles, storageccl.StoreFile{Store: dir, FilePath: file.Path}) + } + iterOpts := storage.IterOptions{ + // TODO (kev-cao): Come back and update this to range keys when + // SSTSinkKeyWriter has been updated to support range keys. + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, encryptionOptions, iterOpts) + if err != nil { + return mergedSST{}, err + } + compactionIter, err := storage.NewBackupCompactionIterator(iter, details.EndTime) + if err != nil { + return mergedSST{}, err + } + return mergedSST{ + entry: entry, + iter: compactionIter, + cleanup: func() { + log.VInfof(ctx, 1, "finished with and closing %d files in span %d %v", len(entry.Files), entry.ProgressIdx, entry.Span.String()) + compactionIter.Close() + for _, dir := range dirs { + if err := dir.Close(); err != nil { + log.Warningf(ctx, "close export storage failed: %v", err) + } + } + }, + completeUpTo: details.EndTime, + }, nil +} + +// makeCompactionBackupDetails takes a chain of backups that are to be +// compacted and returns a corresponding BackupDetails for the compacted +// backup. It also takes in the job details for the last backup in its chain. +func makeCompactionBackupDetails( + details jobspb.BackupDetails, manifests []backuppb.BackupManifest, +) (jobspb.BackupDetails, error) { + if len(manifests) == 0 { + return jobspb.BackupDetails{}, errors.New("no backup manifests to compact") + } + details.StartTime = manifests[0].StartTime + details.EndTime = manifests[len(manifests)-1].EndTime + return details, nil +} + +// compactIntroducedSpans takes a compacted backup manifest and the full chain of backups it belongs to and +// computes the introduced spans for the compacted backup. 
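+// Introduced spans are the spans of the compacted manifest that are not
+// covered by the base (full) backup of the chain; checkCoverage first verifies
+// that the chain as a whole covers every span in the compacted manifest.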
+func compactIntroducedSpans( + ctx context.Context, manifest backuppb.BackupManifest, backupChain []backuppb.BackupManifest, +) (roachpb.Spans, error) { + if err := checkCoverage(ctx, manifest.Spans, backupChain); err != nil { + return roachpb.Spans{}, err + } + return filterSpans(manifest.Spans, backupChain[0].Spans), nil +} + +// ResolveBackupSubdir returns the resolved base full backup subdirectory from a +// specified sub-directory. subdir may be a specified path or the string +// "LATEST" to resolve the latest subdirectory. +func resolveBackupSubdir( + ctx context.Context, p sql.JobExecContext, mainFullBackupURI string, subdir string, +) (string, error) { + if strings.EqualFold(subdir, backupbase.LatestFileName) { + latest, err := backupdest.ReadLatestFile(ctx, mainFullBackupURI, + p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, p.User()) + if err != nil { + return "", err + } + return latest, nil + } + return subdir, nil +} + +// ResolveBackupDirs resolves the sub-directory, base backup directory, and +// incremental backup directories for a backup collection. incrementalURIs may +// be empty if an incremental location is not specified. subdir can be a resolved +// sub-directory or the string "LATEST" to resolve the latest sub-directory. +func resolveBackupDirs( + ctx context.Context, + p sql.JobExecContext, + collectionURIs []string, + incrementalURIs []string, + subdir string, +) (resolvedBaseDirs, + resolvedIncDirs []string, resolvedSubdir string, err error) { + resolvedSubdir, err = resolveBackupSubdir(ctx, p, collectionURIs[0], subdir) + if err != nil { + return + } + resolvedBaseDirs, err = backuputils.AppendPaths(collectionURIs[:], resolvedSubdir) + if err != nil { + return + } + resolvedIncDirs, err = backupdest.ResolveIncrementalsBackupLocation( + ctx, p.User(), p.ExecCfg(), incrementalURIs, collectionURIs, resolvedSubdir, + ) + return +} + +// maybeWriteBackupLock attempts to write a backup lock for the given jobID, if +// it does not already exist. If another backup lock file for another job is +// found, it will return an error. +func maybeWriteBackupLock( + ctx context.Context, + execCtx sql.JobExecContext, + dest backupdest.ResolvedDestination, + jobID jobspb.JobID, +) error { + foundLockFile, err := backupinfo.CheckForBackupLock( + ctx, + execCtx.ExecCfg(), + dest.DefaultURI, + jobID, + execCtx.User(), + ) + if err != nil { + return err + } + + if !foundLockFile { + if err := backupinfo.CheckForPreviousBackup( + ctx, + execCtx.ExecCfg(), + dest.DefaultURI, + jobID, + execCtx.User(), + ); err != nil { + return err + } + if err := backupinfo.WriteBackupLock( + ctx, + execCtx.ExecCfg(), + dest.DefaultURI, + jobID, + execCtx.User(), + ); err != nil { + return err + } + } + return nil +} diff --git a/pkg/backup/backup_job.go b/pkg/backup/backup_job.go index 7dc4febd99d3..5b7afc824899 100644 --- a/pkg/backup/backup_job.go +++ b/pkg/backup/backup_job.go @@ -592,6 +592,7 @@ func (b *backupResumer) DumpTraceAfterRun() bool { func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { // The span is finished by the registry executing the job. 
details := b.job.Details().(jobspb.BackupDetails) + origDetails := details p := execCtx.(sql.JobExecContext) if err := maybeRelocateJobExecution(ctx, b.job.ID(), p, details.ExecutionLocality, "BACKUP"); err != nil { @@ -999,6 +1000,10 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { logutil.LogJobCompletion(ctx, b.getTelemetryEventType(), b.job.ID(), true, nil, res.Rows) } + if err := maybeCompactIncrementals(ctx, p, origDetails, b.job.ID()); err != nil { + return err + } + return b.maybeNotifyScheduledJobCompletion( ctx, jobs.StatusSucceeded, p.ExecCfg().JobsKnobs(), p.ExecCfg().InternalDB, ) diff --git a/pkg/backup/backupencryption/encryption.go b/pkg/backup/backupencryption/encryption.go index 2140b925f489..9f8bd151d3a3 100644 --- a/pkg/backup/backupencryption/encryption.go +++ b/pkg/backup/backupencryption/encryption.go @@ -341,39 +341,50 @@ func GetEncryptionFromBase( encryptionParams jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, ) (*jobspb.BackupEncryptionOptions, error) { + if encryptionParams.Mode == jobspb.EncryptionMode_None { + return nil, nil + } + exportStore, err := makeCloudStorage(ctx, baseBackupURI, user) + if err != nil { + return nil, err + } + defer exportStore.Close() + return GetEncryptionFromBaseStore(ctx, exportStore, encryptionParams, kmsEnv) +} + +// GetEncryptionFromBaseStore retrieves the encryption options of a base backup store. +func GetEncryptionFromBaseStore( + ctx context.Context, + baseStore cloud.ExternalStorage, + encryptionParams jobspb.BackupEncryptionOptions, + kmsEnv cloud.KMSEnv, +) (*jobspb.BackupEncryptionOptions, error) { + if encryptionParams.Mode == jobspb.EncryptionMode_None { + return nil, nil + } + opts, err := ReadEncryptionOptions(ctx, baseStore) var encryptionOptions *jobspb.BackupEncryptionOptions - if encryptionParams.Mode != jobspb.EncryptionMode_None { - exportStore, err := makeCloudStorage(ctx, baseBackupURI, user) - if err != nil { - return nil, err + switch encryptionParams.Mode { + case jobspb.EncryptionMode_Passphrase: + encryptionOptions = &jobspb.BackupEncryptionOptions{ + Mode: jobspb.EncryptionMode_Passphrase, + Key: storageccl.GenerateKey([]byte(encryptionParams.RawPassphrase), opts[0].Salt), + } + case jobspb.EncryptionMode_KMS: + var defaultKMSInfo *jobspb.BackupEncryptionOptions_KMSInfo + for _, encFile := range opts { + defaultKMSInfo, err = ValidateKMSURIsAgainstFullBackup(ctx, encryptionParams.RawKmsUris, + NewEncryptedDataKeyMapFromProtoMap(encFile.EncryptedDataKeyByKMSMasterKeyID), kmsEnv) + if err == nil { + break + } } - defer exportStore.Close() - opts, err := ReadEncryptionOptions(ctx, exportStore) if err != nil { return nil, err } - - switch encryptionParams.Mode { - case jobspb.EncryptionMode_Passphrase: - encryptionOptions = &jobspb.BackupEncryptionOptions{ - Mode: jobspb.EncryptionMode_Passphrase, - Key: storageccl.GenerateKey([]byte(encryptionParams.RawPassphrase), opts[0].Salt), - } - case jobspb.EncryptionMode_KMS: - var defaultKMSInfo *jobspb.BackupEncryptionOptions_KMSInfo - for _, encFile := range opts { - defaultKMSInfo, err = ValidateKMSURIsAgainstFullBackup(ctx, encryptionParams.RawKmsUris, - NewEncryptedDataKeyMapFromProtoMap(encFile.EncryptedDataKeyByKMSMasterKeyID), kmsEnv) - if err == nil { - break - } - } - if err != nil { - return nil, err - } - encryptionOptions = &jobspb.BackupEncryptionOptions{ - Mode: jobspb.EncryptionMode_KMS, - KMSInfo: defaultKMSInfo} + encryptionOptions = &jobspb.BackupEncryptionOptions{ + Mode: jobspb.EncryptionMode_KMS, + 
KMSInfo: defaultKMSInfo, } } return encryptionOptions, nil diff --git a/pkg/backup/restore_data_processor.go b/pkg/backup/restore_data_processor.go index 48472ea34429..1616ca92b84c 100644 --- a/pkg/backup/restore_data_processor.go +++ b/pkg/backup/restore_data_processor.go @@ -289,7 +289,7 @@ func inputReader( type mergedSST struct { entry execinfrapb.RestoreSpanEntry - iter *storage.ReadAsOfIterator + iter storage.SimpleMVCCIterator cleanup func() completeUpTo hlc.Timestamp }
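Not part of the patch: a minimal sketch of how the new threshold setting might be exercised end to end in a test. The backuptestutils helper, its return values, and the nodelocal destination below are assumptions drawn from the existing backup test suite rather than from this diff.

package backup

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/backup/backuptestutils"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// TestCompactIncrementalsAtThreshold is a hypothetical test: once the chain
// reaches the configured length (a full backup plus two incrementals here),
// the final incremental should trigger a compaction of the incremental layers.
func TestCompactIncrementalsAtThreshold(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Assumed helper: starts a single-node cluster with external IO enabled.
	_, sqlDB, _, cleanup := backuptestutils.StartBackupRestoreTestCluster(t, backuptestutils.SingleNode)
	defer cleanup()

	// Compact once the chain holds at least three manifests.
	sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.backup.unsafe_compaction_threshold = 3")
	sqlDB.Exec(t, "BACKUP INTO 'nodelocal://1/backup'")
	sqlDB.Exec(t, "BACKUP INTO LATEST IN 'nodelocal://1/backup'")
	sqlDB.Exec(t, "BACKUP INTO LATEST IN 'nodelocal://1/backup'")

	// A real test would then restore from the collection and verify the data.
}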